From ae71e129f40038d1376acdea8fef18e7fb168487 Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Tue, 21 Jan 2025 20:44:38 +0400 Subject: [PATCH 1/5] sync2: implement malfeasance sync --- config/mainnet.go | 4 + config/presets/fastnet.go | 1 + config/presets/testnet.go | 4 + fetch/mesh_data.go | 22 +++- fetch/mesh_data_test.go | 25 ++++ sync2/atxs.go | 135 +++----------------- sync2/atxs_test.go | 212 +------------------------------ sync2/commitstate.go | 139 ++++++++++++++++++++ sync2/commitstate_test.go | 260 ++++++++++++++++++++++++++++++++++++++ sync2/interface.go | 11 +- sync2/malfeasance.go | 93 ++++++++++++++ sync2/malfeasance_test.go | 53 ++++++++ sync2/mocks_test.go | 165 ++++++++++++++++++++++-- sync2/p2p.go | 18 +++ syncer/interface.go | 6 + syncer/mocks/mocks.go | 174 +++++++++++++++++++++++++ syncer/syncer.go | 76 ++++++++++- syncer/syncer_test.go | 20 +++ 18 files changed, 1069 insertions(+), 349 deletions(-) create mode 100644 sync2/commitstate.go create mode 100644 sync2/commitstate_test.go create mode 100644 sync2/malfeasance.go create mode 100644 sync2/malfeasance_test.go diff --git a/config/mainnet.go b/config/mainnet.go index f4b6a4ac23..a7b11f6916 100644 --- a/config/mainnet.go +++ b/config/mainnet.go @@ -87,6 +87,9 @@ func MainnetConfig() Config { newAtxSyncCfg.MaxDepth = 21 newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute newAtxSyncCfg.AdvanceInterval = 5 * time.Minute + malSyncCfg := sync2.DefaultConfig() + malSyncCfg.MaxDepth = 16 + malSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute return Config{ BaseConfig: BaseConfig{ @@ -228,6 +231,7 @@ func MainnetConfig() Config { Enable: true, OldAtxSyncCfg: oldAtxSyncCfg, NewAtxSyncCfg: newAtxSyncCfg, + MalSyncCfg: malSyncCfg, ParallelLoadLimit: 10, HardTimeout: 10 * time.Minute, ServerConfig: fetch.ServerConfig{ diff --git a/config/presets/fastnet.go b/config/presets/fastnet.go index ba9838d26d..5913a93825 100644 --- a/config/presets/fastnet.go +++ 
b/config/presets/fastnet.go @@ -56,6 +56,7 @@ func fastnet() config.Config { conf.Sync.AtxSync.EpochInfoPeers = 10 conf.Sync.AtxSync.RequestsLimit = 100 conf.Sync.MalSync.IDRequestInterval = 20 * time.Second + conf.Sync.ReconcSync.MalSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 20 * time.Second conf.LayersPerEpoch = 4 conf.RegossipAtxInterval = 30 * time.Second conf.FETCH.RequestTimeout = 2 * time.Second diff --git a/config/presets/testnet.go b/config/presets/testnet.go index 2c12087087..bf194f65a9 100644 --- a/config/presets/testnet.go +++ b/config/presets/testnet.go @@ -75,6 +75,9 @@ func testnet() config.Config { newAtxSyncCfg.MaxDepth = 21 newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute newAtxSyncCfg.MultiPeerReconcilerConfig.SyncPeerCount = 4 + malSyncCfg := sync2.DefaultConfig() + malSyncCfg.MaxDepth = 16 + malSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute return config.Config{ Preset: "testnet", @@ -179,6 +182,7 @@ func testnet() config.Config { Enable: true, OldAtxSyncCfg: oldAtxSyncCfg, NewAtxSyncCfg: newAtxSyncCfg, + MalSyncCfg: malSyncCfg, ParallelLoadLimit: 10, HardTimeout: time.Minute, ServerConfig: fetch.ServerConfig{ diff --git a/fetch/mesh_data.go b/fetch/mesh_data.go index 34a5f62181..a435ea2475 100644 --- a/fetch/mesh_data.go +++ b/fetch/mesh_data.go @@ -160,14 +160,30 @@ func (f *Fetch) GetActiveSet(ctx context.Context, set types.Hash32) error { return f.getHashes(ctx, []types.Hash32{set}, datastore.ActiveSet, f.validators.activeset.HandleMessage) } -// GetMalfeasanceProofs gets malfeasance proofs for the specified NodeIDs and validates them. -func (f *Fetch) GetMalfeasanceProofs(ctx context.Context, ids []types.NodeID) error { +// GetMalfeasanceProofsWithCallback gets malfeasance proofs for the specified NodeIDs and validates them. +// If callback is not nil, GetMalfeasanceProofsWithCallback invokes the callback for each proof fetched. 
+func (f *Fetch) GetMalfeasanceProofsWithCallback( + ctx context.Context, + ids []types.NodeID, + callback func(types.NodeID, error), +) error { if len(ids) == 0 { return nil } f.logger.Debug("requesting malfeasance proofs from peer", log.ZContext(ctx), zap.Int("num_proofs", len(ids))) hashes := types.NodeIDsToHashes(ids) - return f.getHashes(ctx, hashes, datastore.Malfeasance, f.validators.malfeasance.HandleMessage) + var ghOpts []getHashesOpt + if callback != nil { + ghOpts = append(ghOpts, withHashCallback(func(hash types.Hash32, err error) { + callback(types.NodeID(hash), err) + })) + } + return f.getHashes(ctx, hashes, datastore.Malfeasance, f.validators.malfeasance.HandleMessage, ghOpts...) +} + +// GetMalfeasanceProofs gets malfeasance proofs for the specified NodeIDs and validates them. +func (f *Fetch) GetMalfeasanceProofs(ctx context.Context, ids []types.NodeID) error { + return f.GetMalfeasanceProofsWithCallback(ctx, ids, nil) } // GetBallots gets data for the specified BallotIDs and validates them. diff --git a/fetch/mesh_data_test.go b/fetch/mesh_data_test.go index 82c5d422f3..b413669301 100644 --- a/fetch/mesh_data_test.go +++ b/fetch/mesh_data_test.go @@ -401,6 +401,31 @@ func TestFetch_GetMalfeasanceProofs(t *testing.T) { require.NoError(t, eg.Wait()) } +func TestFetch_GetMalfeasanceProofsWithCallback(t *testing.T) { + nodeIDs := []types.NodeID{{1}, {2}, {3}} + f := createFetch(t) + f.mMalH.EXPECT(). + HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil). 
+ Times(len(nodeIDs)) + + stop := make(chan struct{}, 1) + var eg errgroup.Group + startTestLoop(t, f.Fetch, &eg, stop) + + var ids []types.NodeID + require.NoError(t, f.GetMalfeasanceProofsWithCallback( + context.Background(), nodeIDs, + func(nodeID types.NodeID, err error) { + require.NotContains(t, ids, nodeID) + ids = append(ids, nodeID) + require.NoError(t, err) + })) + require.ElementsMatch(t, nodeIDs, ids) + close(stop) + require.NoError(t, eg.Wait()) +} + func TestFetch_GetBlocks(t *testing.T) { blks := []*types.Block{ genLayerBlock(types.LayerID(10), types.RandomTXSet(10)), diff --git a/sync2/atxs.go b/sync2/atxs.go index ecb6241f45..58f6b4d2ae 100644 --- a/sync2/atxs.go +++ b/sync2/atxs.go @@ -4,19 +4,14 @@ import ( "context" "errors" "fmt" - "sync" "github.com/jonboulle/clockwork" - "github.com/libp2p/go-libp2p/core/host" "go.uber.org/zap" "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/fetch" "github.com/spacemeshos/go-spacemesh/fetch/peers" "github.com/spacemeshos/go-spacemesh/p2p" - "github.com/spacemeshos/go-spacemesh/p2p/pubsub" - "github.com/spacemeshos/go-spacemesh/p2p/server" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/expr" "github.com/spacemeshos/go-spacemesh/sync2/dbset" @@ -26,10 +21,6 @@ import ( "github.com/spacemeshos/go-spacemesh/system" ) -const ( - proto = "sync/2" -) - type ATXHandler struct { logger *zap.Logger f Fetcher @@ -37,7 +28,10 @@ type ATXHandler struct { cfg Config } -var _ multipeer.SyncKeyHandler = &ATXHandler{} +var ( + _ multipeer.SyncKeyHandler = &ATXHandler{} + _ Handler[types.ATXID] = &ATXHandler{} +) func NewATXHandler( logger *zap.Logger, @@ -56,72 +50,14 @@ func NewATXHandler( } } -type commitState struct { - state map[types.ATXID]uint - total int - numDownloaded int - items []types.ATXID +func (h *ATXHandler) Register(peer p2p.Peer, k rangesync.KeyBytes) types.ATXID { + id := types.BytesToATXID(k) + 
h.f.RegisterPeerHashes(peer, []types.Hash32{id.Hash32()}) + return id } -func (h *ATXHandler) setupState( - peer p2p.Peer, - base rangesync.OrderedSet, - received rangesync.SeqResult, -) (*commitState, error) { - state := make(map[types.ATXID]uint) - for k := range received.Seq { - found, err := base.Has(k) - if err != nil { - return nil, fmt.Errorf("check if ATX exists: %w", err) - } - if found { - continue - } - id := types.BytesToATXID(k) - h.f.RegisterPeerHashes(peer, []types.Hash32{id.Hash32()}) - state[id] = 0 - } - if err := received.Error(); err != nil { - return nil, fmt.Errorf("get item: %w", err) - } - return &commitState{ - state: state, - total: len(state), - items: make([]types.ATXID, 0, h.cfg.BatchSize), - }, nil -} - -func (h *ATXHandler) getAtxs(ctx context.Context, cs *commitState) (bool, error) { - cs.items = cs.items[:0] // reuse the slice to reduce allocations - for id := range cs.state { - cs.items = append(cs.items, id) - if uint(len(cs.items)) == h.cfg.BatchSize { - break - } - } - someSucceeded := false - var mtx sync.Mutex - err := h.f.GetAtxs(ctx, cs.items, system.WithATXCallback(func(id types.ATXID, err error) { - mtx.Lock() - defer mtx.Unlock() - switch { - case err == nil: - cs.numDownloaded++ - someSucceeded = true - delete(cs.state, id) - case errors.Is(err, pubsub.ErrValidationReject): - h.logger.Debug("failed to download ATX", - zap.String("atx", id.ShortString()), zap.Error(err)) - delete(cs.state, id) - case cs.state[id] >= h.cfg.MaxAttempts-1: - h.logger.Debug("failed to download ATX: max attempts reached", - zap.String("atx", id.ShortString())) - delete(cs.state, id) - default: - cs.state[id]++ - } - })) - return someSucceeded, err +func (h *ATXHandler) Get(ctx context.Context, ids []types.ATXID, callback func(types.ATXID, error)) error { + return h.f.GetAtxs(ctx, ids, system.WithATXCallback(callback)) } func (h *ATXHandler) Commit( @@ -132,46 +68,11 @@ func (h *ATXHandler) Commit( ) error { h.logger.Debug("begin atx commit") 
defer h.logger.Debug("end atx commit") - cs, err := h.setupState(peer, base, received) + cs, err := NewCommitState(h.logger, h, h.clock, peer, base, received, h.cfg) if err != nil { return err } - startTime := h.clock.Now() - batchAttemptsRemaining := h.cfg.MaxBatchRetries - for len(cs.state) > 0 { - someSucceeded, err := h.getAtxs(ctx, cs) - batchErr := &fetch.BatchError{} - switch { - case err == nil: - case errors.Is(err, context.Canceled): - return err - case !errors.As(err, &batchErr): - h.logger.Debug("failed to download ATXs", zap.Error(err)) - } - if !someSucceeded { - if batchAttemptsRemaining == 0 { - return errors.New("failed to download ATXs: max batch retries reached") - } - batchAttemptsRemaining-- - h.logger.Debug("failed to download any ATXs: will retry batch", - zap.Uint("remaining", batchAttemptsRemaining), - zap.Duration("delay", h.cfg.FailedBatchDelay)) - select { - case <-ctx.Done(): - return ctx.Err() - case <-h.clock.After(h.cfg.FailedBatchDelay): - continue - } - } - - batchAttemptsRemaining = h.cfg.MaxBatchRetries - elapsed := h.clock.Since(startTime) - h.logger.Debug("fetched atxs", - zap.Int("total", cs.total), - zap.Int("downloaded", cs.numDownloaded), - zap.Float64("rate per sec", float64(cs.numDownloaded)/elapsed.Seconds())) - } - return nil + return cs.Commit(ctx) } type MultiEpochATXSyncer struct { @@ -221,7 +122,7 @@ func (s *MultiEpochATXSyncer) load(newEpoch types.EpochID) error { if epoch == newEpoch { cfg = s.newCfg } - hs, err := s.hss.CreateHashSync(name, cfg, epoch) + hs, err := s.hss.CreateATXSync(name, cfg, epoch) if err != nil { return fmt.Errorf("create ATX syncer for epoch %d: %w", epoch, err) } @@ -307,12 +208,6 @@ func NewATXSyncer( return NewP2PHashSync(logger, d, name, curSet, 32, peers, handler, cfg, enableActiveSync) } -func NewDispatcher(logger *zap.Logger, host host.Host, opts []server.Opt) *rangesync.Dispatcher { - d := rangesync.NewDispatcher(logger) - d.SetupServer(host, proto, opts...) 
- return d -} - type ATXSyncSource struct { logger *zap.Logger d *rangesync.Dispatcher @@ -335,7 +230,7 @@ func NewATXSyncSource( return &ATXSyncSource{logger: logger, d: d, db: db, f: f, peers: peers, enableActiveSync: enableActiveSync} } -// CreateHashSync implements HashSyncSource. -func (as *ATXSyncSource) CreateHashSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) { +// CreateATXSync implements HashSyncSource. +func (as *ATXSyncSource) CreateATXSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) { return NewATXSyncer(as.logger.Named(name), as.d, name, cfg, as.db, as.f, as.peers, epoch, as.enableActiveSync) } diff --git a/sync2/atxs_test.go b/sync2/atxs_test.go index 174e243a38..c71a198c38 100644 --- a/sync2/atxs_test.go +++ b/sync2/atxs_test.go @@ -2,32 +2,27 @@ package sync2_test import ( "context" - "errors" "fmt" "testing" - "time" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" gomock "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" - "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/fetch" "github.com/spacemeshos/go-spacemesh/p2p" - "github.com/spacemeshos/go-spacemesh/p2p/pubsub" "github.com/spacemeshos/go-spacemesh/sync2" "github.com/spacemeshos/go-spacemesh/sync2/rangesync" "github.com/spacemeshos/go-spacemesh/sync2/rangesync/mocks" "github.com/spacemeshos/go-spacemesh/system" ) -func atxSeqResult(atxs []types.ATXID) rangesync.SeqResult { +func byteSeqResult[T interface{ Bytes() []byte }](items []T) rangesync.SeqResult { return rangesync.SeqResult{ Seq: func(yield func(k rangesync.KeyBytes) bool) { - for _, atx := range atxs { - if !yield(atx.Bytes()) { + for _, item := range items { + if !yield(item.Bytes()) { return } } @@ -36,14 +31,7 @@ func atxSeqResult(atxs []types.ATXID) rangesync.SeqResult { } } -var testCfg = sync2.Config{ - BatchSize: 4, - MaxAttempts: 3, - MaxBatchRetries: 2, - FailedBatchDelay: 10 * time.Second, 
-} - -func TestAtxHandler_Success(t *testing.T) { +func TestAtxHandler(t *testing.T) { ctrl := gomock.NewController(t) allAtxs := make([]types.ATXID, 10) logger := zaptest.NewLogger(t) @@ -79,199 +67,11 @@ func TestAtxHandler_Success(t *testing.T) { } return nil }).Times(3) - require.NoError(t, h.Commit(context.Background(), peer, baseSet, atxSeqResult(allAtxs))) + require.NoError(t, h.Commit(context.Background(), peer, baseSet, byteSeqResult(allAtxs))) require.Empty(t, toFetch) require.Equal(t, []int{4, 4, 2}, batches) } -func TestAtxHandler_Retry(t *testing.T) { - ctrl := gomock.NewController(t) - allAtxs := make([]types.ATXID, 10) - logger := zaptest.NewLogger(t) - peer := p2p.Peer("foobar") - for i := range allAtxs { - allAtxs[i] = types.RandomATXID() - } - f := NewMockFetcher(ctrl) - clock := clockwork.NewFakeClock() - h := sync2.NewATXHandler(logger, f, testCfg, clock) - baseSet := mocks.NewMockOrderedSet(ctrl) - for _, id := range allAtxs { - baseSet.EXPECT().Has(rangesync.KeyBytes(id.Bytes())) - f.EXPECT().RegisterPeerHashes(peer, []types.Hash32{id.Hash32()}) - } - failCount := 0 - var fetched []types.ATXID - validationFailed := false - f.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, atxs []types.ATXID, opts ...system.GetAtxOpt) error { - errs := make(map[types.Hash32]error) - var atxOpts system.GetAtxOpts - for _, opt := range opts { - opt(&atxOpts) - } - require.NotNil(t, atxOpts.Callback) - for _, id := range atxs { - switch { - case id == allAtxs[0]: - require.False(t, validationFailed, "retried after validation error") - errs[id.Hash32()] = pubsub.ErrValidationReject - atxOpts.Callback(id, errs[id.Hash32()]) - validationFailed = true - case id == allAtxs[1] && failCount < 2: - errs[id.Hash32()] = errors.New("fetch failed") - atxOpts.Callback(id, errs[id.Hash32()]) - failCount++ - default: - fetched = append(fetched, id) - atxOpts.Callback(id, nil) - } - } - if len(errs) > 0 { - var bErr 
fetch.BatchError - for h, err := range errs { - bErr.Add(h, err) - } - return &bErr - } - return nil - }).AnyTimes() - - // If it so happens that a full batch fails, we need to advance the clock to - // trigger the retry. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - var eg errgroup.Group - eg.Go(func() error { - for { - clock.BlockUntilContext(ctx, 1) - if ctx.Err() != nil { - return nil - } - clock.Advance(testCfg.FailedBatchDelay) - } - }) - - require.NoError(t, h.Commit(context.Background(), peer, baseSet, atxSeqResult(allAtxs))) - require.ElementsMatch(t, allAtxs[1:], fetched) - cancel() - require.NoError(t, eg.Wait()) -} - -func TestAtxHandler_Cancel(t *testing.T) { - atxID := types.RandomATXID() - ctrl := gomock.NewController(t) - logger := zaptest.NewLogger(t) - peer := p2p.Peer("foobar") - f := NewMockFetcher(ctrl) - clock := clockwork.NewFakeClock() - h := sync2.NewATXHandler(logger, f, testCfg, clock) - baseSet := mocks.NewMockOrderedSet(ctrl) - baseSet.EXPECT().Has(rangesync.KeyBytes(atxID.Bytes())).Return(false, nil) - f.EXPECT().RegisterPeerHashes(peer, []types.Hash32{atxID.Hash32()}) - f.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, atxs []types.ATXID, opts ...system.GetAtxOpt) error { - return context.Canceled - }) - sr := rangesync.SeqResult{ - Seq: func(yield func(k rangesync.KeyBytes) bool) { - yield(atxID.Bytes()) - }, - Error: rangesync.NoSeqError, - } - require.ErrorIs(t, h.Commit(context.Background(), peer, baseSet, sr), context.Canceled) -} - -func TestAtxHandler_BatchRetry(t *testing.T) { - ctrl := gomock.NewController(t) - allAtxs := make([]types.ATXID, 10) - logger := zaptest.NewLogger(t) - peer := p2p.Peer("foobar") - for i := range allAtxs { - allAtxs[i] = types.RandomATXID() - } - clock := clockwork.NewFakeClock() - f := NewMockFetcher(ctrl) - h := sync2.NewATXHandler(logger, f, testCfg, clock) - baseSet := mocks.NewMockOrderedSet(ctrl) - for _, id := 
range allAtxs { - baseSet.EXPECT().Has(rangesync.KeyBytes(id.Bytes())) - f.EXPECT().RegisterPeerHashes(peer, []types.Hash32{id.Hash32()}) - } - f.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, atxs []types.ATXID, opts ...system.GetAtxOpt) error { - return errors.New("fetch failed") - }) - var eg errgroup.Group - eg.Go(func() error { - return h.Commit(context.Background(), peer, baseSet, atxSeqResult(allAtxs)) - }) - // wait for delay after 1st batch failure - clock.BlockUntilContext(context.Background(), 1) - toFetch := make(map[types.ATXID]bool) - for _, id := range allAtxs { - toFetch[id] = true - } - f.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, atxs []types.ATXID, opts ...system.GetAtxOpt) error { - var atxOpts system.GetAtxOpts - for _, opt := range opts { - opt(&atxOpts) - } - require.NotNil(t, atxOpts.Callback) - for _, id := range atxs { - require.True(t, toFetch[id], "already fetched or bad ID") - delete(toFetch, id) - atxOpts.Callback(id, nil) - } - return nil - }).Times(3) - clock.Advance(testCfg.FailedBatchDelay) - require.NoError(t, eg.Wait()) - require.Empty(t, toFetch) -} - -func TestAtxHandler_BatchRetry_Fail(t *testing.T) { - ctrl := gomock.NewController(t) - allAtxs := make([]types.ATXID, 10) - logger := zaptest.NewLogger(t) - peer := p2p.Peer("foobar") - for i := range allAtxs { - allAtxs[i] = types.RandomATXID() - } - clock := clockwork.NewFakeClock() - f := NewMockFetcher(ctrl) - h := sync2.NewATXHandler(logger, f, testCfg, clock) - baseSet := mocks.NewMockOrderedSet(ctrl) - for _, id := range allAtxs { - baseSet.EXPECT().Has(rangesync.KeyBytes(id.Bytes())) - f.EXPECT().RegisterPeerHashes(peer, []types.Hash32{id.Hash32()}) - } - sr := rangesync.SeqResult{ - Seq: func(yield func(k rangesync.KeyBytes) bool) { - for _, atx := range allAtxs { - if !yield(atx.Bytes()) { - return - } - } - }, - Error: rangesync.NoSeqError, - } - 
f.EXPECT().GetAtxs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, atxs []types.ATXID, opts ...system.GetAtxOpt) error { - return errors.New("fetch failed") - }).Times(3) - var eg errgroup.Group - eg.Go(func() error { - return h.Commit(context.Background(), peer, baseSet, sr) - }) - for range 2 { - clock.BlockUntilContext(context.Background(), 1) - clock.Advance(testCfg.FailedBatchDelay) - } - require.Error(t, eg.Wait()) -} - func TestMultiEpochATXSyncer(t *testing.T) { ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t) @@ -290,7 +90,7 @@ func TestMultiEpochATXSyncer(t *testing.T) { var syncActions []string curIdx := 0 - hss.EXPECT().CreateHashSync(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + hss.EXPECT().CreateATXSync(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(name string, cfg sync2.Config, epoch types.EpochID) (sync2.HashSync, error) { idx := curIdx curIdx++ diff --git a/sync2/commitstate.go b/sync2/commitstate.go new file mode 100644 index 0000000000..c5a04ce674 --- /dev/null +++ b/sync2/commitstate.go @@ -0,0 +1,139 @@ +package sync2 + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/jonboulle/clockwork" + "go.uber.org/zap" + + "github.com/spacemeshos/go-spacemesh/fetch" + "github.com/spacemeshos/go-spacemesh/log" + "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/p2p/pubsub" + "github.com/spacemeshos/go-spacemesh/sync2/rangesync" +) + +type ItemID interface { + comparable + log.ShortString +} + +type CommitState[T ItemID] struct { + logger *zap.Logger + handler Handler[T] + clock clockwork.Clock + mtx sync.Mutex + state map[T]uint + total int + numDownloaded int + items []T + cfg Config + someSucceeded bool +} + +func NewCommitState[T ItemID]( + logger *zap.Logger, + handler Handler[T], + clock clockwork.Clock, + peer p2p.Peer, + base rangesync.OrderedSet, + received rangesync.SeqResult, + cfg Config, +) (*CommitState[T], error) { + state 
:= make(map[T]uint) + for k := range received.Seq { + found, err := base.Has(k) + if err != nil { + return nil, fmt.Errorf("check if object exists: %w", err) + } + if found { + continue + } + state[handler.Register(peer, k)] = 0 + } + if err := received.Error(); err != nil { + return nil, fmt.Errorf("get item: %w", err) + } + return &CommitState[T]{ + logger: logger, + handler: handler, + clock: clock, + state: state, + total: len(state), + items: make([]T, 0, cfg.BatchSize), + cfg: cfg, + }, nil +} + +func (cs *CommitState[T]) batch() []T { + cs.items = cs.items[:0] // reuse the slice to reduce allocations + for id := range cs.state { + cs.items = append(cs.items, id) + if uint(len(cs.items)) == cs.cfg.BatchSize { + break + } + } + return cs.items +} + +func (cs *CommitState[T]) handleItem(id T, err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + switch { + case err == nil: + cs.numDownloaded++ + cs.someSucceeded = true + delete(cs.state, id) + case errors.Is(err, pubsub.ErrValidationReject): + cs.logger.Debug("failed to download", log.ZShortStringer("id", id), zap.Error(err)) + delete(cs.state, id) + case cs.state[id] >= cs.cfg.MaxAttempts-1: + cs.logger.Debug("failed to download: max attempts reached", log.ZShortStringer("id", id)) + delete(cs.state, id) + default: + cs.state[id]++ + } +} + +func (cs *CommitState[T]) Commit(ctx context.Context) error { + startTime := cs.clock.Now() + batchAttemptsRemaining := cs.cfg.MaxBatchRetries + for len(cs.state) > 0 { + cs.someSucceeded = false + err := cs.handler.Get(ctx, cs.batch(), cs.handleItem) + batchErr := &fetch.BatchError{} + switch { + case err == nil: + case errors.Is(err, context.Canceled): + return err + case !errors.As(err, &batchErr): + cs.logger.Debug("failed to download", zap.Error(err)) + } + if !cs.someSucceeded { + if batchAttemptsRemaining == 0 { + return errors.New("failed to download: max batch retries reached") + } + batchAttemptsRemaining-- + cs.logger.Debug("failed to download any objects: will 
retry batch", + zap.Uint("remaining", batchAttemptsRemaining), + zap.Duration("delay", cs.cfg.FailedBatchDelay)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-cs.clock.After(cs.cfg.FailedBatchDelay): + continue + } + } + + batchAttemptsRemaining = cs.cfg.MaxBatchRetries + elapsed := cs.clock.Since(startTime) + cs.logger.Debug("fetched objects", + zap.Int("total", cs.total), + zap.Int("downloaded", cs.numDownloaded), + zap.Float64("rate per sec", float64(cs.numDownloaded)/elapsed.Seconds())) + } + return nil +} diff --git a/sync2/commitstate_test.go b/sync2/commitstate_test.go new file mode 100644 index 0000000000..63c6f926be --- /dev/null +++ b/sync2/commitstate_test.go @@ -0,0 +1,260 @@ +package sync2_test + +import ( + "context" + "encoding/hex" + "errors" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + gomock "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/fetch" + "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/p2p/pubsub" + "github.com/spacemeshos/go-spacemesh/sync2" + "github.com/spacemeshos/go-spacemesh/sync2/rangesync" + "github.com/spacemeshos/go-spacemesh/sync2/rangesync/mocks" +) + +var testCfg = sync2.Config{ + BatchSize: 4, + MaxAttempts: 3, + MaxBatchRetries: 2, + FailedBatchDelay: 10 * time.Second, +} + +type fakeID [4]byte + +func (id fakeID) ShortString() string { + return hex.EncodeToString(id[:]) +} + +func (id fakeID) Bytes() []byte { + return id[:] +} + +func (id fakeID) Hash32() types.Hash32 { + var h types.Hash32 + copy(h[:], id[:]) + return h +} + +func randomFakeID() fakeID { + var id fakeID + copy(id[:], types.RandomBytes(len(id))) + return id +} + +func TestCommitState_Success(t *testing.T) { + ctrl := gomock.NewController(t) + allIDs := make([]fakeID, 10) + logger := zaptest.NewLogger(t) + peer := 
p2p.Peer("foobar") + for i := range allIDs { + allIDs[i] = randomFakeID() + } + clock := clockwork.NewFakeClock() + h := NewMockHandler[fakeID](ctrl) + baseSet := mocks.NewMockOrderedSet(ctrl) + for _, id := range allIDs { + baseSet.EXPECT().Has(id.Bytes()) + h.EXPECT().Register(peer, id.Bytes()).Return(id) + } + toFetch := make(map[fakeID]bool) + for _, id := range allIDs { + toFetch[id] = true + } + var batches []int + h.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, ids []fakeID, callback func(fakeID, error)) error { + batches = append(batches, len(ids)) + for _, id := range ids { + require.True(t, toFetch[id], "already fetched or bad ID") + delete(toFetch, id) + callback(id, nil) + } + return nil + }).Times(3) + cs, err := sync2.NewCommitState(logger, h, clock, peer, baseSet, byteSeqResult(allIDs), testCfg) + require.NoError(t, err) + require.NoError(t, cs.Commit(context.Background())) + require.Empty(t, toFetch) + require.Equal(t, []int{4, 4, 2}, batches) +} + +func TestCommitState_Retry(t *testing.T) { + ctrl := gomock.NewController(t) + allIDs := make([]fakeID, 10) + logger := zaptest.NewLogger(t) + peer := p2p.Peer("foobar") + for i := range allIDs { + allIDs[i] = randomFakeID() + } + clock := clockwork.NewFakeClock() + h := NewMockHandler[fakeID](ctrl) + baseSet := mocks.NewMockOrderedSet(ctrl) + for _, id := range allIDs { + baseSet.EXPECT().Has(rangesync.KeyBytes(id[:])) + h.EXPECT().Register(peer, rangesync.KeyBytes(id[:])).Return(id) + } + toFetch := make(map[fakeID]bool) + for _, id := range allIDs { + toFetch[id] = true + } + failCount := 0 + var fetched []fakeID + validationFailed := false + h.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, ids []fakeID, callback func(fakeID, error)) error { + errs := make(map[types.Hash32]error) + for _, id := range ids { + switch { + case id == allIDs[0]: + require.False(t, validationFailed, "retried after validation 
error") + errs[id.Hash32()] = pubsub.ErrValidationReject + callback(id, errs[id.Hash32()]) + validationFailed = true + case id == allIDs[1] && failCount < 2: + errs[id.Hash32()] = errors.New("fetch failed") + callback(id, errs[id.Hash32()]) + failCount++ + default: + fetched = append(fetched, id) + callback(id, nil) + } + } + if len(errs) > 0 { + var bErr fetch.BatchError + for h, err := range errs { + bErr.Add(h, err) + } + return &bErr + } + return nil + }).AnyTimes() + + cs, err := sync2.NewCommitState(logger, h, clock, peer, baseSet, byteSeqResult(allIDs), testCfg) + require.NoError(t, err) + + // If it so happens that a full batch fails, we need to advance the clock to + // trigger the retry. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var eg errgroup.Group + eg.Go(func() error { + for { + clock.BlockUntilContext(ctx, 1) + if ctx.Err() != nil { + return nil + } + clock.Advance(testCfg.FailedBatchDelay) + } + }) + + require.NoError(t, cs.Commit(context.Background())) + require.ElementsMatch(t, allIDs[1:], fetched) + cancel() + require.NoError(t, eg.Wait()) +} + +func TestCommitState_Cancel(t *testing.T) { + ctrl := gomock.NewController(t) + id := randomFakeID() + logger := zaptest.NewLogger(t) + peer := p2p.Peer("foobar") + clock := clockwork.NewFakeClock() + h := NewMockHandler[fakeID](ctrl) + baseSet := mocks.NewMockOrderedSet(ctrl) + baseSet.EXPECT().Has(rangesync.KeyBytes(id[:])) + h.EXPECT().Register(peer, rangesync.KeyBytes(id[:])).Return(id) + h.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, ids []fakeID, callback func(fakeID, error)) error { + return context.Canceled + }) + cs, err := sync2.NewCommitState(logger, h, clock, peer, baseSet, byteSeqResult([]fakeID{id}), testCfg) + require.NoError(t, err) + require.ErrorIs(t, cs.Commit(context.Background()), context.Canceled) +} + +func TestCommitState_BatchRetry(t *testing.T) { + ctrl := gomock.NewController(t) + allIDs := 
make([]fakeID, 10) + logger := zaptest.NewLogger(t) + peer := p2p.Peer("foobar") + for i := range allIDs { + allIDs[i] = randomFakeID() + } + clock := clockwork.NewFakeClock() + h := NewMockHandler[fakeID](ctrl) + baseSet := mocks.NewMockOrderedSet(ctrl) + for _, id := range allIDs { + baseSet.EXPECT().Has(rangesync.KeyBytes(id[:])) + h.EXPECT().Register(peer, rangesync.KeyBytes(id[:])).Return(id) + } + h.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, ids []fakeID, callback func(fakeID, error)) error { + return errors.New("fetch failed") + }) + var eg errgroup.Group + cs, err := sync2.NewCommitState(logger, h, clock, peer, baseSet, byteSeqResult(allIDs), testCfg) + require.NoError(t, err) + eg.Go(func() error { + return cs.Commit(context.Background()) + }) + // wait for delay after 1st batch failure + clock.BlockUntilContext(context.Background(), 1) + toFetch := make(map[fakeID]bool) + for _, id := range allIDs { + toFetch[id] = true + } + h.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, ids []fakeID, callback func(fakeID, error)) error { + for _, id := range ids { + require.True(t, toFetch[id], "already fetched or bad ID") + delete(toFetch, id) + callback(id, nil) + } + return nil + }).Times(3) + clock.Advance(testCfg.FailedBatchDelay) + require.NoError(t, eg.Wait()) + require.Empty(t, toFetch) +} + +func TestCommitState_BatchRetry_Fail(t *testing.T) { + ctrl := gomock.NewController(t) + allIDs := make([]fakeID, 10) + logger := zaptest.NewLogger(t) + peer := p2p.Peer("foobar") + for i := range allIDs { + allIDs[i] = randomFakeID() + } + clock := clockwork.NewFakeClock() + h := NewMockHandler[fakeID](ctrl) + baseSet := mocks.NewMockOrderedSet(ctrl) + for _, id := range allIDs { + baseSet.EXPECT().Has(rangesync.KeyBytes(id[:])) + h.EXPECT().Register(peer, rangesync.KeyBytes(id[:])).Return(id) + } + h.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + 
func(_ context.Context, ids []fakeID, callback func(fakeID, error)) error { + return errors.New("fetch failed") + }).Times(3) + var eg errgroup.Group + cs, err := sync2.NewCommitState(logger, h, clock, peer, baseSet, byteSeqResult(allIDs), testCfg) + require.NoError(t, err) + eg.Go(func() error { + return cs.Commit(context.Background()) + }) + for range 2 { + clock.BlockUntilContext(context.Background(), 1) + clock.Advance(testCfg.FailedBatchDelay) + } + require.Error(t, eg.Wait()) +} diff --git a/sync2/interface.go b/sync2/interface.go index bcacae31b7..da7b45140a 100644 --- a/sync2/interface.go +++ b/sync2/interface.go @@ -5,13 +5,15 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/sync2/rangesync" "github.com/spacemeshos/go-spacemesh/system" ) -//go:generate mockgen -typed -package=sync2_test -destination=./mocks_test.go -source=./interface.go +//go:generate mockgen -typed -package=sync2_test -destination=./mocks_test.go -source=./interface.go -exclude_interfaces itemID type Fetcher interface { GetAtxs(context.Context, []types.ATXID, ...system.GetAtxOpt) error + GetMalfeasanceProofsWithCallback(context.Context, []types.NodeID, func(types.NodeID, error)) error RegisterPeerHashes(peer p2p.Peer, hash []types.Hash32) } @@ -23,9 +25,14 @@ type HashSync interface { } type HashSyncSource interface { - CreateHashSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) + CreateATXSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) } type LayerTicker interface { CurrentLayer() types.LayerID } + +type Handler[T ItemID] interface { + Register(peer p2p.Peer, k rangesync.KeyBytes) T + Get(ctx context.Context, ids []T, callback func(T, error)) error +} diff --git a/sync2/malfeasance.go b/sync2/malfeasance.go new file mode 100644 index 0000000000..a93eab8a81 --- /dev/null +++ b/sync2/malfeasance.go @@ -0,0 +1,93 @@ +package sync2 + +import ( + "context" + 
+ "github.com/jonboulle/clockwork" + "go.uber.org/zap" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/fetch/peers" + "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sync2/dbset" + "github.com/spacemeshos/go-spacemesh/sync2/multipeer" + "github.com/spacemeshos/go-spacemesh/sync2/rangesync" + "github.com/spacemeshos/go-spacemesh/sync2/sqlstore" +) + +type MalfeasanceHandler struct { + logger *zap.Logger + f Fetcher + clock clockwork.Clock + cfg Config +} + +var ( + _ multipeer.SyncKeyHandler = &MalfeasanceHandler{} + _ Handler[types.NodeID] = &MalfeasanceHandler{} +) + +func NewMalfeasanceHandler( + logger *zap.Logger, + f Fetcher, + cfg Config, + clock clockwork.Clock, +) *MalfeasanceHandler { + if clock == nil { + clock = clockwork.NewRealClock() + } + return &MalfeasanceHandler{ + f: f, + logger: logger, + clock: clock, + cfg: cfg, + } +} + +func (h *MalfeasanceHandler) Register(peer p2p.Peer, k rangesync.KeyBytes) types.NodeID { + id := types.BytesToNodeID(k) + h.f.RegisterPeerHashes(peer, []types.Hash32{types.Hash32(id)}) + return id +} + +func (h *MalfeasanceHandler) Get(ctx context.Context, ids []types.NodeID, callback func(types.NodeID, error)) error { + return h.f.GetMalfeasanceProofsWithCallback(ctx, ids, callback) +} + +func (h *MalfeasanceHandler) Commit( + ctx context.Context, + peer p2p.Peer, + base rangesync.OrderedSet, + received rangesync.SeqResult, +) error { + h.logger.Debug("begin malfeasance commit") + defer h.logger.Debug("end malfeasance commit") + cs, err := NewCommitState(h.logger, h, h.clock, peer, base, received, h.cfg) + if err != nil { + return err + } + return cs.Commit(ctx) +} + +func identitiesTable() *sqlstore.SyncedTable { + return &sqlstore.SyncedTable{ + TableName: "identities", + IDColumn: "id", + } +} + +func NewMalfeasanceSyncer( + logger *zap.Logger, + d *rangesync.Dispatcher, + name string, + cfg Config, 
+ db sql.Database, + f Fetcher, + peers *peers.Peers, + enableActiveSync bool, +) (*P2PHashSync, error) { + curSet := dbset.NewDBSet(db, identitiesTable(), 32, int(cfg.MaxDepth)) + handler := NewMalfeasanceHandler(logger, f, cfg, nil) + return NewP2PHashSync(logger, d, name, curSet, 32, peers, handler, cfg, enableActiveSync) +} diff --git a/sync2/malfeasance_test.go b/sync2/malfeasance_test.go new file mode 100644 index 0000000000..3167c2cdd0 --- /dev/null +++ b/sync2/malfeasance_test.go @@ -0,0 +1,53 @@ +package sync2_test + +import ( + "context" + "testing" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + gomock "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/sync2" + "github.com/spacemeshos/go-spacemesh/sync2/rangesync" + "github.com/spacemeshos/go-spacemesh/sync2/rangesync/mocks" +) + +func TestMalfeasanceHandler(t *testing.T) { + ctrl := gomock.NewController(t) + allNodes := make([]types.NodeID, 10) + logger := zaptest.NewLogger(t) + peer := p2p.Peer("foobar") + for i := range allNodes { + allNodes[i] = types.RandomNodeID() + } + f := NewMockFetcher(ctrl) + clock := clockwork.NewFakeClock() + h := sync2.NewMalfeasanceHandler(logger, f, testCfg, clock) + baseSet := mocks.NewMockOrderedSet(ctrl) + for _, id := range allNodes { + baseSet.EXPECT().Has(rangesync.KeyBytes(id.Bytes())) + f.EXPECT().RegisterPeerHashes(peer, []types.Hash32{types.Hash32(id)}) + } + toFetch := make(map[types.NodeID]bool) + for _, id := range allNodes { + toFetch[id] = true + } + var batches []int + f.EXPECT().GetMalfeasanceProofsWithCallback(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, nodes []types.NodeID, callback func(types.NodeID, error)) error { + batches = append(batches, len(nodes)) + for _, id := range nodes { + require.True(t, toFetch[id], "already fetched or bad ID") + 
delete(toFetch, id) + callback(id, nil) + } + return nil + }).Times(3) + require.NoError(t, h.Commit(context.Background(), peer, baseSet, byteSeqResult(allNodes))) + require.Empty(t, toFetch) + require.Equal(t, []int{4, 4, 2}, batches) +} diff --git a/sync2/mocks_test.go b/sync2/mocks_test.go index 02a100fbf3..741207b787 100644 --- a/sync2/mocks_test.go +++ b/sync2/mocks_test.go @@ -3,7 +3,7 @@ // // Generated by this command: // -// mockgen -typed -package=sync2_test -destination=./mocks_test.go -source=./interface.go +// mockgen -typed -package=sync2_test -destination=./mocks_test.go -source=./interface.go -exclude_interfaces itemID // // Package sync2_test is a generated GoMock package. @@ -16,6 +16,7 @@ import ( types "github.com/spacemeshos/go-spacemesh/common/types" p2p "github.com/spacemeshos/go-spacemesh/p2p" sync2 "github.com/spacemeshos/go-spacemesh/sync2" + rangesync "github.com/spacemeshos/go-spacemesh/sync2/rangesync" system "github.com/spacemeshos/go-spacemesh/system" gomock "go.uber.org/mock/gomock" ) @@ -87,6 +88,44 @@ func (c *MockFetcherGetAtxsCall) DoAndReturn(f func(context.Context, []types.ATX return c } +// GetMalfeasanceProofsWithCallback mocks base method. +func (m *MockFetcher) GetMalfeasanceProofsWithCallback(arg0 context.Context, arg1 []types.NodeID, arg2 func(types.NodeID, error)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMalfeasanceProofsWithCallback", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetMalfeasanceProofsWithCallback indicates an expected call of GetMalfeasanceProofsWithCallback. 
+func (mr *MockFetcherMockRecorder) GetMalfeasanceProofsWithCallback(arg0, arg1, arg2 any) *MockFetcherGetMalfeasanceProofsWithCallbackCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMalfeasanceProofsWithCallback", reflect.TypeOf((*MockFetcher)(nil).GetMalfeasanceProofsWithCallback), arg0, arg1, arg2) + return &MockFetcherGetMalfeasanceProofsWithCallbackCall{Call: call} +} + +// MockFetcherGetMalfeasanceProofsWithCallbackCall wrap *gomock.Call +type MockFetcherGetMalfeasanceProofsWithCallbackCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockFetcherGetMalfeasanceProofsWithCallbackCall) Return(arg0 error) *MockFetcherGetMalfeasanceProofsWithCallbackCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockFetcherGetMalfeasanceProofsWithCallbackCall) Do(f func(context.Context, []types.NodeID, func(types.NodeID, error)) error) *MockFetcherGetMalfeasanceProofsWithCallbackCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockFetcherGetMalfeasanceProofsWithCallbackCall) DoAndReturn(f func(context.Context, []types.NodeID, func(types.NodeID, error)) error) *MockFetcherGetMalfeasanceProofsWithCallbackCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // RegisterPeerHashes mocks base method. func (m *MockFetcher) RegisterPeerHashes(peer p2p.Peer, hash []types.Hash32) { m.ctrl.T.Helper() @@ -319,41 +358,41 @@ func (m *MockHashSyncSource) EXPECT() *MockHashSyncSourceMockRecorder { return m.recorder } -// CreateHashSync mocks base method. -func (m *MockHashSyncSource) CreateHashSync(name string, cfg sync2.Config, epoch types.EpochID) (sync2.HashSync, error) { +// CreateATXSync mocks base method. 
+func (m *MockHashSyncSource) CreateATXSync(name string, cfg sync2.Config, epoch types.EpochID) (sync2.HashSync, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateHashSync", name, cfg, epoch) + ret := m.ctrl.Call(m, "CreateATXSync", name, cfg, epoch) ret0, _ := ret[0].(sync2.HashSync) ret1, _ := ret[1].(error) return ret0, ret1 } -// CreateHashSync indicates an expected call of CreateHashSync. -func (mr *MockHashSyncSourceMockRecorder) CreateHashSync(name, cfg, epoch any) *MockHashSyncSourceCreateHashSyncCall { +// CreateATXSync indicates an expected call of CreateATXSync. +func (mr *MockHashSyncSourceMockRecorder) CreateATXSync(name, cfg, epoch any) *MockHashSyncSourceCreateATXSyncCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateHashSync", reflect.TypeOf((*MockHashSyncSource)(nil).CreateHashSync), name, cfg, epoch) - return &MockHashSyncSourceCreateHashSyncCall{Call: call} + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateATXSync", reflect.TypeOf((*MockHashSyncSource)(nil).CreateATXSync), name, cfg, epoch) + return &MockHashSyncSourceCreateATXSyncCall{Call: call} } -// MockHashSyncSourceCreateHashSyncCall wrap *gomock.Call -type MockHashSyncSourceCreateHashSyncCall struct { +// MockHashSyncSourceCreateATXSyncCall wrap *gomock.Call +type MockHashSyncSourceCreateATXSyncCall struct { *gomock.Call } // Return rewrite *gomock.Call.Return -func (c *MockHashSyncSourceCreateHashSyncCall) Return(arg0 sync2.HashSync, arg1 error) *MockHashSyncSourceCreateHashSyncCall { +func (c *MockHashSyncSourceCreateATXSyncCall) Return(arg0 sync2.HashSync, arg1 error) *MockHashSyncSourceCreateATXSyncCall { c.Call = c.Call.Return(arg0, arg1) return c } // Do rewrite *gomock.Call.Do -func (c *MockHashSyncSourceCreateHashSyncCall) Do(f func(string, sync2.Config, types.EpochID) (sync2.HashSync, error)) *MockHashSyncSourceCreateHashSyncCall { +func (c *MockHashSyncSourceCreateATXSyncCall) Do(f func(string, sync2.Config, 
types.EpochID) (sync2.HashSync, error)) *MockHashSyncSourceCreateATXSyncCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockHashSyncSourceCreateHashSyncCall) DoAndReturn(f func(string, sync2.Config, types.EpochID) (sync2.HashSync, error)) *MockHashSyncSourceCreateHashSyncCall { +func (c *MockHashSyncSourceCreateATXSyncCall) DoAndReturn(f func(string, sync2.Config, types.EpochID) (sync2.HashSync, error)) *MockHashSyncSourceCreateATXSyncCall { c.Call = c.Call.DoAndReturn(f) return c } @@ -419,3 +458,103 @@ func (c *MockLayerTickerCurrentLayerCall) DoAndReturn(f func() types.LayerID) *M c.Call = c.Call.DoAndReturn(f) return c } + +// MockHandler is a mock of Handler interface. +type MockHandler[T sync2.ItemID] struct { + ctrl *gomock.Controller + recorder *MockHandlerMockRecorder[T] + isgomock struct{} +} + +// MockHandlerMockRecorder is the mock recorder for MockHandler. +type MockHandlerMockRecorder[T sync2.ItemID] struct { + mock *MockHandler[T] +} + +// NewMockHandler creates a new mock instance. +func NewMockHandler[T sync2.ItemID](ctrl *gomock.Controller) *MockHandler[T] { + mock := &MockHandler[T]{ctrl: ctrl} + mock.recorder = &MockHandlerMockRecorder[T]{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockHandler[T]) EXPECT() *MockHandlerMockRecorder[T] { + return m.recorder +} + +// Get mocks base method. +func (m *MockHandler[T]) Get(ctx context.Context, ids []T, callback func(T, error)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", ctx, ids, callback) + ret0, _ := ret[0].(error) + return ret0 +} + +// Get indicates an expected call of Get. 
+func (mr *MockHandlerMockRecorder[T]) Get(ctx, ids, callback any) *MockHandlerGetCall[T] { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockHandler[T])(nil).Get), ctx, ids, callback) + return &MockHandlerGetCall[T]{Call: call} +} + +// MockHandlerGetCall wrap *gomock.Call +type MockHandlerGetCall[T sync2.ItemID] struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockHandlerGetCall[T]) Return(arg0 error) *MockHandlerGetCall[T] { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockHandlerGetCall[T]) Do(f func(context.Context, []T, func(T, error)) error) *MockHandlerGetCall[T] { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockHandlerGetCall[T]) DoAndReturn(f func(context.Context, []T, func(T, error)) error) *MockHandlerGetCall[T] { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Register mocks base method. +func (m *MockHandler[T]) Register(peer p2p.Peer, k rangesync.KeyBytes) T { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Register", peer, k) + ret0, _ := ret[0].(T) + return ret0 +} + +// Register indicates an expected call of Register. 
+func (mr *MockHandlerMockRecorder[T]) Register(peer, k any) *MockHandlerRegisterCall[T] { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Register", reflect.TypeOf((*MockHandler[T])(nil).Register), peer, k) + return &MockHandlerRegisterCall[T]{Call: call} +} + +// MockHandlerRegisterCall wrap *gomock.Call +type MockHandlerRegisterCall[T sync2.ItemID] struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockHandlerRegisterCall[T]) Return(arg0 T) *MockHandlerRegisterCall[T] { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockHandlerRegisterCall[T]) Do(f func(p2p.Peer, rangesync.KeyBytes) T) *MockHandlerRegisterCall[T] { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockHandlerRegisterCall[T]) DoAndReturn(f func(p2p.Peer, rangesync.KeyBytes) T) *MockHandlerRegisterCall[T] { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/sync2/p2p.go b/sync2/p2p.go index 3dff8c046c..1e817b8504 100644 --- a/sync2/p2p.go +++ b/sync2/p2p.go @@ -8,14 +8,20 @@ import ( "sync/atomic" "time" + "github.com/libp2p/go-libp2p/core/host" "go.uber.org/zap" "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/fetch/peers" + "github.com/spacemeshos/go-spacemesh/p2p/server" "github.com/spacemeshos/go-spacemesh/sync2/multipeer" "github.com/spacemeshos/go-spacemesh/sync2/rangesync" ) +const ( + proto = "sync/2" +) + // Config contains the configuration for the P2PHashSync. type Config struct { rangesync.RangeSetReconcilerConfig `mapstructure:",squash"` @@ -186,6 +192,10 @@ func (s *P2PHashSync) Start() { // StartAndSync starts the multi-peer reconciler if it is not already running, and waits // until the local OrderedSet is in sync with the peers. 
func (s *P2PHashSync) StartAndSync(ctx context.Context) error { + if !s.enableActiveSync { + s.start() + return nil + } if s.start() { // If the multipeer reconciler is waiting for sync, we kick it to start // the sync so as not to wait for the next scheduled sync interval. @@ -235,3 +245,11 @@ func (s *P2PHashSync) WaitForSync(ctx context.Context) error { func (s *P2PHashSync) SyncCycleCount() int { return s.reconciler.SyncCycleCount() } + +// NewDispatcher creates a new rangesync.Dispatcher for the sync2 protocol with the given +// host and options. +func NewDispatcher(logger *zap.Logger, host host.Host, opts []server.Opt) *rangesync.Dispatcher { + d := rangesync.NewDispatcher(logger) + d.SetupServer(host, proto, opts...) + return d +} diff --git a/syncer/interface.go b/syncer/interface.go index 95d8f17125..6e7013c52a 100644 --- a/syncer/interface.go +++ b/syncer/interface.go @@ -48,6 +48,7 @@ type fetcher interface { GetAtxs(context.Context, []types.ATXID, ...system.GetAtxOpt) error GetMalfeasanceProofs(context.Context, []types.NodeID) error + GetMalfeasanceProofsWithCallback(context.Context, []types.NodeID, func(types.NodeID, error)) error GetBallots(context.Context, []types.BallotID) error GetBlocks(context.Context, []types.BlockID) error RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) @@ -77,3 +78,8 @@ type multiEpochAtxSyncerV2 interface { EnsureSync(ctx context.Context, lastWaitEpoch, newEpoch types.EpochID) (lastSynced types.EpochID, err error) Stop() } + +type malfeasanceSyncerV2 interface { + StartAndSync(ctx context.Context) error + Stop() +} diff --git a/syncer/mocks/mocks.go b/syncer/mocks/mocks.go index 8ad4707d01..5c4f2955a8 100644 --- a/syncer/mocks/mocks.go +++ b/syncer/mocks/mocks.go @@ -419,6 +419,44 @@ func (c *MockfetchLogicGetMalfeasanceProofsCall) DoAndReturn(f func(context.Cont return c } +// GetMalfeasanceProofsWithCallback mocks base method. 
+func (m *MockfetchLogic) GetMalfeasanceProofsWithCallback(arg0 context.Context, arg1 []types.NodeID, arg2 func(types.NodeID, error)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMalfeasanceProofsWithCallback", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetMalfeasanceProofsWithCallback indicates an expected call of GetMalfeasanceProofsWithCallback. +func (mr *MockfetchLogicMockRecorder) GetMalfeasanceProofsWithCallback(arg0, arg1, arg2 any) *MockfetchLogicGetMalfeasanceProofsWithCallbackCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMalfeasanceProofsWithCallback", reflect.TypeOf((*MockfetchLogic)(nil).GetMalfeasanceProofsWithCallback), arg0, arg1, arg2) + return &MockfetchLogicGetMalfeasanceProofsWithCallbackCall{Call: call} +} + +// MockfetchLogicGetMalfeasanceProofsWithCallbackCall wrap *gomock.Call +type MockfetchLogicGetMalfeasanceProofsWithCallbackCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockfetchLogicGetMalfeasanceProofsWithCallbackCall) Return(arg0 error) *MockfetchLogicGetMalfeasanceProofsWithCallbackCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockfetchLogicGetMalfeasanceProofsWithCallbackCall) Do(f func(context.Context, []types.NodeID, func(types.NodeID, error)) error) *MockfetchLogicGetMalfeasanceProofsWithCallbackCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockfetchLogicGetMalfeasanceProofsWithCallbackCall) DoAndReturn(f func(context.Context, []types.NodeID, func(types.NodeID, error)) error) *MockfetchLogicGetMalfeasanceProofsWithCallbackCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // PeerEpochInfo mocks base method. 
func (m *MockfetchLogic) PeerEpochInfo(arg0 context.Context, arg1 p2p.Peer, arg2 types.EpochID) (*fetch.EpochData, error) { m.ctrl.T.Helper() @@ -1114,6 +1152,44 @@ func (c *MockfetcherGetMalfeasanceProofsCall) DoAndReturn(f func(context.Context return c } +// GetMalfeasanceProofsWithCallback mocks base method. +func (m *Mockfetcher) GetMalfeasanceProofsWithCallback(arg0 context.Context, arg1 []types.NodeID, arg2 func(types.NodeID, error)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMalfeasanceProofsWithCallback", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetMalfeasanceProofsWithCallback indicates an expected call of GetMalfeasanceProofsWithCallback. +func (mr *MockfetcherMockRecorder) GetMalfeasanceProofsWithCallback(arg0, arg1, arg2 any) *MockfetcherGetMalfeasanceProofsWithCallbackCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMalfeasanceProofsWithCallback", reflect.TypeOf((*Mockfetcher)(nil).GetMalfeasanceProofsWithCallback), arg0, arg1, arg2) + return &MockfetcherGetMalfeasanceProofsWithCallbackCall{Call: call} +} + +// MockfetcherGetMalfeasanceProofsWithCallbackCall wrap *gomock.Call +type MockfetcherGetMalfeasanceProofsWithCallbackCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockfetcherGetMalfeasanceProofsWithCallbackCall) Return(arg0 error) *MockfetcherGetMalfeasanceProofsWithCallbackCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockfetcherGetMalfeasanceProofsWithCallbackCall) Do(f func(context.Context, []types.NodeID, func(types.NodeID, error)) error) *MockfetcherGetMalfeasanceProofsWithCallbackCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockfetcherGetMalfeasanceProofsWithCallbackCall) DoAndReturn(f func(context.Context, []types.NodeID, func(types.NodeID, error)) error) *MockfetcherGetMalfeasanceProofsWithCallbackCall { + 
c.Call = c.Call.DoAndReturn(f) + return c +} + // PeerEpochInfo mocks base method. func (m *Mockfetcher) PeerEpochInfo(arg0 context.Context, arg1 p2p.Peer, arg2 types.EpochID) (*fetch.EpochData, error) { m.ctrl.T.Helper() @@ -1702,3 +1778,101 @@ func (c *MockmultiEpochAtxSyncerV2StopCall) DoAndReturn(f func()) *MockmultiEpoc c.Call = c.Call.DoAndReturn(f) return c } + +// MockmalfeasanceSyncerV2 is a mock of malfeasanceSyncerV2 interface. +type MockmalfeasanceSyncerV2 struct { + ctrl *gomock.Controller + recorder *MockmalfeasanceSyncerV2MockRecorder + isgomock struct{} +} + +// MockmalfeasanceSyncerV2MockRecorder is the mock recorder for MockmalfeasanceSyncerV2. +type MockmalfeasanceSyncerV2MockRecorder struct { + mock *MockmalfeasanceSyncerV2 +} + +// NewMockmalfeasanceSyncerV2 creates a new mock instance. +func NewMockmalfeasanceSyncerV2(ctrl *gomock.Controller) *MockmalfeasanceSyncerV2 { + mock := &MockmalfeasanceSyncerV2{ctrl: ctrl} + mock.recorder = &MockmalfeasanceSyncerV2MockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockmalfeasanceSyncerV2) EXPECT() *MockmalfeasanceSyncerV2MockRecorder { + return m.recorder +} + +// StartAndSync mocks base method. +func (m *MockmalfeasanceSyncerV2) StartAndSync(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartAndSync", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// StartAndSync indicates an expected call of StartAndSync. 
+func (mr *MockmalfeasanceSyncerV2MockRecorder) StartAndSync(ctx any) *MockmalfeasanceSyncerV2StartAndSyncCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartAndSync", reflect.TypeOf((*MockmalfeasanceSyncerV2)(nil).StartAndSync), ctx) + return &MockmalfeasanceSyncerV2StartAndSyncCall{Call: call} +} + +// MockmalfeasanceSyncerV2StartAndSyncCall wrap *gomock.Call +type MockmalfeasanceSyncerV2StartAndSyncCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockmalfeasanceSyncerV2StartAndSyncCall) Return(arg0 error) *MockmalfeasanceSyncerV2StartAndSyncCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockmalfeasanceSyncerV2StartAndSyncCall) Do(f func(context.Context) error) *MockmalfeasanceSyncerV2StartAndSyncCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockmalfeasanceSyncerV2StartAndSyncCall) DoAndReturn(f func(context.Context) error) *MockmalfeasanceSyncerV2StartAndSyncCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Stop mocks base method. +func (m *MockmalfeasanceSyncerV2) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. 
+func (mr *MockmalfeasanceSyncerV2MockRecorder) Stop() *MockmalfeasanceSyncerV2StopCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockmalfeasanceSyncerV2)(nil).Stop)) + return &MockmalfeasanceSyncerV2StopCall{Call: call} +} + +// MockmalfeasanceSyncerV2StopCall wrap *gomock.Call +type MockmalfeasanceSyncerV2StopCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockmalfeasanceSyncerV2StopCall) Return() *MockmalfeasanceSyncerV2StopCall { + c.Call = c.Call.Return() + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockmalfeasanceSyncerV2StopCall) Do(f func()) *MockmalfeasanceSyncerV2StopCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockmalfeasanceSyncerV2StopCall) DoAndReturn(f func()) *MockmalfeasanceSyncerV2StopCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/syncer/syncer.go b/syncer/syncer.go index af05f1463e..0cf0d99ec5 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -50,8 +50,10 @@ type Config struct { type ReconcSyncConfig struct { Enable bool `mapstructure:"enable"` EnableActiveSync bool `mapstructure:"enable-active-sync"` + EnableMalSync bool `mapstructure:"enable-mal-sync"` OldAtxSyncCfg sync2.Config `mapstructure:"old-atx-sync"` NewAtxSyncCfg sync2.Config `mapstructure:"new-atx-sync"` + MalSyncCfg sync2.Config `mapstructure:"mal-sync"` ParallelLoadLimit int `mapstructure:"parallel-load-limit"` HardTimeout time.Duration `mapstructure:"hard-timeout"` ServerConfig fetch.ServerConfig `mapstructure:"server-config"` @@ -67,6 +69,9 @@ func DefaultConfig() Config { newAtxSyncCfg.MaxDepth = 21 newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute newAtxSyncCfg.AdvanceInterval = 5 * time.Minute + malSyncCfg := sync2.DefaultConfig() + malSyncCfg.MaxDepth = 16 + malSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute return Config{ Interval: 10 * 
time.Second, EpochEndFraction: 0.5, @@ -83,6 +88,7 @@ func DefaultConfig() Config { EnableActiveSync: false, OldAtxSyncCfg: oldAtxSyncCfg, NewAtxSyncCfg: newAtxSyncCfg, + MalSyncCfg: malSyncCfg, ParallelLoadLimit: 10, HardTimeout: 10 * time.Minute, ServerConfig: fetch.ServerConfig{ @@ -162,6 +168,12 @@ func withAtxSyncerV2(asv2 multiEpochAtxSyncerV2) Option { } } +func withMalfeasanceSyncerV2(msv2 malfeasanceSyncerV2) Option { + return func(s *Syncer) { + s.msv2 = msv2 + } +} + // Syncer is responsible to keep the node in sync with the network. type Syncer struct { logger *zap.Logger @@ -206,6 +218,7 @@ type Syncer struct { stop context.CancelFunc asv2 multiEpochAtxSyncerV2 + msv2 malfeasanceSyncerV2 dispatcher *rangesync.Dispatcher } @@ -252,10 +265,11 @@ func NewSyncer( s.isBusy.Store(false) s.lastLayerSynced.Store(s.mesh.LatestLayer().Uint32()) s.lastEpochSynced.Store(types.GetEffectiveGenesis().GetEpoch().Uint32() - 1) - if s.cfg.ReconcSync.Enable && s.asv2 == nil { - serverOpts := s.cfg.ReconcSync.ServerConfig.ToOpts() - serverOpts = append(serverOpts, server.WithHardTimeout(s.cfg.ReconcSync.HardTimeout)) - s.dispatcher = sync2.NewDispatcher(s.logger, host, serverOpts) + if !s.cfg.ReconcSync.Enable || (s.asv2 != nil && s.msv2 != nil) { + return s, nil + } + if s.asv2 == nil { + s.ensureDispatcher(host) hss := sync2.NewATXSyncSource( s.logger, s.dispatcher, @@ -276,9 +290,28 @@ func NewSyncer( return nil, fmt.Errorf("creating multi-epoch ATX syncer: %w", err) } } + if s.msv2 == nil { + s.ensureDispatcher(host) + var err error + s.msv2, err = sync2.NewMalfeasanceSyncer( + s.logger, s.dispatcher, "malsync", s.cfg.ReconcSync.MalSyncCfg, cdb.Database, + fetcher, peerCache, s.cfg.ReconcSync.EnableActiveSync) + if err != nil { + return nil, fmt.Errorf("creating malfeasance syncer: %w", err) + } + } return s, nil } +func (s *Syncer) ensureDispatcher(host host.Host) { + if s.dispatcher != nil { + return + } + serverOpts := s.cfg.ReconcSync.ServerConfig.ToOpts() + 
 serverOpts = append(serverOpts, server.WithHardTimeout(s.cfg.ReconcSync.HardTimeout)) + s.dispatcher = sync2.NewDispatcher(s.logger, host, serverOpts) +} + // Close stops the syncing process and the goroutines syncer spawns. func (s *Syncer) Close() { if s.stop == nil { @@ -290,6 +323,9 @@ func (s *Syncer) Close() { if s.asv2 != nil { s.asv2.Stop() } + if s.msv2 != nil { + s.msv2.Stop() + } s.logger.Debug("all syncer goroutines finished", zap.Error(err)) } @@ -686,11 +722,36 @@ func (s *Syncer) ensureMalfeasanceInSync(ctx context.Context) error { return nil } +func (s *Syncer) ensureMalfeasanceInSyncV2(ctx context.Context) error { + if !s.cfg.ReconcSync.EnableActiveSync || !s.cfg.ReconcSync.EnableMalSync { + // no actual active sync is initiated in this case, thus no logs + if err := s.msv2.StartAndSync(ctx); err != nil { + return fmt.Errorf("starting malfeasance syncv2: %w", err) + } + return nil + } + if !s.ListenToATXGossip() { + s.logger.Info("syncing malicious proofs", log.ZContext(ctx)) + if err := s.msv2.StartAndSync(ctx); err != nil { + return fmt.Errorf("starting malfeasance syncv2: %w", err) + } + s.logger.Info("malicious IDs synced", log.ZContext(ctx)) + // Malfeasance proofs are synced after the actual ATXs. + // We set ATX synced status after both ATXs and malfeasance proofs + // are in sync.
