• R/O
  • HTTP
  • SSH
  • HTTPS

vapor: Commit

Golang implemented sidechain for Bytom


Commit MetaInfo

Revision3a6cd9640bdb0eed33451a5af7a0e12cfe423165 (tree)
Time2019-07-20 11:58:14
Authorapolloww <32606824+apolloww@user...>
CommiterPaladz

Log Message

txpool: periodically sweep pool for stale Txs (#337)

Change Summary

Incremental Difference

--- a/protocol/txpool.go
+++ b/protocol/txpool.go
@@ -32,6 +32,9 @@ var (
3232 orphanTTL = 60 * time.Second
3333 orphanExpireScanInterval = 30 * time.Second
3434
35+ txTTL = 1 * time.Hour
36+ txExpireScanInterval = 20 * time.Minute
37+
3538 // ErrTransactionNotExist is the pre-defined error message
3639 ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
3740 // ErrPoolIsFull indicates the pool is full
@@ -89,6 +92,7 @@ func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
8992 eventDispatcher: dispatcher,
9093 }
9194 go tp.orphanExpireWorker()
95+ go tp.txExpireWorker()
9296 return tp
9397 }
9498
@@ -100,8 +104,8 @@ func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
100104 tp.errCache.Add(txHash, err)
101105 }
102106
103-// ExpireOrphan expire all the orphans that before the input time range
104-func (tp *TxPool) ExpireOrphan(now time.Time) {
107+// expireOrphan expire all the orphans that before the input time range
108+func (tp *TxPool) expireOrphan(now time.Time) {
105109 tp.mtx.Lock()
106110 defer tp.mtx.Unlock()
107111
@@ -129,19 +133,24 @@ func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
129133 tp.mtx.Lock()
130134 defer tp.mtx.Unlock()
131135
136+ if txD := tp.removeTransaction(txHash); txD != nil {
137+ atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
138+ tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
139+ log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
140+ }
141+}
142+
143+func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
132144 txD, ok := tp.pool[*txHash]
133145 if !ok {
134- return
146+ return nil
135147 }
136148
137149 for _, output := range txD.Tx.ResultIds {
138150 delete(tp.utxo, *output)
139151 }
140152 delete(tp.pool, *txHash)
141-
142- atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
143- tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
144- log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
153+ return txD
145154 }
146155
147156 // GetTransaction return the TxDesc by hash
@@ -201,6 +210,7 @@ func isTransactionZeroOutput(tx *types.Tx) bool {
201210 return false
202211 }
203212
213+//IsDust checks if a tx has zero output
204214 func (tp *TxPool) IsDust(tx *types.Tx) bool {
205215 return isTransactionZeroOutput(tx)
206216 }
@@ -313,7 +323,7 @@ func (tp *TxPool) orphanExpireWorker() {
313323 defer ticker.Stop()
314324
315325 for now := range ticker.C {
316- tp.ExpireOrphan(now)
326+ tp.expireOrphan(now)
317327 }
318328 }
319329
@@ -368,3 +378,25 @@ func (tp *TxPool) removeOrphan(hash *bc.Hash) {
368378 }
369379 delete(tp.orphans, *hash)
370380 }
381+
382+func (tp *TxPool) txExpireWorker() {
383+ ticker := time.NewTicker(txExpireScanInterval)
384+ defer ticker.Stop()
385+
386+ for now := range ticker.C {
387+ tp.expireTx(now)
388+ }
389+}
390+
391+// expireTx expires all the Txs that before the input time range
392+func (tp *TxPool) expireTx(now time.Time) {
393+ tp.mtx.Lock()
394+ defer tp.mtx.Unlock()
395+
396+ cutOff := now.Add(-txTTL)
397+ for hash, txD := range tp.pool {
398+ if txD.Added.Before(cutOff) {
399+ tp.removeTransaction(&hash)
400+ }
401+ }
402+}
--- a/protocol/txpool_test.go
+++ b/protocol/txpool_test.go
@@ -422,7 +422,7 @@ func TestExpireOrphan(t *testing.T) {
422422 },
423423 }
424424
425- before.ExpireOrphan(time.Unix(1633479701, 0))
425+ before.expireOrphan(time.Unix(1633479701, 0))
426426 if !testutil.DeepEqual(before, want) {
427427 t.Errorf("got %v want %v", before, want)
428428 }
Show on old repository browser