Skip to content

Commit 3ac2bce

Browse files
committed
fix: ensure tipset worker starting and stopping work
1 parent 186d605 commit 3ac2bce

File tree

9 files changed

+76
-111
lines changed

9 files changed

+76
-111
lines changed

.config.toml.swp

4 KB
Binary file not shown.

.docker-compose.yml.swp

12 KB
Binary file not shown.
Lines changed: 20 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,12 @@
11
package distributed
22

33
import (
4-
"context"
5-
"encoding/json"
64
"fmt"
75
"os"
86

9-
logging "github.com/ipfs/go-log/v2"
10-
"go.uber.org/atomic"
11-
127
"github.com/hibiken/asynq"
13-
"go.opentelemetry.io/otel/trace"
8+
logging "github.com/ipfs/go-log/v2"
149

15-
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
1610
"github.com/filecoin-project/lily/config"
1711
)
1812

@@ -30,7 +24,7 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) {
3024
if _, exists := c.servers[name]; exists {
3125
return nil, fmt.Errorf("duplicate queue name: %q", name)
3226
}
33-
log.Infow("registering worker queue config", "name", name, "type", "redis")
27+
log.Infow("registering worker queue config", "name", name, "type", "redis", "addr", sc.RedisConfig.Addr)
3428

3529
// Find the password of the queue, which is either indirectly specified using PasswordEnv or explicit via Password.
3630
// TODO use github.com/kelseyhightower/envconfig
@@ -42,34 +36,29 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) {
4236
}
4337

4438
c.servers[name] = &TipSetWorker{
45-
server: asynq.NewServer(
46-
asynq.RedisClientOpt{
47-
Network: sc.RedisConfig.Network,
48-
Addr: sc.RedisConfig.Addr,
49-
Username: sc.RedisConfig.Username,
50-
Password: queuePassword,
51-
DB: sc.RedisConfig.DB,
52-
PoolSize: sc.RedisConfig.PoolSize,
53-
},
54-
asynq.Config{
55-
LogLevel: sc.WorkerConfig.LogLevel(),
56-
Queues: sc.WorkerConfig.Queues(),
57-
ShutdownTimeout: sc.WorkerConfig.ShutdownTimeout,
58-
Concurrency: sc.WorkerConfig.Concurrency,
59-
StrictPriority: sc.WorkerConfig.StrictPriority,
60-
Logger: log.With("worker", name),
61-
ErrorHandler: &QueueErrorHandler{},
62-
},
63-
),
64-
running: atomic.NewBool(false),
39+
RedisConfig: asynq.RedisClientOpt{
40+
Network: sc.RedisConfig.Network,
41+
Addr: sc.RedisConfig.Addr,
42+
Username: sc.RedisConfig.Username,
43+
Password: queuePassword,
44+
DB: sc.RedisConfig.DB,
45+
PoolSize: sc.RedisConfig.PoolSize,
46+
},
47+
ServerConfig: asynq.Config{
48+
LogLevel: sc.WorkerConfig.LogLevel(),
49+
Queues: sc.WorkerConfig.Queues(),
50+
ShutdownTimeout: sc.WorkerConfig.ShutdownTimeout,
51+
Concurrency: sc.WorkerConfig.Concurrency,
52+
StrictPriority: sc.WorkerConfig.StrictPriority,
53+
},
6554
}
6655
}
6756

6857
for name, cc := range cfg.Notifiers {
6958
if _, exists := c.servers[name]; exists {
7059
return nil, fmt.Errorf("duplicate queue name: %q", name)
7160
}
72-
log.Infow("registering notifier queue config", "name", name, "type", "redis")
61+
log.Infow("registering notifier queue config", "name", name, "type", "redis", "addr", cc.Addr)
7362

7463
// Find the password of the queue, which is either indirectly specified using PasswordEnv or explicit via Password.
7564
// TODO use github.com/kelseyhightower/envconfig
@@ -95,24 +84,8 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) {
9584
}
9685

9786
type TipSetWorker struct {
98-
server *asynq.Server
99-
running *atomic.Bool
100-
}
101-
102-
func (w *TipSetWorker) Running() bool {
103-
return w.running.Load()
104-
}
105-
106-
func (w *TipSetWorker) Run(mux *asynq.ServeMux) error {
107-
if w.running.Load() {
108-
return fmt.Errorf("server already running")
109-
}
110-
w.running.Swap(true)
111-
return w.server.Run(mux)
112-
}
113-
114-
func (w *TipSetWorker) Shutdown() {
115-
w.server.Shutdown()
87+
RedisConfig asynq.RedisClientOpt
88+
ServerConfig asynq.Config
11689
}
11790

