17 changes: 15 additions & 2 deletions blocksync/blocksync.go
@@ -21,6 +21,7 @@ import (
"github.com/iotexproject/iotex-core/v2/blockchain"
"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/blockchain/blockdao"
"github.com/iotexproject/iotex-core/v2/nodeinfo"
"github.com/iotexproject/iotex-core/v2/pkg/fastrand"
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
"github.com/iotexproject/iotex-core/v2/pkg/log"
@@ -69,6 +70,7 @@ type (
p2pNeighbor Neighbors
unicastOutbound UniCastOutbound
blockP2pPeer BlockPeer
nodeInfoManager *nodeinfo.InfoManager

syncTask *routine.RecurringTask
syncStageTask *routine.RecurringTask
@@ -138,6 +140,7 @@ func NewBlockSyncer(
p2pNeighbor Neighbors,
uniCastHandler UniCastOutbound,
blockP2pPeer BlockPeer,
nodeInfoManager *nodeinfo.InfoManager,
) (BlockSync, error) {
bs := &blockSyncer{
cfg: cfg,
@@ -150,6 +153,7 @@
unicastOutbound: uniCastHandler,
blockP2pPeer: blockP2pPeer,
targetHeight: 0,
nodeInfoManager: nodeInfoManager,
}
if bs.cfg.Interval != 0 {
bs.syncTask = routine.NewRecurringTask(bs.sync, bs.cfg.Interval)
@@ -224,10 +228,19 @@ func (bs *blockSyncer) requestBlock(ctx context.Context, start uint64, end uint64
repeat = len(peers)
}
for i := 0; i < repeat; i++ {
peer := peers[fastrand.Uint32n(uint32(len(peers)))]
var peer *peer.AddrInfo
for j := 0; j < 10; j++ {
candidate := &peers[fastrand.Uint32n(uint32(len(peers)))]
if !bs.nodeInfoManager.MayHaveBlock(candidate.ID.String(), start) {
continue
}
peer = candidate
break
}
if peer == nil {
continue
}
if err := bs.unicastOutbound(
ctx,
peer,
*peer,
&iotexrpc.BlockSync{Start: start, End: end},
); err != nil {
log.L().Error("failed to request blocks", zap.Error(err), zap.String("peer", peer.ID.String()), zap.Uint64("start", start), zap.Uint64("end", end))
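The retry loop above consults the new nodeInfoManager.MayHaveBlock check so the syncer stops spending requests on peers that cannot serve the range. The method body is not part of this diff; below is a minimal, self-contained sketch of one plausible implementation, assuming the manager caches each peer's last reported tip height and report time and extrapolates by the block interval that buildNodeInfoManager now passes in (nodeInfo, mayHaveBlock, and their fields are hypothetical names, not the PR's actual code):

package main

import (
	"fmt"
	"time"
)

// nodeInfo mirrors, hypothetically, what the manager caches per peer.
type nodeInfo struct {
	Height    uint64    // peer's tip height at its last report
	Timestamp time.Time // when that report arrived
}

// mayHaveBlock sketches the predicate blocksync now consults: treat a
// peer as a candidate for serving `height` if its last reported tip,
// extrapolated by the chain's block interval, reaches that height.
// Peers with no report yet are not ruled out.
func mayHaveBlock(info *nodeInfo, height uint64, blockInterval time.Duration) bool {
	if info == nil {
		return true // no report yet: keep the peer in the candidate set
	}
	grown := uint64(time.Since(info.Timestamp) / blockInterval)
	return info.Height+grown >= height
}

func main() {
	info := &nodeInfo{Height: 100, Timestamp: time.Now().Add(-50 * time.Second)}
	// With a 5s block interval the peer has plausibly grown ~10 blocks.
	fmt.Println(mayHaveBlock(info, 105, 5*time.Second)) // true
	fmt.Println(mayHaveBlock(info, 200, 5*time.Second)) // false
}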
1 change: 1 addition & 0 deletions blocksync/blocksync_test.go
@@ -74,6 +74,7 @@ func newBlockSyncerForTest(cfg Config, chain blockchain.Blockchain, dao blockdao
func(string) {
return
},
nil,
)
if err != nil {
return nil, err
24 changes: 3 additions & 21 deletions chainservice/builder.go
@@ -35,7 +35,6 @@ import (
"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/blockchain/blockdao"
"github.com/iotexproject/iotex-core/v2/blockchain/filedao"
"github.com/iotexproject/iotex-core/v2/blockchain/genesis"
"github.com/iotexproject/iotex-core/v2/blockindex"
"github.com/iotexproject/iotex-core/v2/blockindex/contractstaking"
"github.com/iotexproject/iotex-core/v2/blocksync"
@@ -537,29 +536,11 @@ func (builder *Builder) buildNodeInfoManager() error {
if stk == nil {
return errors.New("cannot find staking protocol")
}
chain := builder.cs.chain
var dm *nodeinfo.InfoManager
if builder.cfg.System.Active {
dm = nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent, cs.chain, func() []string {
ctx := protocol.WithFeatureCtx(
protocol.WithBlockCtx(
genesis.WithGenesisContext(context.Background(), chain.Genesis()),
protocol.BlockCtx{BlockHeight: chain.TipHeight()},
),
)
candidates, err := stk.ActiveCandidates(ctx, cs.factory, 0)
if err != nil {
log.L().Error("failed to get active candidates", zap.Error(errors.WithStack(err)))
return nil
}
whiteList := make([]string, len(candidates))
for i := range whiteList {
whiteList[i] = candidates[i].Address
}
return whiteList
}, builder.cfg.Chain.ProducerPrivateKeys()...)
dm = nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent, cs.chain, builder.cfg.WakeUpgrade.BlockInterval, builder.cfg.Chain.ProducerPrivateKeys()...)
} else {
dm = nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent, cs.chain, nil)
dm = nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent, cs.chain, builder.cfg.WakeUpgrade.BlockInterval)
}
builder.cs.nodeInfoManager = dm
builder.cs.lifecycle.Add(dm)
@@ -653,6 +634,7 @@ func (builder *Builder) buildBlockSyncer() error {
p2pAgent.ConnectedPeers,
p2pAgent.UnicastOutbound,
p2pAgent.BlockPeer,
builder.cs.nodeInfoManager,
)
if err != nil {
return errors.Wrap(err, "failed to create block syncer")
22 changes: 5 additions & 17 deletions dispatcher/dispatcher.go
@@ -32,6 +32,7 @@ type (
ActionChanSize uint `yaml:"actionChanSize"`
BlockChanSize uint `yaml:"blockChanSize"`
BlockSyncChanSize uint `yaml:"blockSyncChanSize"`
BlockSyncLimit uint `yaml:"blockSyncLimit"`
ConsensusChanSize uint `yaml:"consensusChanSize"`
MiscChanSize uint `yaml:"miscChanSize"`
ProcessSyncRequestInterval time.Duration `yaml:"processSyncRequestInterval"`
@@ -46,6 +47,7 @@ var (
ActionChanSize: 5000,
BlockChanSize: 1000,
BlockSyncChanSize: 400,
BlockSyncLimit: 2,
ConsensusChanSize: 1000,
MiscChanSize: 1000,
AccountRateLimit: 100,
@@ -131,6 +133,7 @@ func NewDispatcher(cfg Config, verificationFunc VerificationFunc) (Dispatcher, error)
actionChanSize: cfg.ActionChanSize,
blockChanSize: cfg.BlockChanSize,
blockSyncSize: cfg.BlockSyncChanSize,
blockSyncLimit: cfg.BlockSyncLimit,
consensusSize: cfg.ConsensusChanSize,
miscSize: cfg.MiscChanSize,
}, func(msg *message) {
@@ -236,17 +239,10 @@ func (d *IotxDispatcher) queueMessage(msg *message) {
log.L().Warn("chainID has not been registered in dispatcher.", zap.Uint32("chainID", msg.chainID))
return
}
queue := d.queueMgr.Queue(msg)
if !subscriber.Filter(msg.msgType, msg.msg, cap(queue)) {
log.L().Debug("Message filtered by subscriber.", zap.Uint32("chainID", msg.chainID), zap.String("msgType", msg.msgType.String()))
if !d.queueMgr.Queue(msg, subscriber) {
return
}
select {
case queue <- msg:
default:
log.L().Warn("Queue is full.", zap.Any("msgType", msg.msgType))
}
d.updateMetrics(msg, queue)
d.updateEventAudit(msg.msgType)
}

// HandleBroadcast handles incoming broadcast message
@@ -313,14 +309,6 @@ func (d *IotxDispatcher) updateEventAudit(t iotexrpc.MessageType) {
d.eventAudit[t]++
}

func (d *IotxDispatcher) updateMetrics(msg *message, queue chan *message) {
d.updateEventAudit(msg.msgType)
subscriber := d.subscriber(msg.chainID)
if subscriber != nil {
subscriber.ReportFullness(msg.ctx, msg.msgType, msg.msg, float32(len(queue))/float32(cap(queue)))
}
}

func (d *IotxDispatcher) filter(msg *message) bool {
if msg.msgType != iotexrpc.MessageType_BLOCK_REQUEST {
return true
78 changes: 65 additions & 13 deletions dispatcher/msg_queue.go
@@ -5,6 +5,7 @@ import (
"sync"

"github.com/iotexproject/iotex-proto/golang/iotexrpc"
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/v2/pkg/log"
)
@@ -18,22 +19,48 @@ const (
)

type (
queueLimit struct {
counts map[string]int
limit int
mu sync.RWMutex
}
msgQueueMgr struct {
queues map[string]msgQueue
wg sync.WaitGroup
handleMsg func(msg *message)
quit chan struct{}
queues map[string]msgQueue
blockRequestPeers queueLimit
wg sync.WaitGroup
handleMsg func(msg *message)
quit chan struct{}
}
msgQueue chan *message
msgQueueConfig struct {
actionChanSize uint
blockChanSize uint
blockSyncSize uint
blockSyncLimit uint
consensusSize uint
miscSize uint
}
)

func (q *queueLimit) increment(peer string) bool {
q.mu.Lock()
defer q.mu.Unlock()
if q.counts[peer] >= q.limit {
return false
}
q.counts[peer]++
return true
}

func (q *queueLimit) decrement(peer string) {
q.mu.Lock()
defer q.mu.Unlock()
if q.counts[peer] <= 0 {
return
}
q.counts[peer]--
}

func newMsgQueueMgr(cfg msgQueueConfig, handler func(msg *message)) *msgQueueMgr {
queues := make(map[string]msgQueue)
queues[actionQ] = make(chan *message, cfg.actionChanSize)
@@ -42,9 +69,10 @@ func newMsgQueueMgr(cfg msgQueueConfig, handler func(msg *message)) *msgQueueMgr
queues[consensusQ] = make(chan *message, cfg.consensusSize)
queues[miscQ] = make(chan *message, cfg.miscSize)
return &msgQueueMgr{
queues: queues,
handleMsg: handler,
quit: make(chan struct{}),
queues: queues,
blockRequestPeers: queueLimit{counts: make(map[string]int), limit: int(cfg.blockSyncLimit), mu: sync.RWMutex{}},
handleMsg: handler,
quit: make(chan struct{}),
}
}

@@ -57,6 +85,10 @@ func (m *msgQueueMgr) Start(ctx context.Context) error {
m.wg.Add(1)
go m.consume(blockQ)

for i := 0; i < 3; i++ {
m.wg.Add(1)
go m.consume(blockSyncQ)
}
m.wg.Add(1)
go m.consume(blockSyncQ)

@@ -79,6 +111,9 @@ func (m *msgQueueMgr) consume(q string) {
for {
select {
case msg := <-m.queues[q]:
if q == blockSyncQ {
m.blockRequestPeers.decrement(msg.peer)
}
m.handleMsg(msg)
case <-m.quit:
log.L().Debug("message handler is terminated.")
@@ -87,17 +122,34 @@
}
}

func (m *msgQueueMgr) Queue(msg *message) msgQueue {
func (m *msgQueueMgr) Queue(msg *message, subscriber Subscriber) bool {
var queue msgQueue
switch msg.msgType {
case iotexrpc.MessageType_ACTION, iotexrpc.MessageType_ACTIONS, iotexrpc.MessageType_ACTION_HASH, iotexrpc.MessageType_ACTION_REQUEST:
return m.queues[actionQ]
queue = m.queues[actionQ]
case iotexrpc.MessageType_BLOCK:
return m.queues[blockQ]
queue = m.queues[blockQ]
case iotexrpc.MessageType_BLOCK_REQUEST:
return m.queues[blockSyncQ]
if !m.blockRequestPeers.increment(msg.peer) {
log.L().Warn("Peer has reached the block sync request limit.", zap.String("peer", msg.peer), zap.Int("limit", m.blockRequestPeers.limit))
return false
}
queue = m.queues[blockSyncQ]
case iotexrpc.MessageType_CONSENSUS:
return m.queues[consensusQ]
queue = m.queues[consensusQ]
default:
queue = m.queues[miscQ]
}
if !subscriber.Filter(msg.msgType, msg.msg, cap(queue)) {
log.L().Debug("Message filtered by subscriber.", zap.Uint32("chainID", msg.chainID), zap.String("msgType", msg.msgType.String()))
if msg.msgType == iotexrpc.MessageType_BLOCK_REQUEST {
m.blockRequestPeers.decrement(msg.peer) // release the slot reserved above
}
return false
}
select {
case queue <- msg:
default:
return m.queues[miscQ]
log.L().Warn("Queue is full.", zap.Any("msgType", msg.msgType))
return false
}
subscriber.ReportFullness(msg.ctx, msg.msgType, msg.msg, float32(len(queue))/float32(cap(queue)))
return true
}
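For readers who want the rate-limiting behavior in isolation, here is a runnable condensation of the queueLimit type added above: increment reserves a slot when a BLOCK_REQUEST is queued and fails once a peer already has limit requests in flight, and decrement releases the slot when a consumer dequeues the message. The type mirrors the diff; only main is illustrative.

package main

import (
	"fmt"
	"sync"
)

// queueLimit caps the number of in-flight BLOCK_REQUEST messages per peer.
type queueLimit struct {
	counts map[string]int
	limit  int
	mu     sync.RWMutex
}

// increment reserves a slot; it fails once the peer is at the limit.
func (q *queueLimit) increment(peer string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.counts[peer] >= q.limit {
		return false
	}
	q.counts[peer]++
	return true
}

// decrement releases a slot when a consumer dequeues the request.
func (q *queueLimit) decrement(peer string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.counts[peer] <= 0 {
		return
	}
	q.counts[peer]--
}

func main() {
	q := &queueLimit{counts: make(map[string]int), limit: 2}
	fmt.Println(q.increment("peerA")) // true: 1 in flight
	fmt.Println(q.increment("peerA")) // true: 2 in flight
	fmt.Println(q.increment("peerA")) // false: limit reached, request dropped
	q.decrement("peerA")              // a consumer handled one request
	fmt.Println(q.increment("peerA")) // true: slot freed
}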
12 changes: 7 additions & 5 deletions e2etest/nodeinfo_test.go
@@ -68,7 +68,7 @@ func TestBroadcastNodeInfo(t *testing.T) {
cfgSender, teardown, err := newConfigForNodeInfoTest("trie.test", "db.test", "indexdb.test", "contractidxdb.test")
require.NoError(err)
defer teardown()
cfgSender.NodeInfo.EnableBroadcastNodeInfo = true
cfgSender.NodeInfo.DisableBroadcastNodeInfo = false
cfgSender.NodeInfo.BroadcastNodeInfoInterval = time.Second
cfgSender.Network.ReconnectInterval = 2 * time.Second
srvSender, err := itx.NewServer(cfgSender)
@@ -96,11 +96,14 @@
require.NoError(srvReciever.Stop(ctxReciever))
}()

// check if there is sender's info in reciever delegatemanager
// check if there is sender's info in receiver delegatemanager
addrSender := cfgSender.Chain.ProducerAddress()[0].String()
peer, err := srvSender.P2PAgent().Info()
require.NoError(err)
// TODO: replace with automatic broadcast check after enabling broadcast
require.NoError(srvSender.ChainService(cfgSender.Chain.ID).NodeInfoManager().BroadcastNodeInfo(context.Background(), []string{addrSender}))
require.NoError(testutil.WaitUntil(100*time.Millisecond, 10*time.Second, func() (bool, error) {
_, ok := srvReciever.ChainService(cfgReciever.Chain.ID).NodeInfoManager().GetNodeInfo(addrSender)
_, ok := srvReciever.ChainService(cfgReciever.Chain.ID).NodeInfoManager().GetNodeInfo(peer.ID.String())
return ok, nil
}))
}
@@ -143,9 +146,8 @@ func TestUnicastNodeInfo(t *testing.T) {
dmSender := srvSender.ChainService(cfgSender.Chain.ID).NodeInfoManager()
err = dmSender.RequestSingleNodeInfoAsync(context.Background(), peerReciever)
require.NoError(err)
addrReciever := cfgReceiver.Chain.ProducerAddress()[0].String()
require.NoError(testutil.WaitUntil(100*time.Millisecond, 10*time.Second, func() (bool, error) {
_, ok := dmSender.GetNodeInfo(addrReciever)
_, ok := dmSender.GetNodeInfo(peerReciever.ID.String())
return ok, nil
}))
}
8 changes: 3 additions & 5 deletions nodeinfo/config.go
@@ -9,16 +9,14 @@ import "time"

// Config node config
type Config struct {
EnableBroadcastNodeInfo bool `yaml:"enableBroadcastNodeInfo"`
DisableBroadcastNodeInfo bool `yaml:"disableBroadcastNodeInfo"`
BroadcastNodeInfoInterval time.Duration `yaml:"broadcastNodeInfoInterval"`
BroadcastListTTL time.Duration `yaml:"broadcastListTTL"`
NodeMapSize int `yaml:"nodeMapSize"`
}

// DefaultConfig is the default config
var DefaultConfig = Config{
EnableBroadcastNodeInfo: false,
BroadcastNodeInfoInterval: 5 * time.Minute,
BroadcastListTTL: 30 * time.Minute,
DisableBroadcastNodeInfo: false,
BroadcastNodeInfoInterval: 1 * time.Minute,
NodeMapSize: 1000,
}
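Note the semantic flip in this last file: the opt-in EnableBroadcastNodeInfo becomes the opt-out DisableBroadcastNodeInfo, so a default configuration now broadcasts node info, and it does so every minute instead of every five. A small sketch of the resulting defaults, assuming the flag gates the broadcast task the same way the old one did:

package main

import (
	"fmt"
	"time"
)

// Config mirrors the fields this diff keeps.
type Config struct {
	DisableBroadcastNodeInfo  bool
	BroadcastNodeInfoInterval time.Duration
	NodeMapSize               int
}

var DefaultConfig = Config{
	DisableBroadcastNodeInfo:  false,
	BroadcastNodeInfoInterval: 1 * time.Minute,
	NodeMapSize:               1000,
}

func main() {
	// With the inverted flag, the zero value enables broadcasting;
	// operators must explicitly set disableBroadcastNodeInfo: true
	// in YAML to turn it off.
	broadcast := !DefaultConfig.DisableBroadcastNodeInfo
	fmt.Println(broadcast, DefaultConfig.BroadcastNodeInfoInterval) // true 1m0s
}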