+ s.setATXSynced() + } + return nil +} + func (s *Syncer) syncAtxAndMalfeasance(ctx context.Context) error { if s.cfg.ReconcSync.Enable { if err := s.ensureATXsInSyncV2(ctx); err != nil { return err } + if err := s.ensureMalfeasanceInSyncV2(ctx); err != nil { + return err + } } if !s.cfg.ReconcSync.Enable || !s.cfg.ReconcSync.EnableActiveSync { // If syncv2 is being used in server-only mode, we still need to run @@ -699,7 +760,12 @@ func (s *Syncer) syncAtxAndMalfeasance(ctx context.Context) error { return err } } - return s.ensureMalfeasanceInSync(ctx) + if !s.cfg.ReconcSync.Enable || !s.cfg.ReconcSync.EnableActiveSync || !s.cfg.ReconcSync.EnableMalSync { + if err := s.ensureMalfeasanceInSync(ctx); err != nil { + return err + } + } + return nil } func isTooFarBehind( diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index febf439f16..e6a640b35b 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -82,6 +82,7 @@ type testSyncer struct { mCertHdr *mocks.MockcertHandler mForkFinder *mocks.MockforkFinder mASV2 *mocks.MockmultiEpochAtxSyncerV2 + mMSV2 *mocks.MockmalfeasanceSyncerV2 } func (ts *testSyncer) expectMalEnsureInSync(current types.LayerID) { @@ -127,6 +128,7 @@ func newTestSyncerWithConfig(tb testing.TB, cfg Config) *testSyncer { mCertHdr: mocks.NewMockcertHandler(ctrl), mForkFinder: mocks.NewMockforkFinder(ctrl), mASV2: mocks.NewMockmultiEpochAtxSyncerV2(ctrl), + mMSV2: mocks.NewMockmalfeasanceSyncerV2(ctrl), } db := statesql.InMemoryTest(tb) ts.cdb = datastore.NewCachedDB(db, lg) @@ -154,6 +156,7 @@ func newTestSyncerWithConfig(tb testing.TB, cfg Config) *testSyncer { withDataFetcher(ts.mDataFetcher), withForkFinder(ts.mForkFinder), withAtxSyncerV2(ts.mASV2), + withMalfeasanceSyncerV2(ts.mMSV2), ) require.NoError(tb, err) return ts @@ -212,6 +215,7 @@ func TestStartAndShutdown(t *testing.T) { }, time.Second, 10*time.Millisecond) ts.mASV2.EXPECT().Stop() + ts.mMSV2.EXPECT().Stop() cancel() require.False(t, ts.syncer.synchronize(ctx)) 
ts.syncer.Close() @@ -262,6 +266,7 @@ func TestSynchronize_OnlyOneSynchronize(t *testing.T) { <-dlCh ts.mASV2.EXPECT().Stop() + ts.mMSV2.EXPECT().Stop() cancel() ts.syncer.Close() } @@ -514,6 +519,7 @@ func TestSyncAtxs_Genesis_SyncV2(t *testing.T) { t.Run("no atx expected", func(t *testing.T) { ts := newSyncerWithoutPeriodicRunsWithConfig(t, cfg) ts.mTicker.advanceToLayer(1) + ts.mMSV2.EXPECT().StartAndSync(gomock.Any()) require.True(t, ts.syncer.synchronize(context.Background())) require.True(t, ts.syncer.ListenToATXGossip()) require.Equal(t, types.EpochID(0), ts.syncer.lastAtxEpoch()) @@ -528,6 +534,20 @@ func TestSyncAtxs_Genesis_SyncV2(t *testing.T) { require.False(t, ts.syncer.ListenToATXGossip()) ts.mASV2.EXPECT().EnsureSync(gomock.Any(), types.EpochID(0), epoch) ts.expectMalEnsureInSync(current) + ts.mMSV2.EXPECT().StartAndSync(gomock.Any()) + require.True(t, ts.syncer.synchronize(context.Background())) + require.True(t, ts.syncer.ListenToATXGossip()) + }) + + t.Run("with malfeasance syncv2", func(t *testing.T) { + cfg.ReconcSync.EnableMalSync = true + ts := newSyncerWithoutPeriodicRunsWithConfig(t, cfg) + epoch := types.EpochID(1) + current := epoch.FirstLayer() + 2 + ts.mTicker.advanceToLayer(current) // to pass epoch end fraction threshold + require.False(t, ts.syncer.ListenToATXGossip()) + ts.mASV2.EXPECT().EnsureSync(gomock.Any(), types.EpochID(0), epoch) + ts.mMSV2.EXPECT().StartAndSync(gomock.Any()) require.True(t, ts.syncer.synchronize(context.Background())) require.True(t, ts.syncer.ListenToATXGossip()) }) From 8dfe669a8d85d844ccd72b8cfc48eb08dce81ff6 Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Thu, 23 Jan 2025 23:13:32 +0400 Subject: [PATCH 2/5] sync2: fix identitiesTable for malfeasance --- sync2/malfeasance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sync2/malfeasance.go b/sync2/malfeasance.go index a93eab8a81..e080933e91 100644 --- a/sync2/malfeasance.go +++ b/sync2/malfeasance.go @@ -73,7 +73,7 @@ func (h 
*MalfeasanceHandler) Commit( func identitiesTable() *sqlstore.SyncedTable { return &sqlstore.SyncedTable{ TableName: "identities", - IDColumn: "id", + IDColumn: "pubkey", } } From a891179aa18f3049d7f977a1329433623db8d444 Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Thu, 23 Jan 2025 23:13:54 +0400 Subject: [PATCH 3/5] syncer: fix syncv2 malfeasance logging --- syncer/syncer.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 0cf0d99ec5..a7be88a5d2 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -294,8 +294,10 @@ func NewSyncer( s.ensureDispatcher(host) var err error s.msv2, err = sync2.NewMalfeasanceSyncer( - s.logger, s.dispatcher, "malsync", s.cfg.ReconcSync.MalSyncCfg, cdb.Database, - fetcher, peerCache, s.cfg.ReconcSync.EnableActiveSync) + s.logger.Named("malsync"), + s.dispatcher, "malsync", s.cfg.ReconcSync.MalSyncCfg, cdb.Database, + fetcher, peerCache, + s.cfg.ReconcSync.EnableActiveSync && s.cfg.ReconcSync.EnableMalSync) if err != nil { return nil, fmt.Errorf("creating malfeasance syncer: %w", err) } From 95d94a9185f9d0ca2e3fe91e8938f7874ad862ba Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Thu, 23 Jan 2025 23:14:06 +0400 Subject: [PATCH 4/5] syncer: fix test --- syncer/syncer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index e6a640b35b..adb76938b5 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -637,6 +637,7 @@ func TestSyncAtxs_SyncV2(t *testing.T) { for _, tc := range tcs { t.Run(tc.desc, func(t *testing.T) { ts := newSyncerWithoutPeriodicRunsWithConfig(t, cfg) + ts.mMSV2.EXPECT().StartAndSync(gomock.Any()).AnyTimes() ts.expectMalDownloadLoop() lyr := startWithSyncedState_SyncV2(t, ts) require.LessOrEqual(t, lyr, tc.current) From 3ef6421fc1c0ea83aa389a932b9eaf4154e21dde Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Thu, 23 Jan 2025 23:58:02 +0400 Subject: [PATCH 5/5] fetch: fix race in 
test --- fetch/mesh_data_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fetch/mesh_data_test.go b/fetch/mesh_data_test.go index b413669301..fa64aaa15e 100644 --- a/fetch/mesh_data_test.go +++ b/fetch/mesh_data_test.go @@ -413,10 +413,13 @@ func TestFetch_GetMalfeasanceProofsWithCallback(t *testing.T) { var eg errgroup.Group startTestLoop(t, f.Fetch, &eg, stop) + var mtx sync.Mutex var ids []types.NodeID require.NoError(t, f.GetMalfeasanceProofsWithCallback( context.Background(), nodeIDs, func(nodeID types.NodeID, err error) { + mtx.Lock() + defer mtx.Unlock() require.NotContains(t, ids, nodeID) ids = append(ids, nodeID) require.NoError(t, err)