• R/O
  • HTTP
  • SSH
  • HTTPS

vapor: Commit

Golang implemented sidechain for Bytom


Commit MetaInfo

Revisionbfbefbfbac7a98c35b04d5caeaea31c5d5a35101 (tree)
Time2019-06-19 14:12:09
Authormars <mars@byto...>
Commitermars

Log Message

Merge branch 'input' of https://github.com/bytom/vapor into input

Change Summary

Incremental Difference

--- a/api/api.go
+++ b/api/api.go
@@ -12,7 +12,6 @@ import (
1212 cmn "github.com/tendermint/tmlibs/common"
1313
1414 "github.com/vapor/accesstoken"
15- "github.com/vapor/blockchain/txfeed"
1615 cfg "github.com/vapor/config"
1716 "github.com/vapor/dashboard/dashboard"
1817 "github.com/vapor/dashboard/equity"
@@ -112,7 +111,6 @@ type API struct {
112111 chain *protocol.Chain
113112 server *http.Server
114113 handler http.Handler
115- txFeedTracker *txfeed.Tracker
116114 blockProposer *blockproposer.BlockProposer
117115 notificationMgr *websocket.WSNotificationManager
118116 eventDispatcher *event.Dispatcher
@@ -180,13 +178,12 @@ type NetSync interface {
180178 }
181179
182180 // NewAPI create and initialize the API
183-func NewAPI(sync NetSync, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, blockProposer *blockproposer.BlockProposer, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore, dispatcher *event.Dispatcher, notificationMgr *websocket.WSNotificationManager) *API {
181+func NewAPI(sync NetSync, wallet *wallet.Wallet, blockProposer *blockproposer.BlockProposer, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore, dispatcher *event.Dispatcher, notificationMgr *websocket.WSNotificationManager) *API {
184182 api := &API{
185183 sync: sync,
186184 wallet: wallet,
187185 chain: chain,
188186 accessTokens: token,
189- txFeedTracker: txfeeds,
190187 blockProposer: blockProposer,
191188 eventDispatcher: dispatcher,
192189 notificationMgr: notificationMgr,
@@ -266,12 +263,6 @@ func (a *API) buildHandler() {
266263 m.Handle("/delete-access-token", jsonHandler(a.deleteAccessToken))
267264 m.Handle("/check-access-token", jsonHandler(a.checkAccessToken))
268265
269- m.Handle("/create-transaction-feed", jsonHandler(a.createTxFeed))
270- m.Handle("/get-transaction-feed", jsonHandler(a.getTxFeed))
271- m.Handle("/update-transaction-feed", jsonHandler(a.updateTxFeed))
272- m.Handle("/delete-transaction-feed", jsonHandler(a.deleteTxFeed))
273- m.Handle("/list-transaction-feeds", jsonHandler(a.listTxFeeds))
274-
275266 m.Handle("/submit-transaction", jsonHandler(a.submit))
276267 m.Handle("/submit-transactions", jsonHandler(a.submitTxs))
277268 m.Handle("/estimate-transaction-gas", jsonHandler(a.estimateTxGas))
--- a/api/txfeeds.go
+++ /dev/null
@@ -1,107 +0,0 @@
1-package api
2-
3-import (
4- "context"
5- "encoding/json"
6-
7- log "github.com/sirupsen/logrus"
8-
9- "github.com/vapor/blockchain/txfeed"
10- "github.com/vapor/errors"
11-)
12-
13-// POST /create-txfeed
14-func (a *API) createTxFeed(ctx context.Context, in struct {
15- Alias string `json:"alias"`
16- Filter string `json:"filter"`
17-}) Response {
18- if err := a.txFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
19- log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Add TxFeed Failed")
20- return NewErrorResponse(err)
21- }
22- return NewSuccessResponse(nil)
23-}
24-
25-// POST /get-transaction-feed
26-func (a *API) getTxFeed(ctx context.Context, in struct {
27- Alias string `json:"alias,omitempty"`
28-}) Response {
29- var tmpTxFeed interface{}
30- rawTxfeed, err := a.GetTxFeedByAlias(ctx, in.Alias)
31- if err != nil {
32- return NewErrorResponse(err)
33- }
34- err = json.Unmarshal(rawTxfeed, &tmpTxFeed)
35- if err != nil {
36- return NewErrorResponse(err)
37- }
38- data := map[string]interface{}{"txfeed": tmpTxFeed}
39- return NewSuccessResponse(data)
40-}
41-
42-// POST /delete-transaction-feed
43-func (a *API) deleteTxFeed(ctx context.Context, in struct {
44- Alias string `json:"alias,omitempty"`
45-}) Response {
46- if err := a.txFeedTracker.Delete(ctx, in.Alias); err != nil {
47- return NewErrorResponse(err)
48- }
49- return NewSuccessResponse(nil)
50-}
51-
52-// POST /update-transaction-feed
53-func (a *API) updateTxFeed(ctx context.Context, in struct {
54- Alias string `json:"alias"`
55- Filter string `json:"filter"`
56-}) Response {
57- if err := a.txFeedTracker.Delete(ctx, in.Alias); err != nil {
58- return NewErrorResponse(err)
59- }
60- if err := a.txFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
61- log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Update TxFeed Failed")
62- return NewErrorResponse(err)
63- }
64- return NewSuccessResponse(nil)
65-}
66-
67-func (a *API) getTxFeeds() ([]txfeed.TxFeed, error) {
68- txFeed := txfeed.TxFeed{}
69- txFeeds := make([]txfeed.TxFeed, 0)
70-
71- iter := a.txFeedTracker.DB.Iterator()
72- defer iter.Release()
73-
74- for iter.Next() {
75- if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
76- return nil, err
77- }
78- txFeeds = append(txFeeds, txFeed)
79- }
80-
81- return txFeeds, nil
82-}
83-
84-// listTxFeeds is an http handler for listing txfeeds. It does not take a filter.
85-// POST /list-transaction-feeds
86-func (a *API) listTxFeeds(ctx context.Context) Response {
87- txFeeds, err := a.getTxFeeds()
88- if err != nil {
89- return NewErrorResponse(err)
90- }
91-
92- return NewSuccessResponse(txFeeds)
93-}
94-
95-func (a *API) GetTxFeedByAlias(ctx context.Context, filter string) ([]byte, error) {
96- jf, err := json.Marshal(filter)
97- if err != nil {
98- return nil, err
99- }
100-
101- value := a.txFeedTracker.DB.Get(jf)
102- if value == nil {
103- return nil, errors.New("No transaction feed")
104- }
105-
106- return value, nil
107-}
--- a/blockchain/txfeed/txfeed.go
+++ /dev/null
@@ -1,398 +0,0 @@
1-package txfeed
2-
3-import (
4- "context"
5- "encoding/json"
6- "strconv"
7- "strings"
8-
9- log "github.com/sirupsen/logrus"
10-
11- "github.com/vapor/blockchain/query"
12- dbm "github.com/vapor/database/leveldb"
13- "github.com/vapor/errors"
14- "github.com/vapor/protocol"
15- "github.com/vapor/protocol/bc"
16- "github.com/vapor/protocol/bc/types"
17- "github.com/vapor/protocol/vm/vmutil"
18-)
19-
20-const (
21- //FilterNumMax max txfeed filter amount.
22- FilterNumMax = 1024
23- logModule = "txfeed"
24-)
25-
26-var (
27- //ErrDuplicateAlias means error of duplicate feed alias.
28- ErrDuplicateAlias = errors.New("duplicate feed alias")
29- //ErrEmptyAlias means error of empty feed alias.
30- ErrEmptyAlias = errors.New("empty feed alias")
31- //ErrNumExceedlimit means txfeed filter number exceeds the limit.
32- ErrNumExceedlimit = errors.New("txfeed exceed limit")
33- maxNewTxfeedChSize = 1000
34-)
35-
36-//Tracker filter tracker object.
37-type Tracker struct {
38- DB dbm.DB
39- TxFeeds []*TxFeed
40- chain *protocol.Chain
41- txfeedCh chan *types.Tx
42-}
43-
44-type rawOutput struct {
45- OutputID bc.Hash
46- bc.AssetAmount
47- ControlProgram []byte
48- txHash bc.Hash
49- outputIndex uint32
50- sourceID bc.Hash
51- sourcePos uint64
52- refData bc.Hash
53-}
54-
55-//TxFeed describe a filter
56-type TxFeed struct {
57- ID string `json:"id,omitempty"`
58- Alias string `json:"alias"`
59- Filter string `json:"filter,omitempty"`
60- Param filter `json:"param,omitempty"`
61-}
62-
63-type filter struct {
64- AssetID string `json:"assetid,omitempty"`
65- AmountLowerLimit uint64 `json:"lowerlimit,omitempty"`
66- AmountUpperLimit uint64 `json:"upperlimit,omitempty"`
67- TransType string `json:"transtype,omitempty"`
68-}
69-
70-//NewTracker create new txfeed tracker.
71-func NewTracker(db dbm.DB, chain *protocol.Chain) *Tracker {
72- s := &Tracker{
73- DB: db,
74- TxFeeds: make([]*TxFeed, 0, 10),
75- chain: chain,
76- txfeedCh: make(chan *types.Tx, maxNewTxfeedChSize),
77- }
78-
79- return s
80-}
81-
82-func loadTxFeed(db dbm.DB, txFeeds []*TxFeed) ([]*TxFeed, error) {
83- iter := db.Iterator()
84- defer iter.Release()
85-
86- for iter.Next() {
87- txFeed := &TxFeed{}
88- if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
89- return nil, err
90- }
91- filter, err := parseFilter(txFeed.Filter)
92- if err != nil {
93- return nil, err
94- }
95- txFeed.Param = filter
96- txFeeds = append(txFeeds, txFeed)
97- }
98- return txFeeds, nil
99-}
100-
101-func parseFilter(ft string) (filter, error) {
102- var res filter
103-
104- subFilter := strings.Split(ft, "AND")
105- for _, value := range subFilter {
106- param := getParam(value, "=")
107- if param == "" {
108- continue
109- }
110- if strings.Contains(value, "asset_id") {
111- res.AssetID = param
112- }
113- if strings.Contains(value, "amount_lower_limit") {
114- tmp, _ := strconv.ParseInt(param, 10, 64)
115- res.AmountLowerLimit = uint64(tmp)
116- }
117- if strings.Contains(value, "amount_upper_limit") {
118- tmp, _ := strconv.ParseInt(param, 10, 64)
119- res.AmountUpperLimit = uint64(tmp)
120- }
121- if strings.Contains(value, "trans_type") {
122- res.TransType = param
123- }
124- }
125- return res, nil
126-}
127-
128-//TODO
129-func getParam(str, substr string) string {
130- if result := strings.Index(str, substr); result >= 0 {
131- str := strings.Replace(str[result+1:], "'", "", -1)
132- str = strings.Replace(str, " ", "", -1)
133- return str
134- }
135- return ""
136-}
137-
138-func parseTxfeed(db dbm.DB, filters []filter) error {
139- var txFeed TxFeed
140- var index int
141-
142- iter := db.Iterator()
143- defer iter.Release()
144-
145- for iter.Next() {
146-
147- if err := json.Unmarshal(iter.Value(), &txFeed); err != nil {
148- return err
149- }
150-
151- subFilter := strings.Split(txFeed.Filter, "AND")
152- for _, value := range subFilter {
153- param := getParam(value, "=")
154- if param == "" {
155- continue
156- }
157- if strings.Contains(value, "asset_id") {
158- filters[index].AssetID = param
159- }
160- if strings.Contains(value, "amount_lower_limit") {
161- tmp, _ := strconv.ParseInt(param, 10, 64)
162- filters[index].AmountLowerLimit = uint64(tmp)
163- }
164- if strings.Contains(value, "amount_upper_limit") {
165- tmp, _ := strconv.ParseInt(param, 10, 64)
166- filters[index].AmountUpperLimit = uint64(tmp)
167- }
168- if strings.Contains(value, "trans_type") {
169- filters[index].TransType = param
170- }
171- }
172- index++
173- }
174- return nil
175-}
176-
177-//Prepare load and parse filters.
178-func (t *Tracker) Prepare(ctx context.Context) error {
179- var err error
180- t.TxFeeds, err = loadTxFeed(t.DB, t.TxFeeds)
181- return err
182-}
183-
184-//GetTxfeedCh return a txfeed channel.
185-func (t *Tracker) GetTxfeedCh() chan *types.Tx {
186- return t.txfeedCh
187-}
188-
189-//Create create a txfeed filter.
190-func (t *Tracker) Create(ctx context.Context, alias, fil string) error {
191- // Validate the filter.
192-
193- if err := query.ValidateTransactionFilter(fil); err != nil {
194- return err
195- }
196-
197- if alias == "" {
198- return errors.WithDetail(ErrEmptyAlias, "a transaction feed with empty alias")
199- }
200-
201- if len(t.TxFeeds) >= FilterNumMax {
202- return errors.WithDetail(ErrNumExceedlimit, "txfeed number exceed limit")
203- }
204-
205- for _, txfeed := range t.TxFeeds {
206- if txfeed.Alias == alias {
207- return errors.WithDetail(ErrDuplicateAlias, "txfeed alias must unique")
208- }
209- }
210-
211- feed := &TxFeed{
212- Alias: alias,
213- Filter: fil,
214- }
215-
216- filter, err := parseFilter(feed.Filter)
217- if err != nil {
218- return err
219- }
220- feed.Param = filter
221- t.TxFeeds = append(t.TxFeeds, feed)
222- return insertTxFeed(t.DB, feed)
223-}
224-
225-func deleteTxFeed(db dbm.DB, alias string) error {
226- key, err := json.Marshal(alias)
227- if err != nil {
228- return err
229- }
230- db.Delete(key)
231- return nil
232-}
233-
234-// insertTxFeed adds the txfeed to the database. If the txfeed has a client token,
235-// and there already exists a txfeed with that client token, insertTxFeed will
236-// lookup and return the existing txfeed instead.
237-func insertTxFeed(db dbm.DB, feed *TxFeed) error {
238- // var err error
239- key, err := json.Marshal(feed.Alias)
240- if err != nil {
241- return err
242- }
243- value, err := json.Marshal(feed)
244- if err != nil {
245- return err
246- }
247-
248- db.Set(key, value)
249- return nil
250-}
251-
252-//Get get txfeed filter with alias.
253-func (t *Tracker) Get(ctx context.Context, alias string) (*TxFeed, error) {
254- if alias == "" {
255- return nil, errors.WithDetail(ErrEmptyAlias, "get transaction feed with empty alias")
256- }
257-
258- for i, v := range t.TxFeeds {
259- if v.Alias == alias {
260- return t.TxFeeds[i], nil
261- }
262- }
263- return nil, nil
264-}
265-
266-//Delete delete txfeed with alias.
267-func (t *Tracker) Delete(ctx context.Context, alias string) error {
268- log.WithFields(log.Fields{"module": logModule, "delete": alias}).Info("delete txfeed")
269-
270- if alias == "" {
271- return errors.WithDetail(ErrEmptyAlias, "del transaction feed with empty alias")
272- }
273-
274- for i, txfeed := range t.TxFeeds {
275- if txfeed.Alias == alias {
276- t.TxFeeds = append(t.TxFeeds[:i], t.TxFeeds[i+1:]...)
277- return deleteTxFeed(t.DB, alias)
278- }
279- }
280- return nil
281-}
282-
283-func outputFilter(txfeed *TxFeed, value *query.AnnotatedOutput) bool {
284- assetidstr := value.AssetID.String()
285-
286- if txfeed.Param.AssetID != assetidstr && txfeed.Param.AssetID != "" {
287- return false
288- }
289- if txfeed.Param.TransType != value.Type && txfeed.Param.TransType != "" {
290- return false
291- }
292- if txfeed.Param.AmountLowerLimit > value.Amount && txfeed.Param.AmountLowerLimit != 0 {
293- return false
294- }
295- if txfeed.Param.AmountUpperLimit < value.Amount && txfeed.Param.AmountUpperLimit != 0 {
296- return false
297- }
298-
299- return true
300-}
301-
302-//TxFilter filter tx from mempool.
303-func (t *Tracker) TxFilter(tx *types.Tx) error {
304- var annotatedTx *query.AnnotatedTx
305- // Build the fully annotated transaction.
306- annotatedTx = buildAnnotatedTransaction(tx)
307- for _, output := range annotatedTx.Outputs {
308- for _, filter := range t.TxFeeds {
309- if match := outputFilter(filter, output); !match {
310- continue
311- }
312- b, err := json.Marshal(annotatedTx)
313- if err != nil {
314- return err
315- }
316- log.WithFields(log.Fields{"module:": logModule, "filter": string(b)}).Info("find new tx match filter")
317- t.txfeedCh <- tx
318- }
319- }
320- return nil
321-}
322-
323-var emptyJSONObject = json.RawMessage(`{}`)
324-
325-func buildAnnotatedTransaction(orig *types.Tx) *query.AnnotatedTx {
326- tx := &query.AnnotatedTx{
327- ID: orig.ID,
328- Inputs: make([]*query.AnnotatedInput, 0, len(orig.Inputs)),
329- Outputs: make([]*query.AnnotatedOutput, 0, len(orig.Outputs)),
330- }
331-
332- for i := range orig.Inputs {
333- tx.Inputs = append(tx.Inputs, buildAnnotatedInput(orig, uint32(i)))
334- }
335- for i := range orig.Outputs {
336- tx.Outputs = append(tx.Outputs, buildAnnotatedOutput(orig, i))
337- }
338- return tx
339-}
340-
341-func buildAnnotatedInput(tx *types.Tx, i uint32) *query.AnnotatedInput {
342- orig := tx.Inputs[i]
343- in := &query.AnnotatedInput{
344- AssetID: orig.AssetID(),
345- Amount: orig.Amount(),
346- AssetDefinition: &emptyJSONObject,
347- }
348-
349- id := tx.Tx.InputIDs[i]
350- e := tx.Entries[id]
351- switch e := e.(type) {
352- case *bc.Spend:
353- in.Type = "spend"
354- in.ControlProgram = orig.ControlProgram()
355- in.SpentOutputID = e.SpentOutputId
356-
357- case *bc.CrossChainInput:
358- in.Type = "cross_chain_in"
359- in.ControlProgram = orig.ControlProgram()
360- in.SpentOutputID = e.MainchainOutputId
361-
362- case *bc.VetoInput:
363- in.Type = "veto"
364- in.ControlProgram = orig.ControlProgram()
365- in.SpentOutputID = e.SpentOutputId
366- }
367-
368- return in
369-}
370-
371-func buildAnnotatedOutput(tx *types.Tx, idx int) *query.AnnotatedOutput {
372- orig := tx.Outputs[idx]
373- outid := tx.OutputID(idx)
374- out := &query.AnnotatedOutput{
375- OutputID: *outid,
376- Position: idx,
377- AssetID: *orig.AssetAmount().AssetId,
378- AssetDefinition: &emptyJSONObject,
379- Amount: orig.AssetAmount().Amount,
380- ControlProgram: orig.ControlProgram(),
381- }
382-
383- if vmutil.IsUnspendable(out.ControlProgram) {
384- out.Type = "retire"
385- } else {
386- e := tx.Entries[*outid]
387- switch e.(type) {
388- case *bc.CrossChainOutput:
389- out.Type = "crosschain_output"
390- case *bc.IntraChainOutput:
391- out.Type = "control"
392- case *bc.VoteOutput:
393- out.Type = "vote_control"
394- }
395- }
396-
397- return out
398-}
--- a/node/node.go
+++ b/node/node.go
@@ -1,7 +1,6 @@
11 package node
22
33 import (
4- "context"
54 "encoding/hex"
65 "errors"
76 "net"
@@ -21,7 +20,6 @@ import (
2120 "github.com/vapor/api"
2221 "github.com/vapor/asset"
2322 "github.com/vapor/blockchain/pseudohsm"
24- "github.com/vapor/blockchain/txfeed"
2523 cfg "github.com/vapor/config"
2624 "github.com/vapor/consensus"
2725 "github.com/vapor/database"
@@ -54,15 +52,12 @@ type Node struct {
5452 notificationMgr *websocket.WSNotificationManager
5553 api *api.API
5654 chain *protocol.Chain
57- txfeed *txfeed.Tracker
5855 cpuMiner *blockproposer.BlockProposer
5956 miningEnable bool
6057 }
6158
6259 // NewNode create bytom node
6360 func NewNode(config *cfg.Config) *Node {
64- ctx := context.Background()
65-
6661 if err := lockDataDirectory(config); err != nil {
6762 cmn.Exit("Error: " + err.Error())
6863 }
@@ -107,15 +102,6 @@ func NewNode(config *cfg.Config) *Node {
107102 var accounts *account.Manager
108103 var assets *asset.Registry
109104 var wallet *w.Wallet
110- var txFeed *txfeed.Tracker
111-
112- txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
113- txFeed = txfeed.NewTracker(txFeedDB, chain)
114-
115- if err = txFeed.Prepare(ctx); err != nil {
116- log.WithFields(log.Fields{"module": logModule, "error": err}).Error("start txfeed")
117- return nil
118- }
119105
120106 hsm, err := pseudohsm.New(config.KeysDir())
121107 if err != nil {
@@ -163,7 +149,6 @@ func NewNode(config *cfg.Config) *Node {
163149 accessTokens: accessTokens,
164150 wallet: wallet,
165151 chain: chain,
166- txfeed: txFeed,
167152 miningEnable: config.Mining,
168153
169154 notificationMgr: notificationMgr,
@@ -236,7 +221,7 @@ func launchWebBrowser(port string) {
236221 }
237222
238223 func (n *Node) initAndstartAPIServer() {
239- n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
224+ n.api = api.NewAPI(n.syncManager, n.wallet, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
240225
241226 listenAddr := env.String("LISTEN", n.config.ApiAddress)
242227 env.Parse()
Show on old repository browser