11891
// Catalog contains a map of workers and clients
@@ -149,34 +122,3 @@ func (c *Catalog) Notifier(name string) (*asynq.Client, error) {
149122
}
150123
return client, nil
151124
}
152-
153-
type QueueErrorHandler struct{}
154-
155-
func (w *QueueErrorHandler) HandleError(ctx context.Context, task *asynq.Task, err error) {
156-
switch task.Type() {
157-
case tasks.TypeIndexTipSet:
158-
var p tasks.IndexTipSetPayload
159-
if err := json.Unmarshal(task.Payload(), &p); err != nil {
160-
log.Errorw("failed to decode task type (developer error?)", "error", err)
161-
}
162-
if p.HasTraceCarrier() {
163-
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
164-
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
165-
trace.SpanFromContext(ctx).RecordError(err)
166-
}
167-
}
168-
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
169-
case tasks.TypeGapFillTipSet:
170-
var p tasks.GapFillTipSetPayload
171-
if err := json.Unmarshal(task.Payload(), &p); err != nil {
172-
log.Errorw("failed to decode task type (developer error?)", "error", err)
173-
}
174-
if p.HasTraceCarrier() {
175-
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
176-
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
177-
trace.SpanFromContext(ctx).RecordError(err)
178-
}
179-
}
180-
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
181-
}
182-
}

chain/indexer/distributed/queue/worker.go

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,32 @@ package queue
22

33
import (
44
"context"
5+
"encoding/json"
56

67
"github.com/hibiken/asynq"
8+
logging "github.com/ipfs/go-log/v2"
9+
"go.opentelemetry.io/otel/trace"
710

811
"github.com/filecoin-project/lily/chain/indexer"
912
"github.com/filecoin-project/lily/chain/indexer/distributed"
1013
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
1114
"github.com/filecoin-project/lily/storage"
1215
)
1316

17+
var log = logging.Logger("lily/distributed/worker")
18+
1419
type AsynqWorker struct {
15-
done chan struct{}
20+
done chan struct{}
21+
22+
name string
1623
server *distributed.TipSetWorker
1724
index indexer.Indexer
1825
db *storage.Database
1926
}
2027

