Skip to content

Commit d92ea2d

Browse files
committed
query: support more than 1 job per worker
A worker is now able to be queried more than 1 job at a time with maxJobs constant introduced to limit the job per peer.
1 parent d4563a2 commit d92ea2d

File tree

4 files changed

+39
-22
lines changed

4 files changed

+39
-22
lines changed

query/worker.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package query
22

33
import (
44
"errors"
5+
"sync/atomic"
56
"time"
67

78
"github.com/btcsuite/btcd/wire"
@@ -58,6 +59,10 @@ type jobResult struct {
5859
type worker struct {
5960
peer Peer
6061

62+
// quit indicates that the worker has already quit and is not accepting
63+
// any more jobs.
64+
quit int32
65+
6166
// nextJob is a channel of queries to be distributed, where the worker
6267
// will poll new work from.
6368
nextJob chan *queryJob
@@ -70,7 +75,7 @@ var _ Worker = (*worker)(nil)
7075
func NewWorker(peer Peer) Worker {
7176
return &worker{
7277
peer: peer,
73-
nextJob: make(chan *queryJob),
78+
nextJob: make(chan *queryJob, maxJobs),
7479
}
7580
}
7681

@@ -88,7 +93,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
8893

8994
// Subscribe to messages from the peer.
9095
msgChan, cancel := peer.SubscribeRecvMsg()
91-
defer cancel()
96+
defer func() {
97+
atomic.AddInt32(&w.quit, 1)
98+
cancel()
99+
}()
92100

93101
for {
94102
log.Tracef("Worker %v waiting for more work", peer.Addr())
@@ -266,5 +274,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
266274
//
267275
// NOTE: Part of the Worker interface.
268276
func (w *worker) NewJob() chan<- *queryJob {
277+
// The worker has already quit so don't return the nextJob channel.
278+
if atomic.LoadInt32(&w.quit) != 0 {
279+
return nil
280+
}
281+
269282
return w.nextJob
270283
}

query/worker_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func makeJob() *queryJob {
8686
}
8787

8888
type testCtx struct {
89-
nextJob chan<- *queryJob
89+
worker Worker
9090
jobResults chan *jobResult
9191
peer *mockPeer
9292
workerDone chan struct{}
@@ -101,7 +101,7 @@ func startWorker() (*testCtx, error) {
101101
subscriptions: make(chan chan wire.Message),
102102
quit: make(chan struct{}),
103103
}
104-
results := make(chan *jobResult)
104+
results := make(chan *jobResult, maxJobs)
105105
quit := make(chan struct{})
106106

107107
wk := NewWorker(peer)
@@ -123,7 +123,7 @@ func startWorker() (*testCtx, error) {
123123
peer.responses = sub
124124

125125
return &testCtx{
126-
nextJob: wk.NewJob(),
126+
worker: wk,
127127
jobResults: results,
128128
peer: peer,
129129
workerDone: done,
@@ -144,7 +144,7 @@ func TestWorkerIgnoreMsgs(t *testing.T) {
144144
task := makeJob()
145145

146146
select {
147-
case ctx.nextJob <- task:
147+
case ctx.worker.NewJob() <- task:
148148
case <-time.After(1 * time.Second):
149149
t.Fatalf("did not pick up job")
150150
}
@@ -215,7 +215,7 @@ func TestWorkerTimeout(t *testing.T) {
215215

216216
// Give the worker the new job.
217217
select {
218-
case ctx.nextJob <- task:
218+
case ctx.worker.NewJob() <- task:
219219
case <-time.After(1 * time.Second):
220220
t.Fatalf("did not pick up job")
221221
}
@@ -253,7 +253,7 @@ func TestWorkerTimeout(t *testing.T) {
253253

254254
// It will immediately attempt to fetch another task.
255255
select {
256-
case ctx.nextJob <- task:
256+
case ctx.worker.NewJob() <- task:
257257
case <-time.After(1 * time.Second):
258258
t.Fatalf("did not pick up job")
259259
}
@@ -272,7 +272,7 @@ func TestWorkerDisconnect(t *testing.T) {
272272
// Give the worker a new job.
273273
task := makeJob()
274274
select {
275-
case ctx.nextJob <- task:
275+
case ctx.worker.NewJob() <- task:
276276
case <-time.After(1 * time.Second):
277277
t.Fatalf("did not pick up job")
278278
}
@@ -312,7 +312,7 @@ func TestWorkerDisconnect(t *testing.T) {
312312

313313
// No more jobs should be accepted by the worker after it has exited.
314314
select {
315-
case ctx.nextJob <- task:
315+
case ctx.worker.NewJob() <- task:
316316
t.Fatalf("exited worker did pick up job")
317317
default:
318318
}
@@ -342,7 +342,7 @@ func TestWorkerProgress(t *testing.T) {
342342
task.timeout = taskTimeout
343343

344344
select {
345-
case ctx.nextJob <- task:
345+
case ctx.worker.NewJob() <- task:
346346
case <-time.After(1 * time.Second):
347347
t.Fatalf("did not pick up job")
348348
}
@@ -421,7 +421,7 @@ func TestWorkerJobCanceled(t *testing.T) {
421421
canceled := false
422422
for i := 0; i < 2; i++ {
423423
select {
424-
case ctx.nextJob <- task:
424+
case ctx.worker.NewJob() <- task:
425425
case <-time.After(1 * time.Second):
426426
t.Fatalf("did not pick up job")
427427
}

query/workmanager.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ const (
1515

1616
// maxQueryTimeout is the maximum timeout given to a single query.
1717
maxQueryTimeout = 32 * time.Second
18+
19+
// maxJobs is the maximum amount of jobs a single worker can have.
20+
maxJobs = 32
1821
)
1922

2023
var (
@@ -74,7 +77,6 @@ type PeerRanking interface {
7477

7578
// activeWorker wraps a Worker that is currently running, together with the job
7679
// we have given to it.
77-
// TODO(halseth): support more than one active job at a time.
7880
type activeWorker struct {
7981
w Worker
8082
activeJobs map[uint64]*queryJob
@@ -202,8 +204,10 @@ func (w *peerWorkManager) handleJobResult(result *jobResult) {
202204

203205
// Delete the job from the worker's active job, such
204206
// that the slot gets opened for more work.
205-
r := w.workers[result.peer.Addr()]
206-
delete(r.activeJobs, result.job.Index())
207+
r, found := w.workers[result.peer.Addr()]
208+
if found {
209+
delete(r.activeJobs, result.job.Index())
210+
}
207211

208212
// Get the index of this query's batch, and delete it
209213
// from the map of current queries, since we don't have
@@ -438,7 +442,7 @@ Loop:
438442
for p, r := range w.workers {
439443
// Only one active job at a time is currently
440444
// supported.
441-
if len(r.activeJobs) >= 1 {
445+
if len(r.activeJobs) >= maxJobs {
442446
continue
443447
}
444448

query/workmanager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager,
8484
NewWorker: func(peer Peer) Worker {
8585
m := &mockWorker{
8686
peer: peer,
87-
nextJob: make(chan *queryJob),
88-
results: make(chan *jobResult),
87+
nextJob: make(chan *queryJob, maxJobs),
88+
results: make(chan *jobResult, maxJobs),
8989
}
9090
workerChan <- m
9191
return m
@@ -205,7 +205,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
205205
for i := 0; i < numQueries; i++ {
206206
q := &Request{}
207207
queries[i] = q
208-
scheduledJobs[i] = make(chan sched)
208+
scheduledJobs[i] = make(chan sched, maxJobs)
209209
}
210210

211211
// For each worker, spin up a goroutine that will forward the job it
@@ -387,7 +387,7 @@ func TestWorkManagerCancelBatch(t *testing.T) {
387387
// TestWorkManagerWorkRankingScheduling checks that the work manager schedules
388388
// jobs among workers according to the peer ranking.
389389
func TestWorkManagerWorkRankingScheduling(t *testing.T) {
390-
const numQueries = 4
390+
const numQueries = 4 * maxJobs
391391
const numWorkers = 8
392392

393393
workMgr, workers := startWorkManager(t, numWorkers)
@@ -414,7 +414,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
414414
var jobs []*queryJob
415415
for i := 0; i < numQueries; i++ {
416416
select {
417-
case job := <-workers[i].nextJob:
417+
case job := <-workers[i/maxJobs].nextJob:
418418
if job.index != uint64(i) {
419419
t.Fatalf("unexpected job")
420420
}
@@ -449,7 +449,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
449449
// Go backwards, and succeed the queries.
450450
for i := numQueries - 1; i >= 0; i-- {
451451
select {
452-
case workers[i].results <- &jobResult{
452+
case workers[i/maxJobs].results <- &jobResult{
453453
job: jobs[i],
454454
err: nil,
455455
}:

0 commit comments

Comments
 (0)