Skip to content

Commit 9c13852

Browse files
committed
query: introduce maxJobs
Workers will now accept multiple jobs up to the maxJobs.
1 parent 197df14 commit 9c13852

File tree

3 files changed

+15
-13
lines changed

3 files changed

+15
-13
lines changed

query/worker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func startWorker() (*testCtx, error) {
101101
subscriptions: make(chan chan wire.Message),
102102
quit: make(chan struct{}),
103103
}
104-
results := make(chan *jobResult)
104+
results := make(chan *jobResult, maxJobs)
105105
quit := make(chan struct{})
106106

107107
wk := NewWorker(peer)

query/workmanager.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ const (
1515

1616
// maxQueryTimeout is the maximum timeout given to a single query.
1717
maxQueryTimeout = 32 * time.Second
18+
19+
// maxJobs is the maximum amount of jobs a single worker can have.
20+
maxJobs = 10
1821
)
1922

2023
var (
@@ -74,11 +77,10 @@ type PeerRanking interface {
7477

7578
// activeWorker wraps a Worker that is currently running, together with the job
7679
// we have given to it.
77-
// TODO(halseth): support more than one active job at a time.
7880
type activeWorker struct {
79-
w Worker
81+
w Worker
8082
activeJobs map[uint64]*queryJob
81-
onExit chan struct{}
83+
onExit chan struct{}
8284
}
8385

8486
// Config holds the configuration options for a new WorkManager.
@@ -126,8 +128,8 @@ var _ WorkManager = (*peerWorkManager)(nil)
126128
func NewWorkManager(cfg *Config) WorkManager {
127129
return &peerWorkManager{
128130
cfg: cfg,
129-
newBatches: make(chan *batch),
130-
jobResults: make(chan *jobResult),
131+
newBatches: make(chan *batch, maxJobs),
132+
jobResults: make(chan *jobResult, maxJobs),
131133
quit: make(chan struct{}),
132134
}
133135
}
@@ -220,7 +222,7 @@ Loop:
220222
for p, r := range workers {
221223
// Only one active job at a time is currently
222224
// supported.
223-
if len(r.activeJobs) >= 1 {
225+
if len(r.activeJobs) >= maxJobs {
224226
continue
225227
}
226228

query/workmanager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager,
8484
NewWorker: func(peer Peer) Worker {
8585
m := &mockWorker{
8686
peer: peer,
87-
nextJob: make(chan *queryJob),
88-
results: make(chan *jobResult),
87+
nextJob: make(chan *queryJob, maxJobs),
88+
results: make(chan *jobResult, maxJobs),
8989
}
9090
workerChan <- m
9191
return m
@@ -205,7 +205,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
205205
for i := 0; i < numQueries; i++ {
206206
q := &Request{}
207207
queries[i] = q
208-
scheduledJobs[i] = make(chan sched)
208+
scheduledJobs[i] = make(chan sched, maxJobs)
209209
}
210210

211211
// For each worker, spin up a goroutine that will forward the job it
@@ -387,7 +387,7 @@ func TestWorkManagerCancelBatch(t *testing.T) {
387387
// TestWorkManagerWorkRankingScheduling checks that the work manager schedules
388388
// jobs among workers according to the peer ranking.
389389
func TestWorkManagerWorkRankingScheduling(t *testing.T) {
390-
const numQueries = 4
390+
const numQueries = 40
391391
const numWorkers = 8
392392

393393
workMgr, workers := startWorkManager(t, numWorkers)
@@ -414,7 +414,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
414414
var jobs []*queryJob
415415
for i := 0; i < numQueries; i++ {
416416
select {
417-
case job := <-workers[i].nextJob:
417+
case job := <-workers[i/10].nextJob:
418418
if job.index != uint64(i) {
419419
t.Fatalf("unexpected job")
420420
}
@@ -449,7 +449,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
449449
// Go backwards, and succeed the queries.
450450
for i := numQueries - 1; i >= 0; i-- {
451451
select {
452-
case workers[i].results <- &jobResult{
452+
case workers[i/10].results <- &jobResult{
453453
job: jobs[i],
454454
err: nil,
455455
}:

0 commit comments

Comments
 (0)