Skip to content

Commit 34a1943

Browse files
committed
hashsync: add minhash probing
1 parent 06f89dc commit 34a1943

File tree

7 files changed

+574
-132
lines changed

7 files changed

+574
-132
lines changed

hashsync/handler.go

Lines changed: 83 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -134,8 +134,14 @@ func (c *wireConduit) NextMessage() (SyncMessage, error) {
134134
return nil, err
135135
}
136136
return &m, nil
137-
case MessageTypeQuery:
138-
var m QueryMessage
137+
case MessageTypeProbe:
138+
var m ProbeMessage
139+
if _, err := codec.DecodeFrom(c.stream, &m); err != nil {
140+
return nil, err
141+
}
142+
return &m, nil
143+
case MessageTypeProbeResponse:
144+
var m ProbeResponseMessage
139145
if _, err := codec.DecodeFrom(c.stream, &m); err != nil {
140146
return nil, err
141147
}
@@ -146,6 +152,7 @@ func (c *wireConduit) NextMessage() (SyncMessage, error) {
146152
}
147153

148154
func (c *wireConduit) send(m sendable) error {
155+
// fmt.Fprintf(os.Stderr, "QQQQQ: send: %s: %#v\n", m.Type(), m)
149156
var stream io.Writer
150157
if c.initReqBuf != nil {
151158
stream = c.initReqBuf
@@ -219,15 +226,46 @@ func (c *wireConduit) SendDone() error {
219226
return c.send(&DoneMessage{})
220227
}
221228

222-
func (c *wireConduit) SendQuery(x, y Ordered) error {
229+
func (c *wireConduit) SendProbe(x, y Ordered, fingerprint any, sampleSize int) error {
230+
m := &ProbeMessage{
231+
RangeFingerprint: fingerprint.(types.Hash12),
232+
SampleSize: uint32(sampleSize),
233+
}
223234
if x == nil && y == nil {
224-
return c.send(&QueryMessage{})
235+
return c.send(m)
225236
} else if x == nil || y == nil {
226-
panic("BUG: SendQuery: bad range: just one of the bounds is nil")
237+
panic("BUG: SendProbe: bad range: just one of the bounds is nil")
227238
}
228239
xh := x.(types.Hash32)
229240
yh := y.(types.Hash32)
230-
return c.send(&QueryMessage{RangeX: &xh, RangeY: &yh})
241+
m.RangeX = &xh
242+
m.RangeY = &yh
243+
return c.send(m)
244+
}
245+
246+
func (c *wireConduit) SendProbeResponse(x, y Ordered, fingerprint any, count, sampleSize int, it Iterator) error {
247+
m := &ProbeResponseMessage{
248+
RangeFingerprint: fingerprint.(types.Hash12),
249+
NumItems: uint32(count),
250+
Sample: make([]MinhashSampleItem, sampleSize),
251+
}
252+
// fmt.Fprintf(os.Stderr, "QQQQQ: begin sending items\n")
253+
for n := 0; n < sampleSize; n++ {
254+
m.Sample[n] = MinhashSampleItemFromHash32(it.Key().(types.Hash32))
255+
// fmt.Fprintf(os.Stderr, "QQQQQ: m.Sample[%d] = %s\n", n, m.Sample[n])
256+
it.Next()
257+
}
258+
// fmt.Fprintf(os.Stderr, "QQQQQ: end sending items\n")
259+
if x == nil && y == nil {
260+
return c.send(m)
261+
} else if x == nil || y == nil {
262+
panic("BUG: SendProbe: bad range: just one of the bounds is nil")
263+
}
264+
xh := x.(types.Hash32)
265+
yh := y.(types.Hash32)
266+
m.RangeX = &xh
267+
m.RangeY = &yh
268+
return c.send(m)
231269
}
232270

233271
func (c *wireConduit) withInitialRequest(toCall func(Conduit) error) ([]byte, error) {
@@ -252,6 +290,11 @@ func (c *wireConduit) handleStream(stream io.ReadWriter, rsr *RangeSetReconciler
252290
}
253291
}
254292

293+
// ShortenKey implements Conduit.
294+
func (c *wireConduit) ShortenKey(k Ordered) Ordered {
295+
return MinhashSampleItemFromHash32(k.(types.Hash32))
296+
}
297+
255298
func MakeServerHandler(is ItemStore, opts ...Option) server.StreamHandler {
256299
return func(ctx context.Context, req []byte, stream io.ReadWriter) error {
257300
c := wireConduit{newValue: is.New}
@@ -299,43 +342,63 @@ func syncStore(ctx context.Context, r requester, peer p2p.Peer, is ItemStore, x,
299342
})
300343
}
301344

302-
func Probe(ctx context.Context, r requester, peer p2p.Peer, opts ...Option) (fp any, count int, err error) {
303-
return boundedProbe(ctx, r, peer, nil, nil, opts)
345+
func Probe(ctx context.Context, r requester, peer p2p.Peer, is ItemStore, opts ...Option) (ProbeResult, error) {
346+
return boundedProbe(ctx, r, peer, is, nil, nil, opts)
304347
}
305348

306-
func BoundedProbe(ctx context.Context, r requester, peer p2p.Peer, x, y types.Hash32, opts ...Option) (fp any, count int, err error) {
307-
return boundedProbe(ctx, r, peer, &x, &y, opts)
349+
func BoundedProbe(
350+
ctx context.Context,
351+
r requester,
352+
peer p2p.Peer,
353+
is ItemStore,
354+
x, y types.Hash32,
355+
opts ...Option,
356+
) (ProbeResult, error) {
357+
return boundedProbe(ctx, r, peer, is, &x, &y, opts)
308358
}
309359

310-
func boundedProbe(ctx context.Context, r requester, peer p2p.Peer, x, y *types.Hash32, opts []Option) (fp any, count int, err error) {
360+
func boundedProbe(
361+
ctx context.Context,
362+
r requester,
363+
peer p2p.Peer,
364+
is ItemStore,
365+
x, y *types.Hash32,
366+
opts []Option,
367+
) (ProbeResult, error) {
368+
var (
369+
err error
370+
initReq []byte
371+
info RangeInfo
372+
pr ProbeResult
373+
)
311374
c := wireConduit{
312375
newValue: func() any { return nil }, // not used
313376
}
314-
rsr := NewRangeSetReconciler(nil, opts...)
315-
// c.rmmePrint = true
316-
var initReq []byte
377+
rsr := NewRangeSetReconciler(is, opts...)
317378
if x == nil {
318379
initReq, err = c.withInitialRequest(func(c Conduit) error {
319-
return rsr.InitiateProbe(c)
380+
info, err = rsr.InitiateProbe(c)
381+
return err
320382
})
321383
} else {
322384
initReq, err = c.withInitialRequest(func(c Conduit) error {
323-
return rsr.InitiateBoundedProbe(c, *x, *y)
385+
info, err = rsr.InitiateBoundedProbe(c, *x, *y)
386+
return err
324387
})
325388
}
326389
if err != nil {
327-
return nil, 0, err
390+
return ProbeResult{}, err
328391
}
329392
err = r.StreamRequest(ctx, peer, initReq, func(ctx context.Context, stream io.ReadWriter) error {
330393
c.stream = stream
331394
var err error
332-
fp, count, err = rsr.HandleProbeResponse(&c)
395+
pr, err = rsr.HandleProbeResponse(&c, info)
333396
return err
334397
})
335398
if err != nil {
336-
return nil, 0, err
399+
return ProbeResult{}, err
337400
}
338-
return fp, count, nil
401+
return pr, nil
339402
}
340403

341404
// TODO: request duration

hashsync/handler_test.go

Lines changed: 24 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -362,12 +362,12 @@ func TestWireConduit(t *testing.T) {
362362
type getRequesterFunc func(name string, handler server.StreamHandler, peers ...requester) (requester, p2p.Peer)
363363

364364
func withClientServer(
365-
storeA, storeB ItemStore,
365+
store ItemStore,
366366
getRequester getRequesterFunc,
367367
opts []Option,
368368
toCall func(ctx context.Context, client requester, srvPeerID p2p.Peer),
369369
) {
370-
srvHandler := MakeServerHandler(storeA, opts...)
370+
srvHandler := MakeServerHandler(store, opts...)
371371
srv, srvPeerID := getRequester("srv", srvHandler)
372372
var eg errgroup.Group
373373
ctx, cancel := context.WithCancel(context.Background())
@@ -383,7 +383,7 @@ func withClientServer(
383383
toCall(ctx, client, srvPeerID)
384384
}
385385

386-
func fakeRequesterGetter(t *testing.T) getRequesterFunc {
386+
func fakeRequesterGetter() getRequesterFunc {
387387
return func(name string, handler server.StreamHandler, peers ...requester) (requester, p2p.Peer) {
388388
pid := p2p.Peer(name)
389389
return newFakeRequester(pid, handler, peers...), pid
@@ -428,7 +428,7 @@ func testWireSync(t *testing.T, getRequester getRequesterFunc) requester {
428428
var client requester
429429
verifyXORSync(t, cfg, func(storeA, storeB ItemStore, numSpecific int, opts []Option) bool {
430430
withClientServer(
431-
storeA, storeB, getRequester, opts,
431+
storeA, getRequester, opts,
432432
func(ctx context.Context, client requester, srvPeerID p2p.Peer) {
433433
err := SyncStore(ctx, client, srvPeerID, storeB, opts...)
434434
require.NoError(t, err)
@@ -445,7 +445,7 @@ func testWireSync(t *testing.T, getRequester getRequesterFunc) requester {
445445

446446
func TestWireSync(t *testing.T) {
447447
t.Run("fake requester", func(t *testing.T) {
448-
testWireSync(t, fakeRequesterGetter(t))
448+
testWireSync(t, fakeRequesterGetter())
449449
})
450450
t.Run("p2p", func(t *testing.T) {
451451
testWireSync(t, p2pRequesterGetter(t))
@@ -455,33 +455,36 @@ func TestWireSync(t *testing.T) {
455455
func testWireProbe(t *testing.T, getRequester getRequesterFunc) requester {
456456
cfg := xorSyncTestConfig{
457457
maxSendRange: 1,
458-
numTestHashes: 32,
459-
minNumSpecificA: 4,
460-
maxNumSpecificA: 4,
461-
minNumSpecificB: 4,
462-
maxNumSpecificB: 4,
458+
numTestHashes: 10000,
459+
minNumSpecificA: 130,
460+
maxNumSpecificA: 130,
461+
minNumSpecificB: 130,
462+
maxNumSpecificB: 130,
463463
}
464464
var client requester
465465
verifyXORSync(t, cfg, func(storeA, storeB ItemStore, numSpecific int, opts []Option) bool {
466466
withClientServer(
467-
storeA, storeB, getRequester, opts,
467+
storeA, getRequester, opts,
468468
func(ctx context.Context, client requester, srvPeerID p2p.Peer) {
469469
minA := storeA.Min().Key()
470470
infoA := storeA.GetRangeInfo(nil, minA, minA, -1)
471-
fpA, countA, err := Probe(ctx, client, srvPeerID, opts...)
471+
prA, err := Probe(ctx, client, srvPeerID, storeB, opts...)
472472
require.NoError(t, err)
473-
require.Equal(t, infoA.Fingerprint, fpA)
474-
require.Equal(t, infoA.Count, countA)
473+
require.Equal(t, infoA.Fingerprint, prA.FP)
474+
require.Equal(t, infoA.Count, prA.Count)
475+
require.InDelta(t, 0.98, prA.Sim, 0.05, "sim")
475476

476477
minA = storeA.Min().Key()
477478
partInfoA := storeA.GetRangeInfo(nil, minA, minA, infoA.Count/2)
478479
x := partInfoA.Start.Key().(types.Hash32)
479480
y := partInfoA.End.Key().(types.Hash32)
480481
// partInfoA = storeA.GetRangeInfo(nil, x, y, -1)
481-
fpA, countA, err = BoundedProbe(ctx, client, srvPeerID, x, y, opts...)
482+
prA, err = BoundedProbe(ctx, client, srvPeerID, storeB, x, y, opts...)
482483
require.NoError(t, err)
483-
require.Equal(t, partInfoA.Fingerprint, fpA)
484-
require.Equal(t, partInfoA.Count, countA)
484+
require.Equal(t, partInfoA.Fingerprint, prA.FP)
485+
require.Equal(t, partInfoA.Count, prA.Count)
486+
require.InDelta(t, 0.98, prA.Sim, 0.1, "sim")
487+
// QQQQQ: TBD: check prA.Sim and prB.Sim values
485488
})
486489
return false
487490
})
@@ -490,11 +493,11 @@ func testWireProbe(t *testing.T, getRequester getRequesterFunc) requester {
490493

491494
func TestWireProbe(t *testing.T) {
492495
t.Run("fake requester", func(t *testing.T) {
493-
testWireProbe(t, fakeRequesterGetter(t))
494-
})
495-
t.Run("p2p", func(t *testing.T) {
496-
testWireProbe(t, p2pRequesterGetter(t))
496+
testWireProbe(t, fakeRequesterGetter())
497497
})
498+
// t.Run("p2p", func(t *testing.T) {
499+
// testWireProbe(t, p2pRequesterGetter(t))
500+
// })
498501
}
499502

500503
// TODO: test bounded sync

0 commit comments

Comments
 (0)