Skip to content

Commit 1e9f0ec

Browse files
authored
Add scheduler (#5)
* Add ticker and scheduler to pool package Add TestAndSet to rmap package Fix bug in pool package that logged incorrect errors on stopping jobs * Add scheduler pool example. Fix issue with job dispatch return status handling. * Document TestAndSet
1 parent a191b49 commit 1e9f0ec

File tree

19 files changed

+1128
-173
lines changed

19 files changed

+1128
-173
lines changed

examples/pool/README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# pool Example
2+
3+
This example shows how to use the pool package to create a pool of workers. It has three parts:
4+
5+
1. The `worker` process registers a worker with the node and waits for jobs.
6+
2. The `producer` process starts and stops two jobs. It also notifies the worker handling the second job.
7+
3. The `scheduler` process runs a schedule that starts and stops jobs alternately.
8+
9+
## Running the example
10+
11+
To run the example, first make sure Redis is running locally. The `start-redis` script
12+
in the scripts directory of the repository can be used to start Redis:
13+
14+
```bash
15+
$ cd pulse
16+
$ scripts/start-redis
17+
```
18+
19+
Then, in two separate terminals, run the following commands:
20+
21+
```bash
22+
$ source .env
23+
$ go run examples/pool/worker/main.go
24+
```
25+
26+
The above commands start two workers that wait for jobs. Then, in separate terminals, run
27+
the following commands:
28+
29+
```bash
30+
$ source .env
31+
$ go run examples/pool/producer/main.go
32+
```
33+
34+
The above starts and stops two jobs. The first job is handled by the first worker,
35+
and the second job is handled by the second worker.
36+
37+
Finally, in the same terminal used above, run the following command:
38+
39+
```bash
40+
$ go run examples/pool/scheduler/main.go
41+
```
42+
43+
The above starts a scheduler that alternately starts and stops a job. Each
44+
dispatched job is picked up by one of the running workers.
45+
46+
To stop the worker processes simply press `Ctrl-C` in the terminal where they are
47+
running. Note that it takes a few seconds for the workers to stop (as they wait for
48+
the Redis poll to return).
49+
50+
## Adding Verbose Logging
51+
52+
The three processes can be run with verbose logging by passing the `-v` flag:
53+
54+
```bash
55+
$ go run examples/pool/worker/main.go -v
56+
$ go run examples/pool/producer/main.go -v
57+
$ go run examples/pool/scheduler/main.go -v
58+
```
59+
60+
Verbose logging will print logs created by the `pulse` package to the terminal.

examples/pool/producer/main.go

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package main
22

33
import (
44
"context"
5-
"fmt"
65
"os"
76
"time"
87

@@ -13,48 +12,60 @@ import (
1312
)
1413

1514
func main() {
16-
ctx := log.Context(context.Background(), log.WithDebug())
15+
// Setup Redis connection
1716
rdb := redis.NewClient(&redis.Options{
1817
Addr: "localhost:6379",
1918
Password: os.Getenv("REDIS_PASSWORD"),
2019
})
2120

22-
// Create client for node named "example"
23-
node, err := pool.AddNode(ctx, "example", rdb,
21+
// Setup clue logger.
22+
ctx := log.Context(context.Background())
23+
log.FlushAndDisableBuffering(ctx)
24+
25+
var logger pulse.Logger
26+
if len(os.Args) > 1 && os.Args[1] == "-v" {
27+
logger = pulse.ClueLogger(ctx)
28+
}
29+
30+
// Create client for worker pool "example"
31+
client, err := pool.AddNode(ctx, "example", rdb,
2432
pool.WithClientOnly(),
25-
pool.WithLogger(pulse.ClueLogger(ctx)),
33+
pool.WithLogger(logger),
2634
)
2735
if err != nil {
2836
panic(err)
2937
}
3038

31-
// Cleanup node on exit.
32-
defer func() {
33-
if err := node.Close(ctx); err != nil {
34-
panic(err)
35-
}
36-
}()
37-
3839
// Start 2 jobs
39-
fmt.Println("** Starting job one...")
40-
if err := node.DispatchJob(ctx, "one", nil); err != nil {
40+
log.Infof(ctx, "starting job one")
41+
if err := client.DispatchJob(ctx, "alpha", nil); err != nil {
4142
panic(err)
4243
}
43-
fmt.Println("** Starting job two...")
44-
if err := node.DispatchJob(ctx, "two", nil); err != nil {
44+
log.Infof(ctx, "starting job two")
45+
if err := client.DispatchJob(ctx, "beta", nil); err != nil {
4546
panic(err)
4647
}
4748
time.Sleep(200 * time.Millisecond) // emulate delay
48-
fmt.Println("Stopping job one...")
49-
if err := node.StopJob(ctx, "one"); err != nil {
49+
50+
// Stop job one
51+
log.Infof(ctx, "stopping job one")
52+
if err := client.StopJob(ctx, "alpha"); err != nil {
5053
panic(err)
5154
}
52-
fmt.Println("Notifying worker for job two...")
53-
if err := node.NotifyWorker(ctx, "two", []byte("hello")); err != nil {
55+
56+
// Notify and then stop job two
57+
log.Infof(ctx, "notifying job two worker")
58+
if err := client.NotifyWorker(ctx, "beta", []byte("hello")); err != nil {
59+
panic(err)
60+
}
61+
log.Infof(ctx, "stopping job two")
62+
if err := client.StopJob(ctx, "beta"); err != nil {
5463
panic(err)
5564
}
56-
fmt.Println("Stopping job two...")
57-
if err := node.StopJob(ctx, "two"); err != nil {
65+
66+
// Cleanup client on exit.
67+
log.Infof(ctx, "done")
68+
if err := client.Close(ctx); err != nil {
5869
panic(err)
5970
}
6071
}

examples/pool/scheduler/main.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"os"
6+
"time"
7+
8+
redis "github.com/redis/go-redis/v9"
9+
"goa.design/clue/log"
10+
"goa.design/pulse/pool"
11+
"goa.design/pulse/pulse"
12+
)
13+
14+
func main() {
15+
// Setup Redis connection
16+
rdb := redis.NewClient(&redis.Options{
17+
Addr: "localhost:6379",
18+
Password: os.Getenv("REDIS_PASSWORD"),
19+
})
20+
21+
// Setup clue logger.
22+
ctx := log.Context(context.Background())
23+
log.FlushAndDisableBuffering(ctx)
24+
25+
var logger pulse.Logger
26+
if len(os.Args) > 1 && os.Args[1] == "-v" {
27+
logger = pulse.ClueLogger(ctx)
28+
}
29+
30+
// Create node for pool "example".
31+
node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
32+
if err != nil {
33+
panic(err)
34+
}
35+
36+
// Start schedule
37+
log.Infof(ctx, "Starting schedule... CTRL+C to stop.")
38+
done := make(chan struct{})
39+
if err := node.Schedule(ctx, newProducer(ctx, done), time.Second); err != nil {
40+
panic(err)
41+
}
42+
43+
// Wait for producer to be done
44+
<-done
45+
46+
// Cleanup node on exit.
47+
if err := node.Close(ctx); err != nil {
48+
panic(err)
49+
}
50+
}
51+
52+
// producer implements a scheduler job producer that alternately starts
// and stops a job on each tick. After ten invocations of Plan it closes
// done and stops the schedule.
type producer struct {
	done   chan struct{}   // closed once the producer has finished its run
	logctx context.Context // context carrying the clue logger
	iter   int             // number of times Plan has been invoked
}
59+
60+
func newProducer(ctx context.Context, done chan struct{}) *producer {
61+
return &producer{done: done, logctx: ctx}
62+
}
63+
64+
func (p *producer) Name() string {
65+
return "example"
66+
}
67+
68+
// Plan is called by the scheduler to determine the next job to start or stop.
69+
func (p *producer) Plan() (*pool.JobPlan, error) {
70+
p.iter++
71+
if p.iter > 10 {
72+
log.Infof(p.logctx, "done")
73+
close(p.done)
74+
return nil, pool.ErrScheduleStop
75+
}
76+
if p.iter%2 == 0 {
77+
log.Infof(p.logctx, "stop all")
78+
return &pool.JobPlan{StopAll: true}, nil
79+
}
80+
log.Infof(p.logctx, "start job")
81+
return &pool.JobPlan{Start: []*pool.JobParam{{Key: "job", Payload: []byte("payload")}}}, nil
82+
}

examples/pool/worker/main.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"os"
7+
"os/signal"
78
"sync"
89
"time"
910

@@ -22,8 +23,8 @@ type (
2223
executions map[string]*Execution
2324
// node is the node the worker is registered with.
2425
node *pool.Node
25-
// done is closed when the worker is stopped.
26-
done chan struct{}
26+
// logctx is the logger context.
27+
logctx context.Context
2728
}
2829

2930
// Execution represents a single execution.
@@ -34,31 +35,42 @@ type (
3435
)
3536

3637
func main() {
37-
ctx := log.Context(context.Background(), log.WithDebug())
38+
// Setup Redis connection
3839
rdb := redis.NewClient(&redis.Options{
3940
Addr: "localhost:6379",
4041
Password: os.Getenv("REDIS_PASSWORD"),
4142
})
4243

44+
// Setup clue logger.
45+
ctx := log.Context(context.Background())
46+
log.FlushAndDisableBuffering(ctx)
47+
48+
var logger pulse.Logger
49+
if len(os.Args) > 1 && os.Args[1] == "-v" {
50+
logger = pulse.ClueLogger(ctx)
51+
}
52+
4353
// Create node for pool "example".
44-
node, err := pool.AddNode(ctx,
45-
"example",
46-
rdb,
47-
pool.WithLogger(pulse.ClueLogger(ctx)))
54+
node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
4855
if err != nil {
4956
panic(err)
5057
}
5158

5259
// Create a new worker for pool "example".
53-
c := make(chan struct{})
54-
handler := &JobHandler{executions: make(map[string]*Execution), node: node, done: c}
60+
handler := &JobHandler{executions: make(map[string]*Execution), node: node, logctx: ctx}
5561
if _, err := node.AddWorker(ctx, handler); err != nil {
5662
panic(err)
5763
}
5864

5965
// Wait for jobs to complete.
60-
fmt.Println("Waiting for jobs...")
61-
<-c
66+
log.Infof(ctx, "Waiting for jobs... CTRL+C to stop.")
67+
68+
// Close done channel on CTRL-C.
69+
sigc := make(chan os.Signal, 1)
70+
signal.Notify(sigc, os.Interrupt)
71+
<-sigc
72+
close(sigc)
73+
6274
if err := node.Shutdown(ctx); err != nil {
6375
panic(err)
6476
}
@@ -70,7 +82,7 @@ func (w *JobHandler) Start(job *pool.Job) error {
7082
defer w.lock.Unlock()
7183
exec := &Execution{c: make(chan struct{})}
7284
w.executions[job.Key] = exec
73-
go exec.Start(job)
85+
go exec.Start(w.logctx, job)
7486
return nil
7587
}
7688

@@ -84,31 +96,29 @@ func (w *JobHandler) Stop(key string) error {
8496
}
8597
close(exec.c)
8698
delete(w.executions, key)
87-
if key == "two" {
88-
close(w.done)
89-
}
9099
return nil
91100
}
92101

93102
// Print notification.
94103
func (w *JobHandler) HandleNotification(key string, payload []byte) error {
95-
fmt.Printf(">> Notification: %s\n", string(payload))
104+
log.Info(w.logctx, log.Fields{"msg": "notification", "key": key, "payload": string(payload)})
96105
return nil
97106
}
98107

99108
// Start execution.
100-
func (c *Execution) Start(job *pool.Job) {
101-
defer fmt.Printf("Worker %s, Job %s, Done\n", job.Worker.ID, job.Key)
109+
func (c *Execution) Start(ctx context.Context, job *pool.Job) {
110+
log.Info(ctx, log.Fields{"msg": "job started", "worker-id": job.Worker.ID, "job": job.Key})
111+
defer log.Info(ctx, log.Fields{"msg": "job done", "worker-id": job.Worker.ID, "job": job.Key})
102112
i := 1
103-
ticker := time.NewTicker(500 * time.Millisecond)
113+
ticker := time.NewTicker(200 * time.Millisecond)
104114
defer ticker.Stop()
105115
for {
106116
select {
107117
case <-c.c:
108118
return
109119
case <-ticker.C:
110120
i++
111-
fmt.Printf(">> Worker %s, Job %s, Iteration %d\n", job.Worker.ID, job.Key, i)
121+
log.Info(ctx, log.Fields{"msg": "tick", "worker-id": job.Worker.ID, "job": job.Key, "i": i})
112122
}
113123
}
114124
}

pool/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,16 @@ The `StopJob` method is used to stop a job. It takes a job key as input and
160160
returns an error if the job could not be stopped. This can happen if the job key
161161
is invalid, the node is closed or the pool shutdown.
162162

163+
## Scheduling
164+
165+
The `Schedule` method of the `Node` struct can be used to schedule jobs to be
166+
dispatched or stopped on a recurring basis. The method takes as input a job
167+
producer and invokes it at the specified interval. The job producer returns
168+
a list of jobs to be started and stopped.
169+
170+
`Schedule` makes it possible to maintain a pool of jobs, for example in a
171+
multi-tenant system. See the [examples](../examples/pool) for more details.
172+
163173
## Data Flows
164174

165175
The following sections provide additional details on the internal data flows

0 commit comments

Comments
 (0)