Skip to content

Commit 9fe66f8

Browse files
committed
sync2: implement malfeasance sync
1 parent a8e23c1 commit 9fe66f8

18 files changed

+1069
-349
lines changed

config/mainnet.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ func MainnetConfig() Config {
8787
newAtxSyncCfg.MaxDepth = 21
8888
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute
8989
newAtxSyncCfg.AdvanceInterval = 5 * time.Minute
90+
malSyncCfg := sync2.DefaultConfig()
91+
malSyncCfg.MaxDepth = 16
92+
malSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute
9093

9194
return Config{
9295
BaseConfig: BaseConfig{
@@ -227,6 +230,7 @@ func MainnetConfig() Config {
227230
ReconcSync: syncer.ReconcSyncConfig{
228231
OldAtxSyncCfg: oldAtxSyncCfg,
229232
NewAtxSyncCfg: newAtxSyncCfg,
233+
MalSyncCfg: malSyncCfg,
230234
ParallelLoadLimit: 10,
231235
HardTimeout: 10 * time.Minute,
232236
ServerConfig: fetch.ServerConfig{

config/presets/fastnet.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func fastnet() config.Config {
5656
conf.Sync.AtxSync.EpochInfoPeers = 10
5757
conf.Sync.AtxSync.RequestsLimit = 100
5858
conf.Sync.MalSync.IDRequestInterval = 20 * time.Second
59+
conf.Sync.ReconcSync.MalSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 20 * time.Second
5960
conf.LayersPerEpoch = 4
6061
conf.RegossipAtxInterval = 30 * time.Second
6162
conf.FETCH.RequestTimeout = 2 * time.Second

config/presets/testnet.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ func testnet() config.Config {
7373
newAtxSyncCfg := sync2.DefaultConfig()
7474
newAtxSyncCfg.MaxDepth = 21
7575
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute
76+
malSyncCfg := sync2.DefaultConfig()
77+
malSyncCfg.MaxDepth = 16
78+
malSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute
7679

7780
return config.Config{
7881
Preset: "testnet",
@@ -176,6 +179,7 @@ func testnet() config.Config {
176179
ReconcSync: syncer.ReconcSyncConfig{
177180
OldAtxSyncCfg: oldAtxSyncCfg,
178181
NewAtxSyncCfg: newAtxSyncCfg,
182+
MalSyncCfg: malSyncCfg,
179183
ParallelLoadLimit: 10,
180184
HardTimeout: time.Minute,
181185
ServerConfig: fetch.ServerConfig{

fetch/mesh_data.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,30 @@ func (f *Fetch) GetActiveSet(ctx context.Context, set types.Hash32) error {
160160
return f.getHashes(ctx, []types.Hash32{set}, datastore.ActiveSet, f.validators.activeset.HandleMessage)
161161
}
162162

163-
// GetMalfeasanceProofs gets malfeasance proofs for the specified NodeIDs and validates them.
164-
func (f *Fetch) GetMalfeasanceProofs(ctx context.Context, ids []types.NodeID) error {
163+
// GetMalfeasanceProofsWithCallback gets malfeasance proofs for the specified NodeIDs and validates them.
164+
// If callback is not nil, it is invoked for each requested NodeID with the error (nil on success) from fetching its proof.
165+
func (f *Fetch) GetMalfeasanceProofsWithCallback(
166+
ctx context.Context,
167+
ids []types.NodeID,
168+
callback func(types.NodeID, error),
169+
) error {
165170
if len(ids) == 0 {
166171
return nil
167172
}
168173
f.logger.Debug("requesting malfeasance proofs from peer", log.ZContext(ctx), zap.Int("num_proofs", len(ids)))
169174
hashes := types.NodeIDsToHashes(ids)
170-
return f.getHashes(ctx, hashes, datastore.Malfeasance, f.validators.malfeasance.HandleMessage)
175+
var ghOpts []getHashesOpt
176+
if callback != nil {
177+
ghOpts = append(ghOpts, withHashCallback(func(hash types.Hash32, err error) {
178+
callback(types.NodeID(hash), err)
179+
}))
180+
}
181+
return f.getHashes(ctx, hashes, datastore.Malfeasance, f.validators.malfeasance.HandleMessage, ghOpts...)
182+
}
183+
184+
// GetMalfeasanceProofs gets malfeasance proofs for the specified NodeIDs and validates them.
185+
func (f *Fetch) GetMalfeasanceProofs(ctx context.Context, ids []types.NodeID) error {
186+
return f.GetMalfeasanceProofsWithCallback(ctx, ids, nil)
171187
}
172188

173189
// GetBallots gets data for the specified BallotIDs and validates them.

fetch/mesh_data_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,31 @@ func TestFetch_GetMalfeasanceProofs(t *testing.T) {
401401
require.NoError(t, eg.Wait())
402402
}
403403

404+
func TestFetch_GetMalfeasanceProofsWithCallback(t *testing.T) {
405+
nodeIDs := []types.NodeID{{1}, {2}, {3}}
406+
f := createFetch(t)
407+
f.mMalH.EXPECT().
408+
HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
409+
Return(nil).
410+
Times(len(nodeIDs))
411+
412+
stop := make(chan struct{}, 1)
413+
var eg errgroup.Group
414+
startTestLoop(t, f.Fetch, &eg, stop)
415+
416+
var ids []types.NodeID
417+
require.NoError(t, f.GetMalfeasanceProofsWithCallback(
418+
context.Background(), nodeIDs,
419+
func(nodeID types.NodeID, err error) {
420+
require.NotContains(t, ids, nodeID)
421+
ids = append(ids, nodeID)
422+
require.NoError(t, err)
423+
}))
424+
require.ElementsMatch(t, nodeIDs, ids)
425+
close(stop)
426+
require.NoError(t, eg.Wait())
427+
}
428+
404429
func TestFetch_GetBlocks(t *testing.T) {
405430
blks := []*types.Block{
406431
genLayerBlock(types.LayerID(10), types.RandomTXSet(10)),

sync2/atxs.go

Lines changed: 15 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,14 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"sync"
87

98
"github.com/jonboulle/clockwork"
10-
"github.com/libp2p/go-libp2p/core/host"
119
"go.uber.org/zap"
1210
"golang.org/x/sync/errgroup"
1311

1412
"github.com/spacemeshos/go-spacemesh/common/types"
15-
"github.com/spacemeshos/go-spacemesh/fetch"
1613
"github.com/spacemeshos/go-spacemesh/fetch/peers"
1714
"github.com/spacemeshos/go-spacemesh/p2p"
18-
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
19-
"github.com/spacemeshos/go-spacemesh/p2p/server"
2015
"github.com/spacemeshos/go-spacemesh/sql"
2116
"github.com/spacemeshos/go-spacemesh/sql/expr"
2217
"github.com/spacemeshos/go-spacemesh/sync2/dbset"
@@ -26,18 +21,17 @@ import (
2621
"github.com/spacemeshos/go-spacemesh/system"
2722
)
2823

29-
const (
30-
proto = "sync/2"
31-
)
32-
3324
type ATXHandler struct {
3425
logger *zap.Logger
3526
f Fetcher
3627
clock clockwork.Clock
3728
cfg Config
3829
}
3930

40-
var _ multipeer.SyncKeyHandler = &ATXHandler{}
31+
var (
32+
_ multipeer.SyncKeyHandler = &ATXHandler{}
33+
_ Handler[types.ATXID] = &ATXHandler{}
34+
)
4135

4236
func NewATXHandler(
4337
logger *zap.Logger,
@@ -56,72 +50,14 @@ func NewATXHandler(
5650
}
5751
}
5852

59-
type commitState struct {
60-
state map[types.ATXID]uint
61-
total int
62-
numDownloaded int
63-
items []types.ATXID
53+
func (h *ATXHandler) Register(peer p2p.Peer, k rangesync.KeyBytes) types.ATXID {
54+
id := types.BytesToATXID(k)
55+
h.f.RegisterPeerHashes(peer, []types.Hash32{id.Hash32()})
56+
return id
6457
}
6558

66-
func (h *ATXHandler) setupState(
67-
peer p2p.Peer,
68-
base rangesync.OrderedSet,
69-
received rangesync.SeqResult,
70-
) (*commitState, error) {
71-
state := make(map[types.ATXID]uint)
72-
for k := range received.Seq {
73-
found, err := base.Has(k)
74-
if err != nil {
75-
return nil, fmt.Errorf("check if ATX exists: %w", err)
76-
}
77-
if found {
78-
continue
79-
}
80-
id := types.BytesToATXID(k)
81-
h.f.RegisterPeerHashes(peer, []types.Hash32{id.Hash32()})
82-
state[id] = 0
83-
}
84-
if err := received.Error(); err != nil {
85-
return nil, fmt.Errorf("get item: %w", err)
86-
}
87-
return &commitState{
88-
state: state,
89-
total: len(state),
90-
items: make([]types.ATXID, 0, h.cfg.BatchSize),
91-
}, nil
92-
}
93-
94-
func (h *ATXHandler) getAtxs(ctx context.Context, cs *commitState) (bool, error) {
95-
cs.items = cs.items[:0] // reuse the slice to reduce allocations
96-
for id := range cs.state {
97-
cs.items = append(cs.items, id)
98-
if uint(len(cs.items)) == h.cfg.BatchSize {
99-
break
100-
}
101-
}
102-
someSucceeded := false
103-
var mtx sync.Mutex
104-
err := h.f.GetAtxs(ctx, cs.items, system.WithATXCallback(func(id types.ATXID, err error) {
105-
mtx.Lock()
106-
defer mtx.Unlock()
107-
switch {
108-
case err == nil:
109-
cs.numDownloaded++
110-
someSucceeded = true
111-
delete(cs.state, id)
112-
case errors.Is(err, pubsub.ErrValidationReject):
113-
h.logger.Debug("failed to download ATX",
114-
zap.String("atx", id.ShortString()), zap.Error(err))
115-
delete(cs.state, id)
116-
case cs.state[id] >= h.cfg.MaxAttempts-1:
117-
h.logger.Debug("failed to download ATX: max attempts reached",
118-
zap.String("atx", id.ShortString()))
119-
delete(cs.state, id)
120-
default:
121-
cs.state[id]++
122-
}
123-
}))
124-
return someSucceeded, err
59+
func (h *ATXHandler) Get(ctx context.Context, ids []types.ATXID, callback func(types.ATXID, error)) error {
60+
return h.f.GetAtxs(ctx, ids, system.WithATXCallback(callback))
12561
}
12662

12763
func (h *ATXHandler) Commit(
@@ -132,46 +68,11 @@ func (h *ATXHandler) Commit(
13268
) error {
13369
h.logger.Debug("begin atx commit")
13470
defer h.logger.Debug("end atx commit")
135-
cs, err := h.setupState(peer, base, received)
71+
cs, err := NewCommitState(h.logger, h, h.clock, peer, base, received, h.cfg)
13672
if err != nil {
13773
return err
13874
}
139-
startTime := h.clock.Now()
140-
batchAttemptsRemaining := h.cfg.MaxBatchRetries
141-
for len(cs.state) > 0 {
142-
someSucceeded, err := h.getAtxs(ctx, cs)
143-
batchErr := &fetch.BatchError{}
144-
switch {
145-
case err == nil:
146-
case errors.Is(err, context.Canceled):
147-
return err
148-
case !errors.As(err, &batchErr):
149-
h.logger.Debug("failed to download ATXs", zap.Error(err))
150-
}
151-
if !someSucceeded {
152-
if batchAttemptsRemaining == 0 {
153-
return errors.New("failed to download ATXs: max batch retries reached")
154-
}
155-
batchAttemptsRemaining--
156-
h.logger.Debug("failed to download any ATXs: will retry batch",
157-
zap.Uint("remaining", batchAttemptsRemaining),
158-
zap.Duration("delay", h.cfg.FailedBatchDelay))
159-
select {
160-
case <-ctx.Done():
161-
return ctx.Err()
162-
case <-h.clock.After(h.cfg.FailedBatchDelay):
163-
continue
164-
}
165-
}
166-
167-
batchAttemptsRemaining = h.cfg.MaxBatchRetries
168-
elapsed := h.clock.Since(startTime)
169-
h.logger.Debug("fetched atxs",
170-
zap.Int("total", cs.total),
171-
zap.Int("downloaded", cs.numDownloaded),
172-
zap.Float64("rate per sec", float64(cs.numDownloaded)/elapsed.Seconds()))
173-
}
174-
return nil
75+
return cs.Commit(ctx)
17576
}
17677

17778
type MultiEpochATXSyncer struct {
@@ -221,7 +122,7 @@ func (s *MultiEpochATXSyncer) load(newEpoch types.EpochID) error {
221122
if epoch == newEpoch {
222123
cfg = s.newCfg
223124
}
224-
hs, err := s.hss.CreateHashSync(name, cfg, epoch)
125+
hs, err := s.hss.CreateATXSync(name, cfg, epoch)
225126
if err != nil {
226127
return fmt.Errorf("create ATX syncer for epoch %d: %w", epoch, err)
227128
}
@@ -307,12 +208,6 @@ func NewATXSyncer(
307208
return NewP2PHashSync(logger, d, name, curSet, 32, peers, handler, cfg, enableActiveSync)
308209
}
309210

310-
func NewDispatcher(logger *zap.Logger, host host.Host, opts []server.Opt) *rangesync.Dispatcher {
311-
d := rangesync.NewDispatcher(logger)
312-
d.SetupServer(host, proto, opts...)
313-
return d
314-
}
315-
316211
type ATXSyncSource struct {
317212
logger *zap.Logger
318213
d *rangesync.Dispatcher
@@ -335,7 +230,7 @@ func NewATXSyncSource(
335230
return &ATXSyncSource{logger: logger, d: d, db: db, f: f, peers: peers, enableActiveSync: enableActiveSync}
336231
}
337232

338-
// CreateHashSync implements HashSyncSource.
339-
func (as *ATXSyncSource) CreateHashSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) {
233+
// CreateATXSync implements HashSyncSource.
234+
func (as *ATXSyncSource) CreateATXSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) {
340235
return NewATXSyncer(as.logger.Named(name), as.d, name, cfg, as.db, as.f, as.peers, epoch, as.enableActiveSync)
341236
}

0 commit comments

Comments
 (0)