Skip to content

Commit 4c57c22

Browse files
committed
sync2: fix race in the p2p test
1 parent 500ab74 commit 4c57c22

File tree

1 file changed

+27
-8
lines changed

1 file changed

+27
-8
lines changed

sync2/p2p_test.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type fakeHandler struct {
3333
mtx *sync.Mutex
3434
localPeerID p2p.Peer
3535
synced map[addedKey]struct{}
36+
committed map[string]struct{}
3637
}
3738

3839
func (fh *fakeHandler) Receive(k rangesync.KeyBytes, peer p2p.Peer) (bool, error) {
@@ -50,11 +51,21 @@ func (fh *fakeHandler) Commit(peer p2p.Peer, base, new multipeer.OrderedSet) err
5051
fh.mtx.Lock()
5152
defer fh.mtx.Unlock()
5253
for k := range fh.synced {
53-
base.(*multipeer.DumbSet).AddUnchecked(rangesync.KeyBytes(k.key))
54+
fh.committed[k.key] = struct{}{}
5455
}
56+
clear(fh.synced)
5557
return nil
5658
}
5759

60+
func (fh *fakeHandler) committedItems() (items []rangesync.KeyBytes) {
61+
fh.mtx.Lock()
62+
defer fh.mtx.Unlock()
63+
for k := range fh.committed {
64+
items = append(items, rangesync.KeyBytes(k))
65+
}
66+
return items
67+
}
68+
5869
func TestP2P(t *testing.T) {
5970
const (
6071
numNodes = 4
@@ -65,8 +76,8 @@ func TestP2P(t *testing.T) {
6576
logger := zaptest.NewLogger(t)
6677
mesh, err := mocknet.FullMeshConnected(numNodes)
6778
require.NoError(t, err)
68-
synced := make(map[addedKey]struct{})
6979
hs := make([]*sync2.P2PHashSync, numNodes)
80+
handlers := make([]*fakeHandler, numNodes)
7081
initialSet := make([]rangesync.KeyBytes, numHashes)
7182
for n := range initialSet {
7283
initialSet[n] = rangesync.RandomKeyBytes(32)
@@ -85,10 +96,11 @@ func TestP2P(t *testing.T) {
8596
cfg.EnableActiveSync = true
8697
cfg.SyncInterval = 100 * time.Millisecond
8798
host := mesh.Hosts()[n]
88-
handler := &fakeHandler{
99+
handlers[n] = &fakeHandler{
89100
mtx: &mtx,
90101
localPeerID: host.ID(),
91-
synced: synced,
102+
synced: make(map[addedKey]struct{}),
103+
committed: make(map[string]struct{}),
92104
}
93105
os := multipeer.NewDumbHashSet()
94106
d := rangesync.NewDispatcher(logger)
@@ -98,7 +110,7 @@ func TestP2P(t *testing.T) {
98110
eg.Go(func() error { return srv.Run(ctx) })
99111
hs[n] = sync2.NewP2PHashSync(
100112
logger.Named(fmt.Sprintf("node%d", n)),
101-
d, "test", os, keyLen, maxDepth, ps, handler, cfg, srv)
113+
d, "test", os, keyLen, maxDepth, ps, handlers[n], cfg, srv)
102114
require.NoError(t, hs[n].Load())
103115
is := hs[n].Set().(*multipeer.DumbSet)
104116
is.SetAllowMultiReceive(true)
@@ -112,12 +124,15 @@ func TestP2P(t *testing.T) {
112124
}
113125

114126
require.Eventually(t, func() bool {
115-
for _, hsync := range hs {
127+
for n, hsync := range hs {
116128
// use a snapshot to avoid races
117129
if !hsync.Synced() {
118130
return false
119131
}
120132
os := hsync.Set().Copy(false)
133+
for _, k := range handlers[n].committedItems() {
134+
os.(*multipeer.DumbSet).AddUnchecked(k)
135+
}
121136
empty, err := os.Empty()
122137
require.NoError(t, err)
123138
if empty {
@@ -134,9 +149,13 @@ func TestP2P(t *testing.T) {
134149
return true
135150
}, 30*time.Second, 300*time.Millisecond)
136151

137-
for _, hsync := range hs {
152+
for n, hsync := range hs {
138153
hsync.Stop()
139-
actualItems, err := hsync.Set().Items().Collect()
154+
os := hsync.Set().Copy(false)
155+
for _, k := range handlers[n].committedItems() {
156+
os.(*multipeer.DumbSet).AddUnchecked(k)
157+
}
158+
actualItems, err := os.Items().Collect()
140159
require.NoError(t, err)
141160
require.ElementsMatch(t, initialSet, actualItems)
142161
}

0 commit comments

Comments
 (0)