21-
func NewAsynqWorker(i indexer.Indexer, db *storage.Database, server *distributed.TipSetWorker) *AsynqWorker {
28+
func NewAsynqWorker(name string, i indexer.Indexer, db *storage.Database, server *distributed.TipSetWorker) *AsynqWorker {
2229
return &AsynqWorker{
30+
name: name,
2331
server: server,
2432
index: i,
2533
db: db,
@@ -34,18 +42,49 @@ func (t *AsynqWorker) Run(ctx context.Context) error {
3442
mux.HandleFunc(tasks.TypeIndexTipSet, tasks.NewIndexHandler(t.index).HandleIndexTipSetTask)
3543
mux.HandleFunc(tasks.TypeGapFillTipSet, tasks.NewGapFillHandler(t.index, t.db).HandleGapFillTipSetTask)
3644

37-
if err := t.server.Run(mux); err != nil {
45+
t.server.ServerConfig.Logger = log.With("name", t.name)
46+
t.server.ServerConfig.ErrorHandler = &WorkerErrorHandler{}
47+
48+
server := asynq.NewServer(t.server.RedisConfig, t.server.ServerConfig)
49+
if err := server.Start(mux); err != nil {
3850
return err
3951
}
40-
41-
go func() {
42-
<-ctx.Done()
43-
t.server.Shutdown()
44-
}()
45-
52+
<-ctx.Done()
53+
server.Shutdown()
4654
return nil
4755
}
4856

4957
func (t *AsynqWorker) Done() <-chan struct{} {
5058
return t.done
5159
}
60+
61+
type WorkerErrorHandler struct{}
62+
63+
func (w *WorkerErrorHandler) HandleError(ctx context.Context, task *asynq.Task, err error) {
64+
switch task.Type() {
65+
case tasks.TypeIndexTipSet:
66+
var p tasks.IndexTipSetPayload
67+
if err := json.Unmarshal(task.Payload(), &p); err != nil {
68+
log.Errorw("failed to decode task type (developer error?)", "error", err)
69+
}
70+
if p.HasTraceCarrier() {
71+
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
72+
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
73+
trace.SpanFromContext(ctx).RecordError(err)
74+
}
75+
}
76+
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
77+
case tasks.TypeGapFillTipSet:
78+
var p tasks.GapFillTipSetPayload
79+
if err := json.Unmarshal(task.Payload(), &p); err != nil {
80+
log.Errorw("failed to decode task type (developer error?)", "error", err)
81+
}
82+
if p.HasTraceCarrier() {
83+
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
84+
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
85+
trace.SpanFromContext(ctx).RecordError(err)
86+
}
87+
}
88+
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
89+
}
90+
}

commands/job/worker.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,13 @@ import (
1111
)
1212

1313
var tipsetWorkerFlags struct {
14-
queue string
15-
concurrency int
14+
queue string
1615
}
1716

1817
var TipSetWorkerCmd = &cli.Command{
1918
Name: "tipset-worker",
2019
Usage: "start a tipset-worker that consumes tasks from the provided queuing system and performs indexing",
2120
Flags: []cli.Flag{
22-
&cli.IntFlag{
23-
Name: "concurrency",
24-
Usage: "Concurrency sets the maximum number of concurrent processing of tasks. If set to a zero or negative value it will be set to the number of CPUs usable by the current process.",
25-
Value: 1,
26-
Destination: &tipsetWorkerFlags.concurrency,
27-
},
2821
&cli.StringFlag{
2922
Name: "queue",
3023
Usage: "Name of queue system worker will consume work from.",
@@ -43,9 +36,8 @@ var TipSetWorkerCmd = &cli.Command{
4336
defer closer()
4437

4538
res, err := api.StartTipSetWorker(ctx, &lily.LilyTipSetWorkerConfig{
46-
JobConfig: RunFlags.ParseJobConfig("tipset-worker"),
47-
Queue: tipsetWorkerFlags.queue,
48-
Concurrency: tipsetWorkerFlags.concurrency,
39+
JobConfig: RunFlags.ParseJobConfig("tipset-worker"),
40+
Queue: tipsetWorkerFlags.queue,
4941
})
5042
if err != nil {
5143
return err

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ require (
6868
github.com/hibiken/asynq/x v0.0.0-20220413130846-5c723f597e01
6969
github.com/jedib0t/go-pretty/v6 v6.2.7
7070
go.opentelemetry.io/otel/trace v1.3.0
71-
go.uber.org/atomic v1.9.0
7271
)
7372

7473
require (
@@ -336,6 +335,7 @@ require (
336335
github.com/zondax/ledger-go v0.12.1 // indirect
337336
go.opentelemetry.io/otel/metric v0.25.0 // indirect
338337
go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect
338+
go.uber.org/atomic v1.9.0 // indirect
339339
go.uber.org/dig v1.12.0 // indirect
340340
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
341341
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect

lens/lily/api.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,6 @@ type LilyTipSetWorkerConfig struct {
143143

144144
// Queue is the name of the queueing system the worker will consume work from.
145145
Queue string
146-
// Concurrency sets the maximum number of concurrent processing of tasks.
147-
// If set to a zero or negative value, NewServer will overwrite the value
148-
// to the number of CPUs usable by the current process.
149-
Concurrency int
150146
}
151147

152148
type LilySurveyConfig struct {

lens/lily/impl.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,6 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker
9090
return nil, err
9191
}
9292

93-
if worker.Running() {
94-
return nil, fmt.Errorf("worker %s already running", cfg.Queue)
95-
}
96-
9793
taskAPI, err := datasource.NewDataSource(m)
9894
if err != nil {
9995
return nil, err
@@ -113,11 +109,10 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker
113109
Name: cfg.JobConfig.Name,
114110
Type: "tipset-worker",
115111
Params: map[string]string{
116-
"queue": cfg.Queue,
117-
"storage": cfg.JobConfig.Storage,
118-
"concurrency": strconv.Itoa(cfg.Concurrency),
112+
"queue": cfg.Queue,
113+
"storage": cfg.JobConfig.Storage,
119114
},
120-
Job: queue.NewAsynqWorker(im, db, worker),
115+
Job: queue.NewAsynqWorker(cfg.JobConfig.Name, im, db, worker),
121116
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
122117
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
123118
RestartDelay: cfg.JobConfig.RestartDelay,

tasks/ipfs/task.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package ipfs

0 commit comments

Comments
 (0)