• R/O
  • HTTP
  • SSH
  • HTTPS

vapor: Commit

Golang implemented sidechain for Bytom


Commit MetaInfo

Revision3b96051d005b75dc626fd5f921bf398a7dd5a9eb (tree)
Time2019-07-03 16:50:51
Authormars <mars@byto...>
Commitermars

Log Message

add compression

Change Summary

Incremental Difference

--- a/p2p/connection/connection.go
+++ b/p2p/connection/connection.go
@@ -13,6 +13,8 @@ import (
1313 wire "github.com/tendermint/go-wire"
1414 cmn "github.com/tendermint/tmlibs/common"
1515 "github.com/tendermint/tmlibs/flowrate"
16+
17+ "github.com/vapor/common/compression"
1618 )
1719
1820 const (
@@ -33,7 +35,7 @@ const (
3335 defaultSendQueueCapacity = 1
3436 defaultSendRate = int64(104857600) // 100MB/s
3537 defaultRecvBufferCapacity = 4096
36- defaultRecvMessageCapacity = 22020096 // 21MB
38+ defaultRecvMessageCapacity = 22020096 // 21MB
3739 defaultRecvRate = int64(104857600) // 100MB/s
3840 defaultSendTimeout = 10 * time.Second
3941 logModule = "p2p/conn"
@@ -94,6 +96,8 @@ type MConnection struct {
9496 flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
9597 pingTimer *time.Ticker // send pings periodically
9698 chStatsTimer *time.Ticker // update channel stats periodically
99+
100+ compression compression.Compression
97101 }
98102
99103 // MConnConfig is a MConnection configuration.
@@ -128,6 +132,7 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
128132
129133 pingTimer: time.NewTicker(pingTimeout),
130134 chStatsTimer: time.NewTicker(updateState),
135+ compression: compression.NewSnappy(),
131136 }
132137
133138 for _, desc := range chDescs {
@@ -189,7 +194,9 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
189194 return false
190195 }
191196
192- if !channel.sendBytes(wire.BinaryBytes(msg)) {
197+ decode := c.compression.CompressBytes(wire.BinaryBytes(msg))
198+
199+ if !channel.sendBytes(decode) {
193200 log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
194201 return false
195202 }
@@ -220,7 +227,9 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
220227 return false
221228 }
222229
223- ok = channel.trySendBytes(wire.BinaryBytes(msg))
230+ decode := c.compression.CompressBytes(wire.BinaryBytes(msg))
231+
232+ ok = channel.trySendBytes(decode)
224233 if ok {
225234 select {
226235 case c.send <- struct{}{}:
@@ -313,7 +322,12 @@ func (c *MConnection) recvRoutine() {
313322 }
314323
315324 if msgBytes != nil {
316- c.onReceive(pkt.ChannelID, msgBytes)
325+ data, err := c.compression.DecompressBytes(msgBytes)
326+ if err != nil {
327+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed decompress bytes")
328+ return
329+ }
330+ c.onReceive(pkt.ChannelID, data)
317331 }
318332
319333 default:
--- a/p2p/defaults.go
+++ /dev/null
@@ -1 +0,0 @@
1-package p2p
Show on old repository browser