Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func MainnetConfig() Config {
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute
newAtxSyncCfg.AdvanceInterval = 5 * time.Minute
malSyncCfg := sync2.DefaultConfig()
malSyncCfg.MaxDepth = 16
malSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute

return Config{
BaseConfig: BaseConfig{
Expand Down Expand Up @@ -228,6 +231,7 @@ func MainnetConfig() Config {
Enable: true,
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
MalSyncCfg: malSyncCfg,
ParallelLoadLimit: 10,
HardTimeout: 10 * time.Minute,
ServerConfig: fetch.ServerConfig{
Expand Down
1 change: 1 addition & 0 deletions config/presets/fastnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func fastnet() config.Config {
conf.Sync.AtxSync.EpochInfoPeers = 10
conf.Sync.AtxSync.RequestsLimit = 100
conf.Sync.MalSync.IDRequestInterval = 20 * time.Second
conf.Sync.ReconcSync.MalSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 20 * time.Second
conf.LayersPerEpoch = 4
conf.RegossipAtxInterval = 30 * time.Second
conf.FETCH.RequestTimeout = 2 * time.Second
Expand Down
4 changes: 4 additions & 0 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func testnet() config.Config {
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncPeerCount = 4
malSyncCfg := sync2.DefaultConfig()
malSyncCfg.MaxDepth = 16
malSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 30 * time.Minute

return config.Config{
Preset: "testnet",
Expand Down Expand Up @@ -179,6 +182,7 @@ func testnet() config.Config {
Enable: true,
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
MalSyncCfg: malSyncCfg,
ParallelLoadLimit: 10,
HardTimeout: time.Minute,
ServerConfig: fetch.ServerConfig{
Expand Down
22 changes: 19 additions & 3 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,30 @@ func (f *Fetch) GetActiveSet(ctx context.Context, set types.Hash32) error {
return f.getHashes(ctx, []types.Hash32{set}, datastore.ActiveSet, f.validators.activeset.HandleMessage)
}

// GetMalfeasanceProofs gets malfeasance proofs for the specified NodeIDs and validates them.
func (f *Fetch) GetMalfeasanceProofs(ctx context.Context, ids []types.NodeID) error {
// GetMalfeasanceProofsWithCallback gets malfeasance proofs for the specified NodeIDs and validates them.
// If callback is not nil, GetMalfeasanceProofsWithCallback invokes the callback for each proof fetched.
func (f *Fetch) GetMalfeasanceProofsWithCallback(
ctx context.Context,
ids []types.NodeID,
callback func(types.NodeID, error),
) error {
if len(ids) == 0 {
return nil
}
f.logger.Debug("requesting malfeasance proofs from peer", log.ZContext(ctx), zap.Int("num_proofs", len(ids)))
hashes := types.NodeIDsToHashes(ids)
return f.getHashes(ctx, hashes, datastore.Malfeasance, f.validators.malfeasance.HandleMessage)
var ghOpts []getHashesOpt
if callback != nil {
ghOpts = append(ghOpts, withHashCallback(func(hash types.Hash32, err error) {
callback(types.NodeID(hash), err)
}))
}
return f.getHashes(ctx, hashes, datastore.Malfeasance, f.validators.malfeasance.HandleMessage, ghOpts...)
}

// GetMalfeasanceProofs gets malfeasance proofs for the specified NodeIDs and validates them.
func (f *Fetch) GetMalfeasanceProofs(ctx context.Context, ids []types.NodeID) error {
return f.GetMalfeasanceProofsWithCallback(ctx, ids, nil)
}

// GetBallots gets data for the specified BallotIDs and validates them.
Expand Down
28 changes: 28 additions & 0 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,34 @@ func TestFetch_GetMalfeasanceProofs(t *testing.T) {
require.NoError(t, eg.Wait())
}

func TestFetch_GetMalfeasanceProofsWithCallback(t *testing.T) {
nodeIDs := []types.NodeID{{1}, {2}, {3}}
f := createFetch(t)
f.mMalH.EXPECT().
HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
Times(len(nodeIDs))

stop := make(chan struct{}, 1)
var eg errgroup.Group
startTestLoop(t, f.Fetch, &eg, stop)

var mtx sync.Mutex
var ids []types.NodeID
require.NoError(t, f.GetMalfeasanceProofsWithCallback(
context.Background(), nodeIDs,
func(nodeID types.NodeID, err error) {
mtx.Lock()
defer mtx.Unlock()
require.NotContains(t, ids, nodeID)
ids = append(ids, nodeID)
require.NoError(t, err)
}))
require.ElementsMatch(t, nodeIDs, ids)
close(stop)
require.NoError(t, eg.Wait())
}

func TestFetch_GetBlocks(t *testing.T) {
blks := []*types.Block{
genLayerBlock(types.LayerID(10), types.RandomTXSet(10)),
Expand Down
135 changes: 15 additions & 120 deletions sync2/atxs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@
"context"
"errors"
"fmt"
"sync"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-libp2p/core/host"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/fetch"
"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/p2p/server"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/expr"
"github.com/spacemeshos/go-spacemesh/sync2/dbset"
Expand All @@ -26,18 +21,17 @@
"github.com/spacemeshos/go-spacemesh/system"
)

const (
proto = "sync/2"
)

type ATXHandler struct {
logger *zap.Logger
f Fetcher
clock clockwork.Clock
cfg Config
}

var _ multipeer.SyncKeyHandler = &ATXHandler{}
var (
_ multipeer.SyncKeyHandler = &ATXHandler{}
_ Handler[types.ATXID] = &ATXHandler{}
)

func NewATXHandler(
logger *zap.Logger,
Expand All @@ -56,72 +50,14 @@
}
}

type commitState struct {
state map[types.ATXID]uint
total int
numDownloaded int
items []types.ATXID
func (h *ATXHandler) Register(peer p2p.Peer, k rangesync.KeyBytes) types.ATXID {
id := types.BytesToATXID(k)
h.f.RegisterPeerHashes(peer, []types.Hash32{id.Hash32()})
return id
}

func (h *ATXHandler) setupState(
peer p2p.Peer,
base rangesync.OrderedSet,
received rangesync.SeqResult,
) (*commitState, error) {
state := make(map[types.ATXID]uint)
for k := range received.Seq {
found, err := base.Has(k)
if err != nil {
return nil, fmt.Errorf("check if ATX exists: %w", err)
}
if found {
continue
}
id := types.BytesToATXID(k)
h.f.RegisterPeerHashes(peer, []types.Hash32{id.Hash32()})
state[id] = 0
}
if err := received.Error(); err != nil {
return nil, fmt.Errorf("get item: %w", err)
}
return &commitState{
state: state,
total: len(state),
items: make([]types.ATXID, 0, h.cfg.BatchSize),
}, nil
}

func (h *ATXHandler) getAtxs(ctx context.Context, cs *commitState) (bool, error) {
cs.items = cs.items[:0] // reuse the slice to reduce allocations
for id := range cs.state {
cs.items = append(cs.items, id)
if uint(len(cs.items)) == h.cfg.BatchSize {
break
}
}
someSucceeded := false
var mtx sync.Mutex
err := h.f.GetAtxs(ctx, cs.items, system.WithATXCallback(func(id types.ATXID, err error) {
mtx.Lock()
defer mtx.Unlock()
switch {
case err == nil:
cs.numDownloaded++
someSucceeded = true
delete(cs.state, id)
case errors.Is(err, pubsub.ErrValidationReject):
h.logger.Debug("failed to download ATX",
zap.String("atx", id.ShortString()), zap.Error(err))
delete(cs.state, id)
case cs.state[id] >= h.cfg.MaxAttempts-1:
h.logger.Debug("failed to download ATX: max attempts reached",
zap.String("atx", id.ShortString()))
delete(cs.state, id)
default:
cs.state[id]++
}
}))
return someSucceeded, err
func (h *ATXHandler) Get(ctx context.Context, ids []types.ATXID, callback func(types.ATXID, error)) error {
return h.f.GetAtxs(ctx, ids, system.WithATXCallback(callback))
}

func (h *ATXHandler) Commit(
Expand All @@ -132,46 +68,11 @@
) error {
h.logger.Debug("begin atx commit")
defer h.logger.Debug("end atx commit")
cs, err := h.setupState(peer, base, received)
cs, err := NewCommitState(h.logger, h, h.clock, peer, base, received, h.cfg)
if err != nil {
return err
}
startTime := h.clock.Now()
batchAttemptsRemaining := h.cfg.MaxBatchRetries
for len(cs.state) > 0 {
someSucceeded, err := h.getAtxs(ctx, cs)
batchErr := &fetch.BatchError{}
switch {
case err == nil:
case errors.Is(err, context.Canceled):
return err
case !errors.As(err, &batchErr):
h.logger.Debug("failed to download ATXs", zap.Error(err))
}
if !someSucceeded {
if batchAttemptsRemaining == 0 {
return errors.New("failed to download ATXs: max batch retries reached")
}
batchAttemptsRemaining--
h.logger.Debug("failed to download any ATXs: will retry batch",
zap.Uint("remaining", batchAttemptsRemaining),
zap.Duration("delay", h.cfg.FailedBatchDelay))
select {
case <-ctx.Done():
return ctx.Err()
case <-h.clock.After(h.cfg.FailedBatchDelay):
continue
}
}

batchAttemptsRemaining = h.cfg.MaxBatchRetries
elapsed := h.clock.Since(startTime)
h.logger.Debug("fetched atxs",
zap.Int("total", cs.total),
zap.Int("downloaded", cs.numDownloaded),
zap.Float64("rate per sec", float64(cs.numDownloaded)/elapsed.Seconds()))
}
return nil
return cs.Commit(ctx)
}

type MultiEpochATXSyncer struct {
Expand Down Expand Up @@ -221,7 +122,7 @@
if epoch == newEpoch {
cfg = s.newCfg
}
hs, err := s.hss.CreateHashSync(name, cfg, epoch)
hs, err := s.hss.CreateATXSync(name, cfg, epoch)
if err != nil {
return fmt.Errorf("create ATX syncer for epoch %d: %w", epoch, err)
}
Expand Down Expand Up @@ -307,12 +208,6 @@
return NewP2PHashSync(logger, d, name, curSet, 32, peers, handler, cfg, enableActiveSync)
}

func NewDispatcher(logger *zap.Logger, host host.Host, opts []server.Opt) *rangesync.Dispatcher {
d := rangesync.NewDispatcher(logger)
d.SetupServer(host, proto, opts...)
return d
}

type ATXSyncSource struct {
logger *zap.Logger
d *rangesync.Dispatcher
Expand All @@ -335,7 +230,7 @@
return &ATXSyncSource{logger: logger, d: d, db: db, f: f, peers: peers, enableActiveSync: enableActiveSync}
}

// CreateHashSync implements HashSyncSource.
func (as *ATXSyncSource) CreateHashSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) {
// CreateATXSync implements HashSyncSource.
func (as *ATXSyncSource) CreateATXSync(name string, cfg Config, epoch types.EpochID) (HashSync, error) {

Check warning on line 234 in sync2/atxs.go

View check run for this annotation

Codecov / codecov/patch

sync2/atxs.go#L234

Added line #L234 was not covered by tests
return NewATXSyncer(as.logger.Named(name), as.d, name, cfg, as.db, as.f, as.peers, epoch, as.enableActiveSync)
}
Loading
Loading