Skip to content

Commit ef30f47

Browse files
committed
sync2: implement multi-peer synchronization
This adds multi-peer synchronization support. When the local set differs too much from the remote sets, "torrent-style" "split sync" is attempted which splits the set into subranges and syncs each sub-range against a separate peer. Otherwise, the full sync is done, syncing the whole set against each of the synchronization peers. Full sync is also done after each split sync run. The local set can be considered synchronized after the specified number of full syncs has happened. The approach is loosely based on [SREP: Out-Of-Band Sync of Transaction Pools for Large-Scale Blockchains](https://people.bu.edu/staro/2023-ICBC-Novak.pdf) paper by Novak Boškov, Sevval Simsek, Ari Trachtenberg, and David Starobinski.
1 parent 86b9591 commit ef30f47

22 files changed

+3877
-12
lines changed

fetch/peers/peers.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ type Peers struct {
5454
globalLatency float64
5555
}
5656

57+
func (p *Peers) Contains(id peer.ID) bool {
58+
p.mu.Lock()
59+
defer p.mu.Unlock()
60+
_, exist := p.peers[id]
61+
return exist
62+
}
63+
5764
func (p *Peers) Add(id peer.ID) bool {
5865
p.mu.Lock()
5966
defer p.mu.Unlock()

p2p/server/server.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,30 @@ func WithRequestsPerInterval(n int, interval time.Duration) Opt {
104104
}
105105
}
106106

107+
// WithDecayingTag specifies P2P decaying tag that is applied to the peer when a request
108+
// is being served.
107109
func WithDecayingTag(tag DecayingTagSpec) Opt {
108110
return func(s *Server) {
109111
s.decayingTagSpec = &tag
110112
}
111113
}
112114

115+
type peerIDKey struct{}
116+
117+
func withPeerID(ctx context.Context, peerID peer.ID) context.Context {
118+
return context.WithValue(ctx, peerIDKey{}, peerID)
119+
}
120+
121+
// ContextPeerID retrieves the ID of the peer being served from the context and a boolean
122+
// value indicating that the context contains peer ID. If there's no peer ID associated
123+
// with the context, the function returns an empty peer ID and false.
124+
func ContextPeerID(ctx context.Context) (peer.ID, bool) {
125+
if v := ctx.Value(peerIDKey{}); v != nil {
126+
return v.(peer.ID), true
127+
}
128+
return peer.ID(""), false
129+
}
130+
113131
// Handler is a handler to be defined by the application.
114132
type Handler func(context.Context, []byte) ([]byte, error)
115133

@@ -264,7 +282,8 @@ func (s *Server) Run(ctx context.Context) error {
264282
eg.Wait()
265283
return nil
266284
}
267-
ctx, cancel := context.WithCancel(ctx)
285+
peer := req.stream.Conn().RemotePeer()
286+
ctx, cancel := context.WithCancel(withPeerID(ctx, peer))
268287
eg.Go(func() error {
269288
<-ctx.Done()
270289
s.sem.Release(1)
@@ -275,7 +294,7 @@ func (s *Server) Run(ctx context.Context) error {
275294
defer cancel()
276295
conn := req.stream.Conn()
277296
if s.decayingTag != nil {
278-
s.decayingTag.Bump(conn.RemotePeer(), s.decayingTagSpec.Inc)
297+
s.decayingTag.Bump(peer, s.decayingTagSpec.Inc)
279298
}
280299
ok := s.queueHandler(ctx, req.stream)
281300
duration := time.Since(req.received)

p2p/server/server_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/libp2p/go-libp2p/core/host"
11+
"github.com/libp2p/go-libp2p/core/peer"
1112
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
1213
"github.com/spacemeshos/go-scale/tester"
1314
"github.com/stretchr/testify/assert"
@@ -45,8 +46,10 @@ func TestServer(t *testing.T) {
4546
request := []byte("test request")
4647
testErr := errors.New("test error")
4748

48-
handler := func(_ context.Context, msg []byte) ([]byte, error) {
49-
return msg, nil
49+
handler := func(ctx context.Context, msg []byte) ([]byte, error) {
50+
peerID, found := ContextPeerID(ctx)
51+
require.True(t, found)
52+
return append(msg, []byte(peerID)...), nil
5053
}
5154
errhandler := func(_ context.Context, _ []byte) ([]byte, error) {
5255
return nil, testErr
@@ -81,6 +84,9 @@ func TestServer(t *testing.T) {
8184
append(opts, WithRequestSizeLimit(limit))...,
8285
)
8386
ctx, cancel := context.WithCancel(context.Background())
87+
noPeerID, found := ContextPeerID(ctx)
88+
require.Equal(t, peer.ID(""), noPeerID)
89+
require.False(t, found)
8490
var eg errgroup.Group
8591
eg.Go(func() error {
8692
return srv1.Run(ctx)
@@ -109,7 +115,8 @@ func TestServer(t *testing.T) {
109115
srvID := mesh.Hosts()[1].ID()
110116
response, err := client.Request(ctx, srvID, request)
111117
require.NoError(t, err)
112-
require.Equal(t, request, response)
118+
expResponse := append(request, []byte(mesh.Hosts()[0].ID())...)
119+
require.Equal(t, expResponse, response)
113120
srvConns := mesh.Hosts()[1].Network().ConnsToPeer(mesh.Hosts()[0].ID())
114121
require.NotEmpty(t, srvConns)
115122
require.Equal(t, n+1, srv1.NumAcceptedRequests())
@@ -129,7 +136,8 @@ func TestServer(t *testing.T) {
129136
srvID := mesh.Hosts()[3].ID()
130137
response, err := client.Request(ctx, srvID, request)
131138
require.NoError(t, err)
132-
require.Equal(t, request, response)
139+
expResponse := append(request, []byte(mesh.Hosts()[0].ID())...)
140+
require.Equal(t, expResponse, response)
133141
srvConns := mesh.Hosts()[3].Network().ConnsToPeer(mesh.Hosts()[0].ID())
134142
require.NotEmpty(t, srvConns)
135143
require.Equal(t, n+1, srv1.NumAcceptedRequests())

sync2/multipeer/delim.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package multipeer
2+
3+
import (
4+
"encoding/binary"
5+
6+
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
7+
)
8+
9+
func getDelimiters(numPeers, keyLen, maxDepth int) (h []rangesync.KeyBytes) {
10+
if numPeers < 2 {
11+
return nil
12+
}
13+
mask := uint64(0xffffffffffffffff) << (64 - maxDepth)
14+
inc := (uint64(0x80) << 56) / uint64(numPeers)
15+
h = make([]rangesync.KeyBytes, numPeers-1)
16+
for i, v := 0, uint64(0); i < numPeers-1; i++ {
17+
h[i] = make(rangesync.KeyBytes, keyLen)
18+
v += inc
19+
binary.BigEndian.PutUint64(h[i], (v<<1)&mask)
20+
}
21+
return h
22+
}

sync2/multipeer/delim_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package multipeer_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
8+
"github.com/spacemeshos/go-spacemesh/sync2/multipeer"
9+
)
10+
11+
func TestGetDelimiters(t *testing.T) {
12+
for _, tc := range []struct {
13+
numPeers int
14+
keyLen int
15+
maxDepth int
16+
values []string
17+
}{
18+
{
19+
numPeers: 0,
20+
maxDepth: 64,
21+
keyLen: 32,
22+
values: nil,
23+
},
24+
{
25+
numPeers: 1,
26+
maxDepth: 64,
27+
keyLen: 32,
28+
values: nil,
29+
},
30+
{
31+
numPeers: 2,
32+
maxDepth: 64,
33+
keyLen: 32,
34+
values: []string{
35+
"8000000000000000000000000000000000000000000000000000000000000000",
36+
},
37+
},
38+
{
39+
numPeers: 2,
40+
maxDepth: 24,
41+
keyLen: 32,
42+
values: []string{
43+
"8000000000000000000000000000000000000000000000000000000000000000",
44+
},
45+
},
46+
{
47+
numPeers: 3,
48+
maxDepth: 64,
49+
keyLen: 32,
50+
values: []string{
51+
"5555555555555554000000000000000000000000000000000000000000000000",
52+
"aaaaaaaaaaaaaaa8000000000000000000000000000000000000000000000000",
53+
},
54+
},
55+
{
56+
numPeers: 3,
57+
maxDepth: 24,
58+
keyLen: 32,
59+
values: []string{
60+
"5555550000000000000000000000000000000000000000000000000000000000",
61+
"aaaaaa0000000000000000000000000000000000000000000000000000000000",
62+
},
63+
},
64+
{
65+
numPeers: 3,
66+
maxDepth: 4,
67+
keyLen: 32,
68+
values: []string{
69+
"5000000000000000000000000000000000000000000000000000000000000000",
70+
"a000000000000000000000000000000000000000000000000000000000000000",
71+
},
72+
},
73+
{
74+
numPeers: 4,
75+
maxDepth: 64,
76+
keyLen: 32,
77+
values: []string{
78+
"4000000000000000000000000000000000000000000000000000000000000000",
79+
"8000000000000000000000000000000000000000000000000000000000000000",
80+
"c000000000000000000000000000000000000000000000000000000000000000",
81+
},
82+
},
83+
{
84+
numPeers: 4,
85+
maxDepth: 24,
86+
keyLen: 32,
87+
values: []string{
88+
"4000000000000000000000000000000000000000000000000000000000000000",
89+
"8000000000000000000000000000000000000000000000000000000000000000",
90+
"c000000000000000000000000000000000000000000000000000000000000000",
91+
},
92+
},
93+
} {
94+
ds := multipeer.GetDelimiters(tc.numPeers, tc.keyLen, tc.maxDepth)
95+
var hs []string
96+
for _, d := range ds {
97+
hs = append(hs, d.String())
98+
}
99+
if len(tc.values) == 0 {
100+
require.Empty(t, hs, "%d delimiters", tc.numPeers)
101+
} else {
102+
require.Equal(t, tc.values, hs, "%d delimiters", tc.numPeers)
103+
}
104+
}
105+
}

sync2/multipeer/dumbset.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package multipeer
2+
3+
import (
4+
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
5+
)
6+
7+
// DumbSet is an unoptimized OrderedSet to be used for testing purposes.
8+
// It builds on rangesync.DumbSet.
9+
type DumbSet struct {
10+
*rangesync.DumbSet
11+
}
12+
13+
var _ OrderedSet = &DumbSet{}
14+
15+
// NewDumbHashSet creates an unoptimized OrderedSet to be used for testing purposes.
16+
// If disableReAdd is true, receiving the same item multiple times will fail.
17+
func NewDumbHashSet() *DumbSet {
18+
return &DumbSet{
19+
DumbSet: &rangesync.DumbSet{},
20+
}
21+
}
22+
23+
// Advance implements OrderedSet.
24+
func (ds *DumbSet) EnsureLoaded() error {
25+
return nil
26+
}
27+
28+
// Advance implements OrderedSet.
29+
func (ds *DumbSet) Advance() error {
30+
return nil
31+
}
32+
33+
// Has implements OrderedSet.
34+
func (ds *DumbSet) Has(k rangesync.KeyBytes) (bool, error) {
35+
var first rangesync.KeyBytes
36+
sr := ds.Items()
37+
for cur := range sr.Seq {
38+
if first == nil {
39+
first = cur
40+
} else if first.Compare(cur) == 0 {
41+
return false, sr.Error()
42+
}
43+
if k.Compare(cur) == 0 {
44+
return true, sr.Error()
45+
}
46+
}
47+
return false, sr.Error()
48+
}
49+
50+
// Copy implements OrderedSet.
51+
func (ds *DumbSet) Copy(syncScope bool) rangesync.OrderedSet {
52+
return &DumbSet{ds.DumbSet.Copy(syncScope).(*rangesync.DumbSet)}
53+
}
54+
55+
// Release implements OrderedSet.
56+
func (ds *DumbSet) Release() error {
57+
return nil
58+
}

sync2/multipeer/export_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package multipeer
2+
3+
import (
4+
"context"
5+
6+
"github.com/spacemeshos/go-spacemesh/p2p"
7+
)
8+
9+
type (
10+
SyncRunner = syncRunner
11+
SplitSync = splitSync
12+
)
13+
14+
var (
15+
WithSyncRunner = withSyncRunner
16+
WithClock = withClock
17+
GetDelimiters = getDelimiters
18+
NewSyncQueue = newSyncQueue
19+
NewSplitSync = newSplitSync
20+
NewSyncList = newSyncList
21+
)
22+
23+
func (mpr *MultiPeerReconciler) FullSync(ctx context.Context, syncPeers []p2p.Peer) error {
24+
return mpr.fullSync(ctx, syncPeers)
25+
}

0 commit comments

Comments
 (0)