Skip to content

Commit 9adb7f3

Browse files
committed
sync2: get rid of contexts and improve logging and sequence handling
1 parent 9405bc3 commit 9adb7f3

File tree

13 files changed

+292
-358
lines changed

13 files changed

+292
-358
lines changed

sync2/rangesync/dumbset.go

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package rangesync
22

33
import (
4-
"context"
54
"crypto/md5"
65
"errors"
76
"slices"
@@ -95,7 +94,7 @@ type dumbSet struct {
9594

9695
var _ OrderedSet = &dumbSet{}
9796

98-
func (ds *dumbSet) Receive(ctx context.Context, id types.KeyBytes) error {
97+
func (ds *dumbSet) Receive(id types.KeyBytes) error {
9998
if len(ds.keys) == 0 {
10099
ds.keys = []types.KeyBytes{id}
101100
return nil
@@ -124,22 +123,25 @@ func (ds *dumbSet) Receive(ctx context.Context, id types.KeyBytes) error {
124123
return nil
125124
}
126125

127-
func (ds *dumbSet) seq(n int) types.Seq {
126+
func (ds *dumbSet) seq(n int) types.SeqResult {
128127
if n < 0 || n > len(ds.keys) {
129128
panic("bad index")
130129
}
131-
return types.Seq(func(yield func(types.KeyBytes, error) bool) {
132-
n := n // make the sequence reusable
133-
for {
134-
if !yield(ds.keys[n], nil) {
135-
break
130+
return types.SeqResult{
131+
Seq: types.Seq(func(yield func(types.KeyBytes) bool) {
132+
n := n // make the sequence reusable
133+
for {
134+
if !yield(ds.keys[n]) {
135+
break
136+
}
137+
n = (n + 1) % len(ds.keys)
136138
}
137-
n = (n + 1) % len(ds.keys)
138-
}
139-
})
139+
}),
140+
Error: types.NoSeqError,
141+
}
140142
}
141143

142-
func (ds *dumbSet) seqFor(s types.KeyBytes) types.Seq {
144+
func (ds *dumbSet) seqFor(s types.KeyBytes) types.SeqResult {
143145
n := slices.IndexFunc(ds.keys, func(k types.KeyBytes) bool {
144146
return k.Compare(s) == 0
145147
})
@@ -150,7 +152,6 @@ func (ds *dumbSet) seqFor(s types.KeyBytes) types.Seq {
150152
}
151153

152154
func (ds *dumbSet) getRangeInfo(
153-
_ context.Context,
154155
x, y types.KeyBytes,
155156
count int,
156157
) (r RangeInfo, end types.KeyBytes, err error) {
@@ -179,35 +180,29 @@ func (ds *dumbSet) getRangeInfo(
179180
panic("empty start/end from naiveRange")
180181
}
181182
r.Items = ds.seqFor(start)
183+
} else {
184+
r.Items = types.EmptySeqResult()
182185
}
183186
return r, end, nil
184187
}
185188

186-
func (ds *dumbSet) GetRangeInfo(
187-
ctx context.Context,
188-
x, y types.KeyBytes,
189-
count int,
190-
) (RangeInfo, error) {
191-
ri, _, err := ds.getRangeInfo(ctx, x, y, count)
189+
func (ds *dumbSet) GetRangeInfo(x, y types.KeyBytes, count int) (RangeInfo, error) {
190+
ri, _, err := ds.getRangeInfo(x, y, count)
192191
return ri, err
193192
}
194193

195-
func (ds *dumbSet) SplitRange(
196-
ctx context.Context,
197-
x, y types.KeyBytes,
198-
count int,
199-
) (SplitInfo, error) {
194+
func (ds *dumbSet) SplitRange(x, y types.KeyBytes, count int) (SplitInfo, error) {
200195
if count <= 0 {
201196
panic("BUG: bad split count")
202197
}
203-
part0, middle, err := ds.getRangeInfo(ctx, x, y, count)
198+
part0, middle, err := ds.getRangeInfo(x, y, count)
204199
if err != nil {
205200
return SplitInfo{}, err
206201
}
207202
if part0.Count == 0 {
208203
return SplitInfo{}, errors.New("can't split empty range")
209204
}
210-
part1, err := ds.GetRangeInfo(ctx, middle, y, -1)
205+
part1, err := ds.GetRangeInfo(middle, y, -1)
211206
if err != nil {
212207
return SplitInfo{}, err
213208
}
@@ -217,23 +212,23 @@ func (ds *dumbSet) SplitRange(
217212
}, nil
218213
}
219214

220-
func (ds *dumbSet) Empty(ctx context.Context) (bool, error) {
215+
func (ds *dumbSet) Empty() (bool, error) {
221216
return len(ds.keys) == 0, nil
222217
}
223218

224-
func (ds *dumbSet) Items(ctx context.Context) (types.Seq, error) {
219+
func (ds *dumbSet) Items() types.SeqResult {
225220
if len(ds.keys) == 0 {
226-
return types.EmptySeq(), nil
221+
return types.EmptySeqResult()
227222
}
228-
return ds.seq(0), nil
223+
return ds.seq(0)
229224
}
230225

231-
func (ds *dumbSet) Copy() OrderedSet {
226+
func (ds *dumbSet) Copy(syncScope bool) OrderedSet {
232227
return &dumbSet{keys: slices.Clone(ds.keys)}
233228
}
234229

235-
func (ds *dumbSet) Recent(ctx context.Context, since time.Time) (types.Seq, int, error) {
236-
return nil, 0, nil
230+
func (ds *dumbSet) Recent(since time.Time) (types.SeqResult, int) {
231+
return types.EmptySeqResult(), 0
237232
}
238233

239234
var hashPool = &sync.Pool{
@@ -267,7 +262,7 @@ type deferredAddSet struct {
267262
added map[string]struct{}
268263
}
269264

270-
func (das *deferredAddSet) Receive(ctx context.Context, id types.KeyBytes) error {
265+
func (das *deferredAddSet) Receive(id types.KeyBytes) error {
271266
if das.added == nil {
272267
das.added = make(map[string]struct{})
273268
}
@@ -276,9 +271,8 @@ func (das *deferredAddSet) Receive(ctx context.Context, id types.KeyBytes) error
276271
}
277272

278273
func (das *deferredAddSet) addAll() error {
279-
ctx := context.Background()
280274
for k := range das.added {
281-
if err := das.OrderedSet.Receive(ctx, types.KeyBytes(k)); err != nil {
275+
if err := das.OrderedSet.Receive(types.KeyBytes(k)); err != nil {
282276
return err
283277
}
284278
}

sync2/rangesync/helpers.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,20 @@
11
package rangesync
22

33
import (
4-
"context"
5-
64
"github.com/spacemeshos/go-spacemesh/sync2/types"
75
)
86

97
// CollectSetItems returns the list of items in the given set.
10-
func CollectSetItems(ctx context.Context, os OrderedSet) (r []types.KeyBytes, err error) {
11-
items, err := os.Items(ctx)
12-
if err != nil {
13-
return nil, err
14-
}
8+
func CollectSetItems(os OrderedSet) (r []types.KeyBytes, err error) {
9+
items := os.Items()
1510
var first types.KeyBytes
16-
for v, err := range items {
17-
if err != nil {
18-
return nil, err
19-
}
11+
for v := range items.Seq {
2012
if first == nil {
2113
first = v
2214
} else if v.Compare(first) == 0 {
2315
break
2416
}
2517
r = append(r, v)
2618
}
27-
return r, nil
19+
return r, items.Error()
2820
}

sync2/rangesync/interface.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type RangeInfo struct {
1717
// Number of items in the interval
1818
Count int
1919
// Items is the sequence of set elements in the interval.
20-
Items types.Seq
20+
Items types.SeqResult
2121
}
2222

2323
// SplitInfo contains information about range split in two.
@@ -33,7 +33,7 @@ type OrderedSet interface {
3333
// Receive handles a new key received from the peer.
3434
// It may or may not add it to the set immediately; this doesn't affect set
3535
// reconciliation operation.
36-
Receive(ctx context.Context, k types.KeyBytes) error
36+
Receive(k types.KeyBytes) error
3737
// GetRangeInfo returns RangeInfo for the item range in the ordered set,
3838
// bounded by [x, y).
3939
// x == y indicates the whole set.
@@ -44,20 +44,24 @@ type OrderedSet interface {
4444
// is returned for the corresponding subrange of the requested range.
4545
// If both x and y are nil, the information for the entire set is returned.
4646
// If any of x or y is nil, the other one must be nil as well.
47-
GetRangeInfo(ctx context.Context, x, y types.KeyBytes, count int) (RangeInfo, error)
47+
GetRangeInfo(x, y types.KeyBytes, count int) (RangeInfo, error)
4848
// SplitRange splits the range roughly after the specified count of items,
4949
// returning RangeInfo for the first half and the second half of the range.
50-
SplitRange(ctx context.Context, x, y types.KeyBytes, count int) (SplitInfo, error)
50+
SplitRange(x, y types.KeyBytes, count int) (SplitInfo, error)
5151
// Items returns the sequence of items in the set.
52-
Items(ctx context.Context) (types.Seq, error)
52+
Items() types.SeqResult
5353
// Empty returns true if the set is empty.
54-
Empty(ctx context.Context) (bool, error)
54+
Empty() (bool, error)
5555
// Copy makes a shallow copy of the OrderedSet.
56-
Copy() OrderedSet
56+
// syncScope argument is a hint that can be used to optimize resource usage.
57+
// If syncScope is true, then the copy is intended to be used for the duration of
58+
// a synchronization run.
59+
// If syncScope if false, then the lifetime of the copy is not clearly defined.
60+
Copy(syncScope bool) OrderedSet
5761
// Recent returns an Iterator that yields the items added since the specified
5862
// timestamp. Some OrderedSet implementations may not have Recent implemented, in
59-
// which case it should return an error.
60-
Recent(ctx context.Context, since time.Time) (types.Seq, int, error)
63+
// which case it should return an empty sequence.
64+
Recent(since time.Time) (types.SeqResult, int)
6165
}
6266

6367
type Requester interface {

sync2/rangesync/log.go

Lines changed: 0 additions & 75 deletions
This file was deleted.

sync2/rangesync/message.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,9 @@ func (s sender) sendSample(
153153
x, y types.KeyBytes,
154154
fp types.Fingerprint,
155155
count, sampleSize int,
156-
seq types.Seq,
156+
sr types.SeqResult,
157157
) error {
158-
items, err := Sample(seq, count, sampleSize)
158+
items, err := Sample(sr, count, sampleSize)
159159
if err != nil {
160160
return err
161161
}

sync2/rangesync/minhash.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,19 @@ func MinhashSampleItemFromKeyBytes(h types.KeyBytes) MinhashSampleItem {
4444

4545
// Sample retrieves min(count, sampleSize) items friom the ordered sequence, extracting
4646
// MinhashSampleItem from each value.
47-
func Sample(seq types.Seq, count, sampleSize int) ([]MinhashSampleItem, error) {
47+
func Sample(sr types.SeqResult, count, sampleSize int) ([]MinhashSampleItem, error) {
4848
sampleSize = min(count, sampleSize)
4949
if sampleSize == 0 {
5050
return nil, nil
5151
}
5252
items := make([]MinhashSampleItem, 0, sampleSize)
53-
for k, err := range seq {
54-
if err != nil {
55-
return nil, err
56-
}
53+
for k := range sr.Seq {
5754
items = append(items, MinhashSampleItemFromKeyBytes(k))
5855
if len(items) == sampleSize {
5956
break
6057
}
6158
}
62-
return items, nil
59+
return items, sr.Error()
6360
}
6461

6562
// CalcSim estimates the Jaccard similarity coefficient between two sets based on the

sync2/rangesync/minhash_test.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,25 @@ import (
1111

1212
var errBadItem = errors.New("bad item")
1313

14-
func mkFakeSeq(items []string) types.Seq {
15-
return func(yield func(types.KeyBytes, error) bool) {
16-
for {
17-
for _, item := range items {
18-
if item == "ERROR" {
19-
yield(nil, errBadItem)
20-
return
21-
}
22-
if !yield(types.KeyBytes(item), nil) {
23-
return
14+
func mkFakeSeq(items []string) types.SeqResult {
15+
var err error
16+
return types.SeqResult{
17+
Seq: func(yield func(types.KeyBytes) bool) {
18+
for {
19+
for _, item := range items {
20+
if item == "ERROR" {
21+
err = errBadItem
22+
return
23+
}
24+
if !yield(types.KeyBytes(item)) {
25+
return
26+
}
2427
}
2528
}
26-
}
29+
},
30+
Error: func() error {
31+
return err
32+
},
2733
}
2834
}
2935

0 commit comments

Comments
 (0)