Created
June 30, 2021 22:26
-
-
Save bogatyy/de048d747c73ca4069c142899c0071fb to your computer and use it in GitHub Desktop.
geth_p2p.patch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go | |
index 3177a877e..6b77b3e58 100644 | |
--- a/eth/fetcher/block_fetcher.go | |
+++ b/eth/fetcher/block_fetcher.go | |
@@ -97,6 +97,9 @@ type chainInsertFn func(types.Blocks) (int, error) | |
// peerDropFn is a callback type for dropping a peer detected as malicious. | |
type peerDropFn func(id string) | |
+// peerStatsUpdateFn is a callback type for reporting a named event (e.g. "importBlocks") seen for a peer id, so that per-peer stats can be updated. | |
+type peerStatsUpdateFn func(peer string, event string) | |
+ | |
// blockAnnounce is the hash notification of the availability of a new block in the | |
// network. | |
type blockAnnounce struct { | |
@@ -187,6 +190,7 @@ type BlockFetcher struct { | |
insertHeaders headersInsertFn // Injects a batch of headers into the chain | |
insertChain chainInsertFn // Injects a batch of blocks into the chain | |
dropPeer peerDropFn // Drops a peer for misbehaving | |
+ updatePeerStats peerStatsUpdateFn // Callback to update peer stats | |
// Testing hooks | |
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list | |
@@ -197,7 +201,7 @@ type BlockFetcher struct { | |
} | |
// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements. | |
-func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher { | |
+func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn, updatePeerStats peerStatsUpdateFn) *BlockFetcher { | |
return &BlockFetcher{ | |
light: light, | |
notify: make(chan *blockAnnounce), | |
@@ -222,6 +226,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr | |
insertHeaders: insertHeaders, | |
insertChain: insertChain, | |
dropPeer: dropPeer, | |
+ updatePeerStats: updatePeerStats, | |
} | |
} | |
@@ -788,6 +793,11 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) { | |
// Run the import on a new thread | |
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) | |
+ | |
+ if f.updatePeerStats != nil { | |
+ f.updatePeerStats(peer, "importBlocks") | |
+ } | |
+ | |
go func() { | |
defer func() { f.done <- hash }() | |
diff --git a/eth/handler.go b/eth/handler.go | |
index aff4871af..7e449fb59 100644 | |
--- a/eth/handler.go | |
+++ b/eth/handler.go | |
@@ -108,6 +108,7 @@ type handler struct { | |
blockFetcher *fetcher.BlockFetcher | |
txFetcher *fetcher.TxFetcher | |
peers *peerSet | |
+ peersStats *peerSetStats | |
eventMux *event.TypeMux | |
txsCh chan core.NewTxsEvent | |
@@ -143,6 +144,7 @@ func newHandler(config *handlerConfig) (*handler, error) { | |
txsyncCh: make(chan *txsync), | |
quitSync: make(chan struct{}), | |
} | |
+ h.peersStats = newPeerSetStats(h.peers) | |
if config.Sync == downloader.FullSync { | |
// The database seems empty as the current block is the genesis. Yet the fast | |
// block is ahead, so fast sync was enabled for this node at a certain point. | |
@@ -219,7 +221,7 @@ func newHandler(config *handlerConfig) (*handler, error) { | |
} | |
return n, err | |
} | |
- h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer) | |
+ h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer, h.peersStats.UpdatePeerStats) | |
fetchTx := func(peer string, hashes []common.Hash) error { | |
p := h.peers.peer(peer) | |
@@ -438,6 +440,14 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { | |
hash := block.Hash() | |
peers := h.peers.peersWithoutBlock(hash) | |
+ if propagate { | |
+ h.peersStats.ReportPeersWithoutBlock(peers) | |
+ } | |
+ // Print peers stats every 100 blocks | |
+ if block.NumberU64() % 100 == 0 { | |
+ h.peersStats.LogStats() | |
+ } | |
+ | |
// If propagation is requested, send to a subset of the peer | |
if propagate { | |
// Calculate the TD of the block (it's not imported yet, so block.Td is not valid) | |
@@ -448,12 +458,20 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { | |
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) | |
return | |
} | |
- // Send the block to a subset of our peers | |
- transfer := peers[:int(math.Sqrt(float64(len(peers))))] | |
- for _, peer := range transfer { | |
- peer.AsyncSendNewBlock(block, td) | |
+ // Prioritize sending to trusted peers | |
+ log.Warn("Broadcasting block to trusted peers", "number", block.Number(), "hash", hash) | |
+ for _, peer := range peers { | |
+ if peer.Peer.Info().Network.Trusted { | |
+ peer.AsyncSendNewBlock(block, td) | |
+ } | |
+ } | |
+ // Send to all remaining peers as well | |
+ for _, peer := range peers { | |
+ if !peer.Peer.Info().Network.Trusted { | |
+ peer.AsyncSendNewBlock(block, td) | |
+ } | |
} | |
- log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) | |
+ log.Trace("Propagated block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) | |
return | |
} | |
// Otherwise if the block is indeed in out own chain, announce it | |
diff --git a/eth/peerset.go b/eth/peerset.go | |
index 1e864a8e4..8d5293e41 100644 | |
--- a/eth/peerset.go | |
+++ b/eth/peerset.go | |
@@ -24,6 +24,7 @@ import ( | |
"github.com/ethereum/go-ethereum/common" | |
"github.com/ethereum/go-ethereum/eth/protocols/eth" | |
"github.com/ethereum/go-ethereum/eth/protocols/snap" | |
+ "github.com/ethereum/go-ethereum/log" | |
"github.com/ethereum/go-ethereum/p2p" | |
) | |
@@ -257,3 +258,81 @@ func (ps *peerSet) close() { | |
} | |
ps.closed = true | |
} | |
+ | |
+// peerSetStats accumulates per-peer block propagation counters for the peers | |
+// registered in a peerSet. The counter maps are keyed by peer id and guarded | |
+// by lock. | |
+// | |
+// NOTE(review): entries are never deleted, so counters of peers that have | |
+// disconnected remain in the maps. | |
+type peerSetStats struct { | |
+	ps                       *peerSet       // peer set whose registered peers are counted | |
+	importedBlocksFrom       map[string]int // "importBlocks" events reported per peer | |
+	totalBroadcastsTo        map[string]int // reported broadcasts during which the peer was registered | |
+	broadcastsToWithoutBlock map[string]int // reported broadcasts for which the peer lacked the block | |
+	lock                     sync.RWMutex   // protects the three counter maps | |
+} | |
+ | |
+// newPeerSetStats creates an empty stats tracker bound to the given peer set. | |
+func newPeerSetStats(ps *peerSet) *peerSetStats { | |
+	return &peerSetStats{ | |
+		ps:                       ps, | |
+		importedBlocksFrom:       make(map[string]int), | |
+		totalBroadcastsTo:        make(map[string]int), | |
+		broadcastsToWithoutBlock: make(map[string]int), | |
+	} | |
+} | |
+ | |
+// ReportPeersWithoutBlock records one block broadcast: the total counter of | |
+// every currently registered peer is incremented, and the without-block | |
+// counter of every peer in peers is incremented. | |
+func (s *peerSetStats) ReportPeersWithoutBlock(peers []*ethPeer) { | |
+	// Iterate the peer map under the peer set's own lock so a concurrent | |
+	// register/unregister cannot race with the range loop. Lock order is | |
+	// peerSet.lock, then peerSetStats.lock. NOTE(review): assumes peerSet.lock | |
+	// guards peerSet.peers, as in upstream go-ethereum — confirm. | |
+	s.ps.lock.RLock() | |
+	defer s.ps.lock.RUnlock() | |
+ | |
+	s.lock.Lock() | |
+	defer s.lock.Unlock() | |
+ | |
+	for _, p := range s.ps.peers { | |
+		s.totalBroadcastsTo[p.Peer.ID()]++ | |
+	} | |
+	for _, p := range peers { | |
+		s.broadcastsToWithoutBlock[p.Peer.ID()]++ | |
+	} | |
+} | |
+ | |
+// UpdatePeerStats records a single event reported for the given peer id. Only | |
+// the "importBlocks" event is counted; any other event is ignored. | |
+func (s *peerSetStats) UpdatePeerStats(peer string, event string) { | |
+	s.lock.Lock() | |
+	defer s.lock.Unlock() | |
+ | |
+	if event == "importBlocks" { | |
+		s.importedBlocksFrom[peer]++ | |
+	} | |
+} | |
+ | |
+// LogStats logs the accumulated counters of every currently registered peer | |
+// at Info level. | |
+func (s *peerSetStats) LogStats() { | |
+	// Same lock order as ReportPeersWithoutBlock: peer set lock for the peer | |
+	// map, then the stats lock for the counters. | |
+	s.ps.lock.RLock() | |
+	defer s.ps.lock.RUnlock() | |
+ | |
+	s.lock.RLock() | |
+	defer s.lock.RUnlock() | |
+ | |
+	log.Info("Peers:") | |
+	for _, p := range s.ps.peers { | |
+		id := p.Peer.ID() | |
+		log.Info("Peer: ", "id", id, "enode", p.Peer.Node().URLv4(), | |
+			"importedBlocksFrom", s.importedBlocksFrom[id], | |
+			"totalBroadcastsTo", s.totalBroadcastsTo[id], | |
+			"broadcastsToWithoutBlock", s.broadcastsToWithoutBlock[id]) | |
+	} | |
+} | |
diff --git a/les/fetcher.go b/les/fetcher.go | |
index a6d1c93c4..38a4f8417 100644 | |
--- a/les/fetcher.go | |
+++ b/les/fetcher.go | |
@@ -181,7 +181,7 @@ func newLightFetcher(chain *light.LightChain, engine consensus.Engine, peers *se | |
chaindb: chaindb, | |
chain: chain, | |
reqDist: reqDist, | |
- fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper), | |
+ fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper, nil), | |
peers: make(map[enode.ID]*fetcherPeer), | |
synchronise: syncFn, | |
announceCh: make(chan *announce), |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment