Skip to content

Commit 4e83c23

Browse files
committed
query: introduce maxJobs
Workers will now accept multiple jobs up to the maxJobs.
1 parent 197df14 commit 4e83c23

File tree

4 files changed

+39
-24
lines changed

4 files changed

+39
-24
lines changed

query/worker.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package query
22

33
import (
44
"errors"
5+
"sync/atomic"
56
"time"
67

78
"github.com/btcsuite/btcd/wire"
@@ -58,6 +59,10 @@ type jobResult struct {
5859
type worker struct {
5960
peer Peer
6061

62+
// quit indicates that the worker has already quit and is not accepting
63+
// any more jobs.
64+
quit int32
65+
6166
// nextJob is a channel of queries to be distributed, where the worker
6267
// will poll new work from.
6368
nextJob chan *queryJob
@@ -70,7 +75,7 @@ var _ Worker = (*worker)(nil)
7075
func NewWorker(peer Peer) Worker {
7176
return &worker{
7277
peer: peer,
73-
nextJob: make(chan *queryJob),
78+
nextJob: make(chan *queryJob, maxJobs),
7479
}
7580
}
7681

@@ -88,7 +93,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
8893

8994
// Subscribe to messages from the peer.
9095
msgChan, cancel := peer.SubscribeRecvMsg()
91-
defer cancel()
96+
defer func() {
97+
atomic.AddInt32(&w.quit, 1)
98+
cancel()
99+
}()
92100

93101
for {
94102
log.Tracef("Worker %v waiting for more work", peer.Addr())
@@ -266,5 +274,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
266274
//
267275
// NOTE: Part of the Worker interface.
268276
func (w *worker) NewJob() chan<- *queryJob {
277+
// The worker has already quit so don't return the nextJob channel.
278+
if atomic.LoadInt32(&w.quit) != 0 {
279+
return nil
280+
}
281+
269282
return w.nextJob
270283
}

query/worker_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func makeJob() *queryJob {
8686
}
8787

8888
type testCtx struct {
89-
nextJob chan<- *queryJob
89+
worker Worker
9090
jobResults chan *jobResult
9191
peer *mockPeer
9292
workerDone chan struct{}
@@ -101,7 +101,7 @@ func startWorker() (*testCtx, error) {
101101
subscriptions: make(chan chan wire.Message),
102102
quit: make(chan struct{}),
103103
}
104-
results := make(chan *jobResult)
104+
results := make(chan *jobResult, maxJobs)
105105
quit := make(chan struct{})
106106

107107
wk := NewWorker(peer)
@@ -123,7 +123,7 @@ func startWorker() (*testCtx, error) {
123123
peer.responses = sub
124124

125125
return &testCtx{
126-
nextJob: wk.NewJob(),
126+
worker: wk,
127127
jobResults: results,
128128
peer: peer,
129129
workerDone: done,
@@ -144,7 +144,7 @@ func TestWorkerIgnoreMsgs(t *testing.T) {
144144
task := makeJob()
145145

146146
select {
147-
case ctx.nextJob <- task:
147+
case ctx.worker.NewJob() <- task:
148148
case <-time.After(1 * time.Second):
149149
t.Fatalf("did not pick up job")
150150
}
@@ -215,7 +215,7 @@ func TestWorkerTimeout(t *testing.T) {
215215

216216
// Give the worker the new job.
217217
select {
218-
case ctx.nextJob <- task:
218+
case ctx.worker.NewJob() <- task:
219219
case <-time.After(1 * time.Second):
220220
t.Fatalf("did not pick up job")
221221
}
@@ -253,7 +253,7 @@ func TestWorkerTimeout(t *testing.T) {
253253

254254
// It will immediately attempt to fetch another task.
255255
select {
256-
case ctx.nextJob <- task:
256+
case ctx.worker.NewJob() <- task:
257257
case <-time.After(1 * time.Second):
258258
t.Fatalf("did not pick up job")
259259
}
@@ -272,7 +272,7 @@ func TestWorkerDisconnect(t *testing.T) {
272272
// Give the worker a new job.
273273
task := makeJob()
274274
select {
275-
case ctx.nextJob <- task:
275+
case ctx.worker.NewJob() <- task:
276276
case <-time.After(1 * time.Second):
277277
t.Fatalf("did not pick up job")
278278
}
@@ -312,7 +312,7 @@ func TestWorkerDisconnect(t *testing.T) {
312312

313313
// No more jobs should be accepted by the worker after it has exited.
314314
select {
315-
case ctx.nextJob <- task:
315+
case ctx.worker.NewJob() <- task:
316316
t.Fatalf("exited worker did pick up job")
317317
default:
318318
}
@@ -342,7 +342,7 @@ func TestWorkerProgress(t *testing.T) {
342342
task.timeout = taskTimeout
343343

344344
select {
345-
case ctx.nextJob <- task:
345+
case ctx.worker.NewJob() <- task:
346346
case <-time.After(1 * time.Second):
347347
t.Fatalf("did not pick up job")
348348
}
@@ -421,7 +421,7 @@ func TestWorkerJobCanceled(t *testing.T) {
421421
canceled := false
422422
for i := 0; i < 2; i++ {
423423
select {
424-
case ctx.nextJob <- task:
424+
case ctx.worker.NewJob() <- task:
425425
case <-time.After(1 * time.Second):
426426
t.Fatalf("did not pick up job")
427427
}

query/workmanager.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ const (
1515

1616
// maxQueryTimeout is the maximum timeout given to a single query.
1717
maxQueryTimeout = 32 * time.Second
18+
19+
// maxJobs is the maximum amount of jobs a single worker can have.
20+
maxJobs = 10
1821
)
1922

2023
var (
@@ -74,11 +77,10 @@ type PeerRanking interface {
7477

7578
// activeWorker wraps a Worker that is currently running, together with the job
7679
// we have given to it.
77-
// TODO(halseth): support more than one active job at a time.
7880
type activeWorker struct {
79-
w Worker
81+
w Worker
8082
activeJobs map[uint64]*queryJob
81-
onExit chan struct{}
83+
onExit chan struct{}
8284
}
8385

8486
// Config holds the configuration options for a new WorkManager.
@@ -126,8 +128,8 @@ var _ WorkManager = (*peerWorkManager)(nil)
126128
func NewWorkManager(cfg *Config) WorkManager {
127129
return &peerWorkManager{
128130
cfg: cfg,
129-
newBatches: make(chan *batch),
130-
jobResults: make(chan *jobResult),
131+
newBatches: make(chan *batch, maxJobs),
132+
jobResults: make(chan *jobResult, maxJobs),
131133
quit: make(chan struct{}),
132134
}
133135
}
@@ -220,7 +222,7 @@ Loop:
220222
for p, r := range workers {
221223
// Only one active job at a time is currently
222224
// supported.
223-
if len(r.activeJobs) >= 1 {
225+
if len(r.activeJobs) >= maxJobs {
224226
continue
225227
}
226228

query/workmanager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager,
8484
NewWorker: func(peer Peer) Worker {
8585
m := &mockWorker{
8686
peer: peer,
87-
nextJob: make(chan *queryJob),
88-
results: make(chan *jobResult),
87+
nextJob: make(chan *queryJob, maxJobs),
88+
results: make(chan *jobResult, maxJobs),
8989
}
9090
workerChan <- m
9191
return m
@@ -205,7 +205,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
205205
for i := 0; i < numQueries; i++ {
206206
q := &Request{}
207207
queries[i] = q
208-
scheduledJobs[i] = make(chan sched)
208+
scheduledJobs[i] = make(chan sched, maxJobs)
209209
}
210210

211211
// For each worker, spin up a goroutine that will forward the job it
@@ -387,7 +387,7 @@ func TestWorkManagerCancelBatch(t *testing.T) {
387387
// TestWorkManagerWorkRankingScheduling checks that the work manager schedules
388388
// jobs among workers according to the peer ranking.
389389
func TestWorkManagerWorkRankingScheduling(t *testing.T) {
390-
const numQueries = 4
390+
const numQueries = 40
391391
const numWorkers = 8
392392

393393
workMgr, workers := startWorkManager(t, numWorkers)
@@ -414,7 +414,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
414414
var jobs []*queryJob
415415
for i := 0; i < numQueries; i++ {
416416
select {
417-
case job := <-workers[i].nextJob:
417+
case job := <-workers[i/10].nextJob:
418418
if job.index != uint64(i) {
419419
t.Fatalf("unexpected job")
420420
}
@@ -449,7 +449,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
449449
// Go backwards, and succeed the queries.
450450
for i := numQueries - 1; i >= 0; i-- {
451451
select {
452-
case workers[i].results <- &jobResult{
452+
case workers[i/10].results <- &jobResult{
453453
job: jobs[i],
454454
err: nil,
455455
}:

0 commit comments

Comments
 (0)