diff --git a/blocksync/blocksync.go b/blocksync/blocksync.go
index 959868ce0f..62f4dc5e1b 100644
--- a/blocksync/blocksync.go
+++ b/blocksync/blocksync.go
@@ -21,6 +21,7 @@ import (
 	"github.com/iotexproject/iotex-core/v2/blockchain"
 	"github.com/iotexproject/iotex-core/v2/blockchain/block"
 	"github.com/iotexproject/iotex-core/v2/blockchain/blockdao"
+	"github.com/iotexproject/iotex-core/v2/nodeinfo"
 	"github.com/iotexproject/iotex-core/v2/pkg/fastrand"
 	"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
 	"github.com/iotexproject/iotex-core/v2/pkg/log"
@@ -69,6 +70,7 @@ type (
 		p2pNeighbor     Neighbors
 		unicastOutbound UniCastOutbound
 		blockP2pPeer    BlockPeer
+		nodeInfoManager *nodeinfo.InfoManager
 
 		syncTask      *routine.RecurringTask
 		syncStageTask *routine.RecurringTask
@@ -138,6 +140,7 @@ func NewBlockSyncer(
 	p2pNeighbor Neighbors,
 	uniCastHandler UniCastOutbound,
 	blockP2pPeer BlockPeer,
+	nodeInfoManager *nodeinfo.InfoManager,
 ) (BlockSync, error) {
 	bs := &blockSyncer{
 		cfg:             cfg,
@@ -150,6 +153,7 @@ func NewBlockSyncer(
 		unicastOutbound: uniCastHandler,
 		blockP2pPeer:    blockP2pPeer,
 		targetHeight:    0,
+		nodeInfoManager: nodeInfoManager,
 	}
 	if bs.cfg.Interval != 0 {
 		bs.syncTask = routine.NewRecurringTask(bs.sync, bs.cfg.Interval)
@@ -224,10 +228,19 @@ func (bs *blockSyncer) requestBlock(ctx context.Context, start uint64, end uint64
 		repeat = len(peers)
 	}
 	for i := 0; i < repeat; i++ {
-		peer := peers[fastrand.Uint32n(uint32(len(peers)))]
+		var peer *peer.AddrInfo
+		for j := 0; j < 10; j++ {
+			if p := &peers[fastrand.Uint32n(uint32(len(peers)))]; bs.nodeInfoManager.MayHaveBlock(p.ID.String(), start) {
+				peer = p
+				break
+			}
+		}
+		if peer == nil {
+			continue
+		}
 		if err := bs.unicastOutbound(
 			ctx,
-			peer,
+			*peer,
 			&iotexrpc.BlockSync{Start: start, End: end},
 		); err != nil {
 			log.L().Error("failed to request blocks", zap.Error(err), zap.String("peer", peer.ID.String()), zap.Uint64("start", start), zap.Uint64("end", end))
diff --git a/blocksync/blocksync_test.go b/blocksync/blocksync_test.go
index 7be8b5f9c3..a4564f8370 100644
--- a/blocksync/blocksync_test.go
+++ b/blocksync/blocksync_test.go
@@ -74,6 +74,7 @@ func newBlockSyncerForTest(cfg Config, chain blockchain.Blockchain, dao blockdao
 		func(string) {
 			return
 		},
+		nil,
 	)
 	if err != nil {
 		return nil, err
diff --git a/chainservice/builder.go b/chainservice/builder.go
index 9bd18f1dc8..a9dc33686f 100644
--- a/chainservice/builder.go
+++ b/chainservice/builder.go
@@ -35,7 +35,6 @@ import (
 	"github.com/iotexproject/iotex-core/v2/blockchain/block"
 	"github.com/iotexproject/iotex-core/v2/blockchain/blockdao"
 	"github.com/iotexproject/iotex-core/v2/blockchain/filedao"
-	"github.com/iotexproject/iotex-core/v2/blockchain/genesis"
 	"github.com/iotexproject/iotex-core/v2/blockindex"
 	"github.com/iotexproject/iotex-core/v2/blockindex/contractstaking"
 	"github.com/iotexproject/iotex-core/v2/blocksync"
@@ -537,29 +536,11 @@ func (builder *Builder) buildNodeInfoManager() error {
 	if stk == nil {
 		return errors.New("cannot find staking protocol")
 	}
-	chain := builder.cs.chain
 	var dm *nodeinfo.InfoManager
 	if builder.cfg.System.Active {
-		dm = nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent, cs.chain, func() []string {
-			ctx := protocol.WithFeatureCtx(
-				protocol.WithBlockCtx(
-					genesis.WithGenesisContext(context.Background(), chain.Genesis()),
-					protocol.BlockCtx{BlockHeight: chain.TipHeight()},
-				),
-			)
-			candidates, err := stk.ActiveCandidates(ctx, cs.factory, 0)
-			if err != nil {
-				log.L().Error("failed to get active candidates", zap.Error(errors.WithStack(err)))
-				return nil
-			}
-			whiteList := make([]string, len(candidates))
-			for i := range whiteList {
-				whiteList[i] = candidates[i].Address
-			}
-			return whiteList
-		}, builder.cfg.Chain.ProducerPrivateKeys()...)
+		dm = nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent, cs.chain, builder.cfg.WakeUpgrade.BlockInterval, builder.cfg.Chain.ProducerPrivateKeys()...)
 	} else {
-		dm = nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent, cs.chain, nil)
+		dm = nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent, cs.chain, builder.cfg.WakeUpgrade.BlockInterval)
 	}
 	builder.cs.nodeInfoManager = dm
 	builder.cs.lifecycle.Add(dm)
@@ -653,6 +634,7 @@ func (builder *Builder) buildBlockSyncer() error {
 		p2pAgent.ConnectedPeers,
 		p2pAgent.UnicastOutbound,
 		p2pAgent.BlockPeer,
+		builder.cs.nodeInfoManager,
 	)
 	if err != nil {
 		return errors.Wrap(err, "failed to create block syncer")
diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go
index 37300956a4..328f583339 100644
--- a/dispatcher/dispatcher.go
+++ b/dispatcher/dispatcher.go
@@ -32,6 +32,7 @@ type (
 		ActionChanSize             uint          `yaml:"actionChanSize"`
 		BlockChanSize              uint          `yaml:"blockChanSize"`
 		BlockSyncChanSize          uint          `yaml:"blockSyncChanSize"`
+		BlockSyncLimit             uint          `yaml:"blockSyncLimit"`
 		ConsensusChanSize          uint          `yaml:"consensusChanSize"`
 		MiscChanSize               uint          `yaml:"miscChanSize"`
 		ProcessSyncRequestInterval time.Duration `yaml:"processSyncRequestInterval"`
@@ -46,6 +47,7 @@ var (
 		ActionChanSize:    5000,
 		BlockChanSize:     1000,
 		BlockSyncChanSize: 400,
+		BlockSyncLimit:    2,
 		ConsensusChanSize: 1000,
 		MiscChanSize:      1000,
 		AccountRateLimit:  100,
@@ -131,6 +133,7 @@ func NewDispatcher(cfg Config, verificationFunc VerificationFunc) (Dispatcher, error) {
 		actionChanSize: cfg.ActionChanSize,
 		blockChanSize:  cfg.BlockChanSize,
 		blockSyncSize:  cfg.BlockSyncChanSize,
+		blockSyncLimit: cfg.BlockSyncLimit,
 		consensusSize:  cfg.ConsensusChanSize,
 		miscSize:       cfg.MiscChanSize,
 	}, func(msg *message) {
@@ -236,17 +239,10 @@ func (d *IotxDispatcher) queueMessage(msg *message) {
 		log.L().Warn("chainID has not been registered in dispatcher.", zap.Uint32("chainID", msg.chainID))
 		return
 	}
-	queue := d.queueMgr.Queue(msg)
-	if !subscriber.Filter(msg.msgType, msg.msg, cap(queue)) {
-		log.L().Debug("Message filtered by subscriber.", zap.Uint32("chainID", msg.chainID), zap.String("msgType", msg.msgType.String()))
+	if !d.queueMgr.Queue(msg, subscriber) {
 		return
 	}
-	select {
-	case queue <- msg:
-	default:
-		log.L().Warn("Queue is full.", zap.Any("msgType", msg.msgType))
-	}
-	d.updateMetrics(msg, queue)
+	d.updateEventAudit(msg.msgType)
 }
 
 // HandleBroadcast handles incoming broadcast message
@@ -313,14 +309,6 @@ func (d *IotxDispatcher) updateEventAudit(t iotexrpc.MessageType) {
 	d.eventAudit[t]++
 }
 
-func (d *IotxDispatcher) updateMetrics(msg *message, queue chan *message) {
-	d.updateEventAudit(msg.msgType)
-	subscriber := d.subscriber(msg.chainID)
-	if subscriber != nil {
-		subscriber.ReportFullness(msg.ctx, msg.msgType, msg.msg, float32(len(queue))/float32(cap(queue)))
-	}
-}
-
 func (d *IotxDispatcher) filter(msg *message) bool {
 	if msg.msgType != iotexrpc.MessageType_BLOCK_REQUEST {
 		return true
diff --git a/dispatcher/msg_queue.go b/dispatcher/msg_queue.go
index 8251252243..4deb9be2fc 100644
--- a/dispatcher/msg_queue.go
+++ b/dispatcher/msg_queue.go
@@ -5,6 +5,7 @@ import (
 	"sync"
 
 	"github.com/iotexproject/iotex-proto/golang/iotexrpc"
+	"go.uber.org/zap"
 
 	"github.com/iotexproject/iotex-core/v2/pkg/log"
 )
@@ -18,22 +19,48 @@ const (
 )
 
 type (
+	queueLimit struct {
+		counts map[string]int
+		limit  int
+		mu     sync.RWMutex
+	}
 	msgQueueMgr struct {
-		queues    map[string]msgQueue
-		wg        sync.WaitGroup
-		handleMsg func(msg *message)
-		quit      chan struct{}
+		queues            map[string]msgQueue
+		blockRequestPeers queueLimit
+		wg                sync.WaitGroup
+		handleMsg         func(msg *message)
+		quit              chan struct{}
 	}
 	msgQueue chan *message
 	msgQueueConfig struct {
 		actionChanSize uint
 		blockChanSize  uint
 		blockSyncSize  uint
+		blockSyncLimit uint
 		consensusSize  uint
 		miscSize       uint
 	}
 )
 
+func (q *queueLimit) increment(peer string) bool {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	if q.counts[peer] >= q.limit {
+		return false
+	}
+	q.counts[peer]++
+	return true
+}
+
+func (q *queueLimit) decrement(peer string) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	if q.counts[peer] <= 0 {
+		return
+	}
+	q.counts[peer]--
+}
+
 func newMsgQueueMgr(cfg msgQueueConfig, handler func(msg *message)) *msgQueueMgr {
 	queues := make(map[string]msgQueue)
 	queues[actionQ] = make(chan *message, cfg.actionChanSize)
@@ -42,9 +69,10 @@ func newMsgQueueMgr(cfg msgQueueConfig, handler func(msg *message)) *msgQueueMgr {
 	queues[consensusQ] = make(chan *message, cfg.consensusSize)
 	queues[miscQ] = make(chan *message, cfg.miscSize)
 	return &msgQueueMgr{
-		queues:    queues,
-		handleMsg: handler,
-		quit:      make(chan struct{}),
+		queues:            queues,
+		blockRequestPeers: queueLimit{counts: make(map[string]int), limit: int(cfg.blockSyncLimit), mu: sync.RWMutex{}},
+		handleMsg:         handler,
+		quit:              make(chan struct{}),
 	}
 }
 
@@ -57,6 +85,10 @@ func (m *msgQueueMgr) Start(ctx context.Context) error {
 	m.wg.Add(1)
 	go m.consume(blockQ)
 
+	for i := 0; i <= 2; i++ {
+		m.wg.Add(1)
+		go m.consume(blockSyncQ)
+	}
 	m.wg.Add(1)
 	go m.consume(blockSyncQ)
 
@@ -79,6 +111,9 @@ func (m *msgQueueMgr) consume(q string) {
 	for {
 		select {
 		case msg := <-m.queues[q]:
+			if q == blockSyncQ {
+				m.blockRequestPeers.decrement(msg.peer)
+			}
 			m.handleMsg(msg)
 		case <-m.quit:
 			log.L().Debug("message handler is terminated.")
@@ -87,17 +122,34 @@ func (m *msgQueueMgr) consume(q string) {
 	}
 }
 
-func (m *msgQueueMgr) Queue(msg *message) msgQueue {
+func (m *msgQueueMgr) Queue(msg *message, subscriber Subscriber) bool {
+	var queue msgQueue
 	switch msg.msgType {
 	case iotexrpc.MessageType_ACTION, iotexrpc.MessageType_ACTIONS, iotexrpc.MessageType_ACTION_HASH, iotexrpc.MessageType_ACTION_REQUEST:
-		return m.queues[actionQ]
+		queue = m.queues[actionQ]
 	case iotexrpc.MessageType_BLOCK:
-		return m.queues[blockQ]
+		queue = m.queues[blockQ]
 	case iotexrpc.MessageType_BLOCK_REQUEST:
-		return m.queues[blockSyncQ]
+		if !m.blockRequestPeers.increment(msg.peer) {
+			log.L().Warn("Peer has reached the block sync request limit.", zap.String("peer", msg.peer), zap.Int("limit", m.blockRequestPeers.limit))
+			return false
+		}
+		queue = m.queues[blockSyncQ]
 	case iotexrpc.MessageType_CONSENSUS:
-		return m.queues[consensusQ]
+		queue = m.queues[consensusQ]
+	default:
+		queue = m.queues[miscQ]
+	}
+	if !subscriber.Filter(msg.msgType, msg.msg, cap(queue)) {
+		log.L().Debug("Message filtered by subscriber.", zap.Uint32("chainID", msg.chainID), zap.String("msgType", msg.msgType.String()))
+		return false
+	}
+	select {
+	case queue <- msg:
 	default:
-		return m.queues[miscQ]
+		log.L().Warn("Queue is full.", zap.Any("msgType", msg.msgType))
+		return false
 	}
+	subscriber.ReportFullness(msg.ctx, msg.msgType, msg.msg, float32(len(queue))/float32(cap(queue)))
+	return true
 }
diff --git a/e2etest/nodeinfo_test.go b/e2etest/nodeinfo_test.go
index 92cd045616..ac55fedf64 100644
--- a/e2etest/nodeinfo_test.go
+++ b/e2etest/nodeinfo_test.go
@@ -68,7 +68,7 @@ func TestBroadcastNodeInfo(t *testing.T) {
 	cfgSender, teardown, err := newConfigForNodeInfoTest("trie.test", "db.test", "indexdb.test", "contractidxdb.test")
 	require.NoError(err)
 	defer teardown()
-	cfgSender.NodeInfo.EnableBroadcastNodeInfo = true
+	cfgSender.NodeInfo.DisableBroadcastNodeInfo = false
 	cfgSender.NodeInfo.BroadcastNodeInfoInterval = time.Second
 	cfgSender.Network.ReconnectInterval = 2 * time.Second
 	srvSender, err := itx.NewServer(cfgSender)
@@ -96,11 +96,14 @@ func TestBroadcastNodeInfo(t *testing.T) {
 		require.NoError(srvReciever.Stop(ctxReciever))
 	}()
 
-	// check if there is sender's info in reciever delegatemanager
+	// check if there is sender's info in receiver delegatemanager
 	addrSender := cfgSender.Chain.ProducerAddress()[0].String()
+	peer, err := srvSender.P2PAgent().Info()
+	require.NoError(err)
+	// TODO: replace with automatic broadcast check after enabling broadcast
 	require.NoError(srvSender.ChainService(cfgSender.Chain.ID).NodeInfoManager().BroadcastNodeInfo(context.Background(), []string{addrSender}))
 	require.NoError(testutil.WaitUntil(100*time.Millisecond, 10*time.Second, func() (bool, error) {
-		_, ok := srvReciever.ChainService(cfgReciever.Chain.ID).NodeInfoManager().GetNodeInfo(addrSender)
+		_, ok := srvReciever.ChainService(cfgReciever.Chain.ID).NodeInfoManager().GetNodeInfo(peer.ID.String())
 		return ok, nil
 	}))
 }
@@ -143,9 +146,8 @@ func TestUnicastNodeInfo(t *testing.T) {
 	dmSender := srvSender.ChainService(cfgSender.Chain.ID).NodeInfoManager()
 	err = dmSender.RequestSingleNodeInfoAsync(context.Background(), peerReciever)
 	require.NoError(err)
-	addrReciever := cfgReceiver.Chain.ProducerAddress()[0].String()
 	require.NoError(testutil.WaitUntil(100*time.Millisecond, 10*time.Second, func() (bool, error) {
-		_, ok := dmSender.GetNodeInfo(addrReciever)
+		_, ok := dmSender.GetNodeInfo(peerReciever.ID.String())
 		return ok, nil
 	}))
 }
diff --git a/nodeinfo/config.go b/nodeinfo/config.go
index 67d1f68a90..c681942783 100644
--- a/nodeinfo/config.go
+++ b/nodeinfo/config.go
@@ -9,16 +9,14 @@ import "time"
 
 // Config node config
 type Config struct {
-	EnableBroadcastNodeInfo   bool          `yaml:"enableBroadcastNodeInfo"`
+	DisableBroadcastNodeInfo  bool          `yaml:"disableBroadcastNodeInfo"`
 	BroadcastNodeInfoInterval time.Duration `yaml:"broadcastNodeInfoInterval"`
-	BroadcastListTTL          time.Duration `yaml:"broadcastListTTL"`
 	NodeMapSize               int           `yaml:"nodeMapSize"`
 }
 
 // DefaultConfig is the default config
var DefaultConfig = Config{
-	EnableBroadcastNodeInfo:   false,
-	BroadcastNodeInfoInterval: 5 * time.Minute,
-	BroadcastListTTL:          30 * time.Minute,
+	DisableBroadcastNodeInfo:  false,
+	BroadcastNodeInfoInterval: 1 * time.Minute,
 	NodeMapSize:               1000,
 }
diff --git a/nodeinfo/manager.go b/nodeinfo/manager.go
index 69cc3e8379..0b8fabea68 100644
--- a/nodeinfo/manager.go
+++ b/nodeinfo/manager.go
@@ -7,7 +7,6 @@ package nodeinfo
 
 import (
 	"context"
-	"slices"
 	"sync/atomic"
 	"time"
 
@@ -52,14 +51,14 @@ type (
 	// InfoManager manage delegate node info
 	InfoManager struct {
 		lifecycle.Lifecycle
-		version              string
-		broadcastList        atomic.Value // []string, whitelist to force enable broadcast
-		nodeMap              *lru.Cache
-		transmitter          transmitter
-		chain                chain
-		privKeys             map[string]crypto.PrivateKey
-		addrs                []string
-		getBroadcastListFunc getBroadcastListFunc
+		version       string
+		broadcastList atomic.Value // []string, whitelist to force enable broadcast
+		nodeMap       *lru.Cache
+		blockInterval time.Duration
+		transmitter   transmitter
+		chain         chain
+		privKeys      map[string]crypto.PrivateKey
+		addrs         []string
 	}
 
 	getBroadcastListFunc func() []string
@@ -78,7 +77,7 @@ func init() {
 }
 
 // NewInfoManager new info manager
-func NewInfoManager(cfg *Config, t transmitter, ch chain, broadcastListFunc getBroadcastListFunc, privKeys ...crypto.PrivateKey) *InfoManager {
+func NewInfoManager(cfg *Config, t transmitter, ch chain, blockInterval time.Duration, privKeys ...crypto.PrivateKey) *InfoManager {
 	addrs := make([]string, 0, len(privKeys))
 	keyMaps := make(map[string]crypto.PrivateKey)
 	for _, privKey := range privKeys {
@@ -87,21 +86,20 @@ func NewInfoManager(cfg *Config, t transmitter, ch chain, broadcastListFunc getBroadcastListFunc, privKeys ...crypto.PrivateKey) *InfoManager {
 		keyMaps[addr] = privKey
 	}
 	dm := &InfoManager{
-		nodeMap:              lru.New(cfg.NodeMapSize),
-		transmitter:          t,
-		chain:                ch,
-		addrs:                addrs,
-		privKeys:             keyMaps,
-		version:              version.PackageVersion,
-		getBroadcastListFunc: broadcastListFunc,
+		nodeMap:       lru.New(cfg.NodeMapSize),
+		transmitter:   t,
+		chain:         ch,
+		addrs:         addrs,
+		privKeys:      keyMaps,
+		version:       version.PackageVersion,
+		blockInterval: blockInterval,
+	}
+	if cfg.DisableBroadcastNodeInfo {
+		return dm
 	}
-	dm.broadcastList.Store([]string{})
 	// init recurring tasks
 	broadcastTask := routine.NewRecurringTask(func() {
 		addrs := dm.addrs
-		if !cfg.EnableBroadcastNodeInfo {
-			addrs = dm.inBroadcastList()
-		}
 		// broadcastlist or nodes who are turned on will broadcast
 		if len(addrs) > 0 {
 			if err := dm.BroadcastNodeInfo(context.Background(), addrs); err != nil {
@@ -111,16 +109,12 @@ func NewInfoManager(cfg *Config, t transmitter, ch chain, broadcastListFunc getBroadcastListFunc, privKeys ...crypto.PrivateKey) *InfoManager {
 			log.L().Debug("nodeinfo manager general node disabled node info broadcast")
 		}
 	}, cfg.BroadcastNodeInfoInterval)
-	updateBroadcastListTask := routine.NewRecurringTask(func() {
-		dm.updateBroadcastList()
-	}, cfg.BroadcastListTTL)
-	dm.AddModels(updateBroadcastListTask, broadcastTask)
+	dm.AddModels(broadcastTask)
 	return dm
 }
 
 // Start start delegate broadcast task
 func (dm *InfoManager) Start(ctx context.Context) error {
-	dm.updateBroadcastList()
 	return dm.OnStart(ctx)
 }
 
@@ -129,6 +123,21 @@ func (dm *InfoManager) Stop(ctx context.Context) error {
 	return dm.OnStop(ctx)
 }
 
+// MayHaveBlock check whether the peer may have the block starting from 'start'
+func (dm *InfoManager) MayHaveBlock(peerID string, start uint64) bool {
+	value, ok := dm.nodeMap.Get(peerID)
+	if !ok {
+		return false
+	}
+	info := value.(Info)
+	duration := time.Now().Sub(info.Timestamp)
+	if duration < 0 {
+		duration = 0
+	}
+
+	return int64(info.Height)+duration.Milliseconds()/dm.blockInterval.Milliseconds() > int64(start)
+}
+
 // HandleNodeInfo handle node info message
 func (dm *InfoManager) HandleNodeInfo(ctx context.Context, peerID string, msg *iotextypes.NodeInfo) {
 	log.L().Debug("nodeinfo manager handle node info")
@@ -156,11 +165,11 @@ func (dm *InfoManager) HandleNodeInfo(ctx context.Context, peerID string, msg *iotextypes.NodeInfo) {
 
 // updateNode update node info
 func (dm *InfoManager) updateNode(node *Info) {
-	addr := node.Address
+	id := node.PeerID
 	// update dm.nodeMap
-	dm.nodeMap.Add(addr, *node)
+	dm.nodeMap.Add(id, *node)
 	// update metric
-	_nodeInfoHeightGauge.WithLabelValues(addr, node.Version).Set(float64(node.Height))
+	_nodeInfoHeightGauge.WithLabelValues(node.Address, node.Version).Set(float64(node.Height))
 }
 
 // GetNodeInfo get node info by address
@@ -251,25 +260,6 @@ func (dm *InfoManager) genNodeInfoMsg(addrs []string) ([]*iotextypes.NodeInfo, error) {
 	return infos, nil
 }
 
-func (dm *InfoManager) inBroadcastList() []string {
-	list := dm.broadcastList.Load().([]string)
-	inList := make([]string, 0, len(dm.addrs))
-	for _, a := range dm.addrs {
-		if slices.Contains(list, a) {
-			inList = append(inList, a)
-		}
-	}
-	return inList
-}
-
-func (dm *InfoManager) updateBroadcastList() {
-	if dm.getBroadcastListFunc != nil {
-		list := dm.getBroadcastListFunc()
-		dm.broadcastList.Store(list)
-		log.L().Debug("nodeinfo manaager updateBroadcastList", zap.Strings("list", list))
-	}
-}
-
 func hashNodeInfo(msg *iotextypes.NodeInfoCore) hash.Hash256 {
 	return hash.Hash256b(byteutil.Must(proto.Marshal(msg)))
 }
diff --git a/nodeinfo/manager_test.go b/nodeinfo/manager_test.go
index 6845fc80c8..4b2369da82 100644
--- a/nodeinfo/manager_test.go
+++ b/nodeinfo/manager_test.go
@@ -37,8 +37,8 @@ func TestNewDelegateManager(t *testing.T) {
 	t.Run("disable_broadcast", func(t *testing.T) {
 		hMock := mock_nodeinfo.NewMockchain(ctrl)
 		tMock := mock_nodeinfo.NewMocktransmitter(ctrl)
-		cfg := Config{false, 100 * time.Millisecond, 100 * time.Millisecond, 1000}
-		dm := NewInfoManager(&cfg, tMock, hMock, getEmptyWhiteList, privK)
+		cfg := Config{true, 100 * time.Millisecond, 1000}
+		dm := NewInfoManager(&cfg, tMock, hMock, 2500*time.Millisecond, privK)
 		require.NotNil(dm.nodeMap)
 		require.Equal(tMock, dm.transmitter)
 		require.Equal(hMock, dm.chain)
@@ -54,8 +54,8 @@ func TestNewDelegateManager(t *testing.T) {
 	t.Run("enable_broadcast", func(t *testing.T) {
 		hMock := mock_nodeinfo.NewMockchain(ctrl)
 		tMock := mock_nodeinfo.NewMocktransmitter(ctrl)
-		cfg := Config{true, 100 * time.Millisecond, 100 * time.Millisecond, 1000}
-		dm := NewInfoManager(&cfg, tMock, hMock, getEmptyWhiteList, privK)
+		cfg := Config{false, 100 * time.Millisecond, 1000}
+		dm := NewInfoManager(&cfg, tMock, hMock, 2500*time.Millisecond, privK)
 		require.NotNil(dm.nodeMap)
 		require.Equal(tMock, dm.transmitter)
 		require.Equal(hMock, dm.chain)
@@ -71,8 +71,8 @@ func TestNewDelegateManager(t *testing.T) {
 	t.Run("delegate_broadcast", func(t *testing.T) {
 		hMock := mock_nodeinfo.NewMockchain(ctrl)
 		tMock := mock_nodeinfo.NewMocktransmitter(ctrl)
-		cfg := Config{true, 100 * time.Millisecond, 100 * time.Millisecond, 1000}
-		dm := NewInfoManager(&cfg, tMock, hMock, getEmptyWhiteList, privK)
+		cfg := Config{false, 100 * time.Millisecond, 1000}
+		dm := NewInfoManager(&cfg, tMock, hMock, 2500*time.Millisecond, privK)
 		require.NotNil(dm.nodeMap)
 		require.Equal(tMock, dm.transmitter)
 		require.Equal(hMock, dm.chain)
@@ -108,10 +108,10 @@ func TestDelegateManager_HandleNodeInfo(t *testing.T) {
 		}
 		hash := hashNodeInfo(msg.Info)
 		msg.Signature, _ = privKey.Sign(hash[:])
-		dm := NewInfoManager(&DefaultConfig, tMock, hMock, getEmptyWhiteList, privKey)
+		dm := NewInfoManager(&DefaultConfig, tMock, hMock, 2500*time.Millisecond, privKey)
 		dm.HandleNodeInfo(context.Background(), "abc", msg)
 		addr := msg.Info.Address
-		nodeGot, ok := dm.nodeMap.Get(addr)
+		nodeGot, ok := dm.nodeMap.Get("abc")
 		require.True(ok)
 		nodeInfo := nodeGot.(Info)
 		require.Equal(msg.Info.Height, nodeInfo.Height)
@@ -136,7 +136,7 @@ func TestDelegateManager_HandleNodeInfo(t *testing.T) {
 			},
 			Signature: []byte("xxxx"),
 		}
-		dm := NewInfoManager(&DefaultConfig, tMock, hMock, getEmptyWhiteList, privKey)
+		dm := NewInfoManager(&DefaultConfig, tMock, hMock, 2500*time.Millisecond, privKey)
 		dm.HandleNodeInfo(context.Background(), "abc", msg)
 		addr := msg.Info.Address
 		_, ok := dm.nodeMap.Get(addr)
@@ -158,7 +158,7 @@ func TestDelegateManager_BroadcastNodeInfo(t *testing.T) {
 	t.Run("update_self", func(t *testing.T) {
 		hMock := mock_nodeinfo.NewMockchain(ctrl)
 		tMock := mock_nodeinfo.NewMocktransmitter(ctrl)
-		dm := NewInfoManager(&DefaultConfig, tMock, hMock, getEmptyWhiteList, privKey)
+		dm := NewInfoManager(&DefaultConfig, tMock, hMock, 2500*time.Millisecond, privKey)
 		height := uint64(200)
 		peerID, err := peer.IDFromBytes([]byte("12D3KooWF2fns5ZWKbPfx2U1wQDdxoTK2D6HC3ortbSAQYR4BQp4"))
 		require.NoError(err)
@@ -168,7 +168,7 @@ func TestDelegateManager_BroadcastNodeInfo(t *testing.T) {
 		err = dm.BroadcastNodeInfo(context.Background(), []string{nodeAddr})
 		require.NoError(err)
 		addr := privKey.PublicKey().Address().String()
-		nodeGot, ok := dm.nodeMap.Get(addr)
+		nodeGot, ok := dm.nodeMap.Get(peerID.String())
 		require.True(ok)
 		nodeInfo := nodeGot.(Info)
 		require.Equal(height, nodeInfo.Height)
@@ -189,7 +189,7 @@ func TestDelegateManager_HandleNodeInfoRequest(t *testing.T) {
 	t.Run("unicast", func(t *testing.T) {
 		hMock := mock_nodeinfo.NewMockchain(ctrl)
 		tMock := mock_nodeinfo.NewMocktransmitter(ctrl)
-		dm := NewInfoManager(&DefaultConfig, tMock, hMock, getEmptyWhiteList, privKey)
+		dm := NewInfoManager(&DefaultConfig, tMock, hMock, 2500*time.Millisecond, privKey)
 		height := uint64(200)
 		var sig []byte
 		message := &iotextypes.NodeInfo{}
@@ -219,7 +219,7 @@ func TestDelegateManager_RequestSingleNodeInfoAsync(t *testing.T) {
 	t.Run("request_single", func(t *testing.T) {
 		hMock := mock_nodeinfo.NewMockchain(ctrl)
 		tMock := mock_nodeinfo.NewMocktransmitter(ctrl)
-		dm := NewInfoManager(&DefaultConfig, tMock, hMock, getEmptyWhiteList, privKey)
+		dm := NewInfoManager(&DefaultConfig, tMock, hMock, 2500*time.Millisecond, privKey)
 		var paramPeer peer.AddrInfo
 		var paramMsg *iotextypes.NodeInfoRequest
 		peerID, err := peer.IDFromBytes([]byte("12D3KooWF2fns5ZWKbPfx2U1wQDdxoTK2D6HC3ortbSAQYR4BQp4"))
@@ -245,17 +245,17 @@ func TestDelegateManager_GetNodeByAddr(t *testing.T) {
 	privKey, err := crypto.GenerateKey()
 	require.NoError(err)
 
-	dm := NewInfoManager(&DefaultConfig, tMock, hMock, getEmptyWhiteList, privKey)
-	dm.updateNode(&Info{Address: "1"})
-	dm.updateNode(&Info{Address: "2"})
+	dm := NewInfoManager(&DefaultConfig, tMock, hMock, 2500*time.Millisecond, privKey)
+	dm.updateNode(&Info{PeerID: "1"})
+	dm.updateNode(&Info{PeerID: "2"})
 
 	t.Run("exist", func(t *testing.T) {
 		info, ok := dm.GetNodeInfo("1")
 		require.True(ok)
-		require.Equal(Info{Address: "1"}, info)
+		require.Equal(Info{PeerID: "1"}, info)
 		info, ok = dm.GetNodeInfo("2")
 		require.True(ok)
-		require.Equal(Info{Address: "2"}, info)
+		require.Equal(Info{PeerID: "2"}, info)
 	})
 	t.Run("not_exist", func(t *testing.T) {
 		_, ok := dm.GetNodeInfo("3")