Skip to content

Commit a9c8eb9

Browse files
authored
Dynamic docker labels handling (#319)
1 parent 3dfbb4a commit a9c8eb9

File tree

16 files changed

+557
-170
lines changed

16 files changed

+557
-170
lines changed

cli/config.go

Lines changed: 162 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,22 @@
11
package cli
22

33
import (
4-
"os"
4+
"fmt"
55

6-
docker "github.com/fsouza/go-dockerclient"
76
"github.com/mcuadros/ofelia/core"
87
"github.com/mcuadros/ofelia/middlewares"
9-
logging "github.com/op/go-logging"
108

119
defaults "github.com/mcuadros/go-defaults"
1210
gcfg "gopkg.in/gcfg.v1"
1311
)
1412

1513
const (
16-
logFormat = "%{time} %{color} %{shortfile} ▶ %{level}%{color:reset} %{message}"
1714
jobExec = "job-exec"
1815
jobRun = "job-run"
1916
jobServiceRun = "job-service-run"
2017
jobLocal = "job-local"
2118
)
2219

23-
var IsDockerEnv bool
24-
2520
// Config contains the configuration
2621
type Config struct {
2722
Global struct {
@@ -33,113 +28,101 @@ type Config struct {
3328
RunJobs map[string]*RunJobConfig `gcfg:"job-run" mapstructure:"job-run,squash"`
3429
ServiceJobs map[string]*RunServiceConfig `gcfg:"job-service-run" mapstructure:"job-service-run,squash"`
3530
LocalJobs map[string]*LocalJobConfig `gcfg:"job-local" mapstructure:"job-local,squash"`
31+
32+
sh *core.Scheduler
33+
dockerHandler *DockerHandler
34+
logger core.Logger
3635
}
3736

38-
// BuildFromDockerLabels builds a scheduler using the config from a docker labels
39-
func BuildFromDockerLabels(filterFlags ...string) (*core.Scheduler, error) {
37+
func NewConfig(logger core.Logger) *Config {
38+
// Initialize
4039
c := &Config{}
41-
42-
d, err := c.buildDockerClient()
43-
if err != nil {
44-
return nil, err
45-
}
46-
47-
labels, err := getLabels(d, filterFlags)
48-
if err != nil {
49-
return nil, err
50-
}
51-
52-
if err := c.buildFromDockerLabels(labels); err != nil {
53-
return nil, err
54-
}
55-
56-
return c.build()
40+
c.ExecJobs = make(map[string]*ExecJobConfig)
41+
c.RunJobs = make(map[string]*RunJobConfig)
42+
c.ServiceJobs = make(map[string]*RunServiceConfig)
43+
c.LocalJobs = make(map[string]*LocalJobConfig)
44+
c.logger = logger
45+
defaults.SetDefaults(c)
46+
return c
5747
}
5848

5949
// BuildFromFile builds a scheduler using the config from a file
60-
func BuildFromFile(filename string) (*core.Scheduler, error) {
61-
c := &Config{}
62-
if err := gcfg.ReadFileInto(c, filename); err != nil {
63-
return nil, err
64-
}
65-
66-
return c.build()
50+
func BuildFromFile(filename string, logger core.Logger) (*Config, error) {
51+
c := NewConfig(logger)
52+
err := gcfg.ReadFileInto(c, filename)
53+
return c, err
6754
}
6855

6956
// BuildFromString builds a scheduler using the config from a string
70-
func BuildFromString(config string) (*core.Scheduler, error) {
71-
c := &Config{}
57+
func BuildFromString(config string, logger core.Logger) (*Config, error) {
58+
c := NewConfig(logger)
7259
if err := gcfg.ReadStringInto(c, config); err != nil {
7360
return nil, err
7461
}
75-
76-
return c.build()
62+
return c, nil
7763
}
7864

79-
func (c *Config) build() (*core.Scheduler, error) {
80-
defaults.SetDefaults(c)
81-
82-
d, err := c.buildDockerClient()
83-
if err != nil {
84-
return nil, err
65+
// Call this only once at app init
66+
func (c *Config) InitializeApp() error {
67+
if c.sh == nil {
68+
return fmt.Errorf("scheduler is not initialized yet")
8569
}
8670

87-
sh := core.NewScheduler(c.buildLogger())
88-
c.buildSchedulerMiddlewares(sh)
71+
if c.dockerHandler.ConfigFromLabelsEnabled() {
72+
// Wait couple seconds for docker to propagate container labels
73+
c.dockerHandler.WaitForLabels()
74+
75+
// In order to support non dynamic job types such as Local or Run using labels
76+
// lets parse the labels and merge the job lists
77+
dockerLabels, err := c.dockerHandler.GetDockerLabels()
78+
if err != nil {
79+
return err
80+
}
81+
82+
if err := c.buildFromDockerLabels(dockerLabels); err != nil {
83+
return err
84+
}
85+
86+
// Initialize middlewares again after reading the labels
87+
c.buildSchedulerMiddlewares(c.sh)
88+
}
8989

9090
for name, j := range c.ExecJobs {
9191
defaults.SetDefaults(j)
92-
93-
j.Client = d
92+
j.Client = c.dockerHandler.GetInternalDockerClient()
9493
j.Name = name
9594
j.buildMiddlewares()
96-
sh.AddJob(j)
95+
c.sh.AddJob(j)
9796
}
9897

9998
for name, j := range c.RunJobs {
10099
defaults.SetDefaults(j)
101-
102-
j.Client = d
100+
j.Client = c.dockerHandler.GetInternalDockerClient()
103101
j.Name = name
104102
j.buildMiddlewares()
105-
sh.AddJob(j)
103+
c.sh.AddJob(j)
106104
}
107105

108106
for name, j := range c.LocalJobs {
109107
defaults.SetDefaults(j)
110-
111108
j.Name = name
112109
j.buildMiddlewares()
113-
sh.AddJob(j)
110+
c.sh.AddJob(j)
114111
}
115112

116113
for name, j := range c.ServiceJobs {
117114
defaults.SetDefaults(j)
118115
j.Name = name
119-
j.Client = d
116+
j.Client = c.dockerHandler.GetInternalDockerClient()
120117
j.buildMiddlewares()
121-
sh.AddJob(j)
122-
}
123-
124-
return sh, nil
125-
}
126-
127-
func (c *Config) buildDockerClient() (*docker.Client, error) {
128-
d, err := docker.NewClientFromEnv()
129-
if err != nil {
130-
return nil, err
118+
c.sh.AddJob(j)
131119
}
132120

133-
return d, nil
121+
return nil
134122
}
135123

136-
func (c *Config) buildLogger() core.Logger {
137-
stdout := logging.NewLogBackend(os.Stdout, "", 0)
138-
// Set the backends to be used.
139-
logging.SetBackend(stdout)
140-
logging.SetFormatter(logging.MustStringFormatter(logFormat))
141-
142-
return logging.MustGetLogger("ofelia")
124+
func (c *Config) JobsCount() int {
125+
return len(c.ExecJobs) + len(c.RunJobs) + len(c.LocalJobs) + len(c.ServiceJobs)
143126
}
144127

145128
func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) {
@@ -148,6 +131,115 @@ func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) {
148131
sh.Use(middlewares.NewMail(&c.Global.MailConfig))
149132
}
150133

134+
func (c *Config) dockerLabelsUpdate(labels map[string]map[string]string) {
135+
// Get the current labels
136+
var parsedLabelConfig Config
137+
parsedLabelConfig.buildFromDockerLabels(labels)
138+
139+
// Calculate the delta execJobs
140+
for name, j := range c.ExecJobs {
141+
found := false
142+
for newJobsName, newJob := range parsedLabelConfig.ExecJobs {
143+
// Check if the schedule has changed
144+
if name == newJobsName {
145+
found = true
146+
// There is a slight race condition were a job can be canceled / restarted with different params
147+
// so, lets take care of it by simply restarting
148+
// For the hash to work properly, we must fill the fields before calling it
149+
defaults.SetDefaults(newJob)
150+
newJob.Client = c.dockerHandler.GetInternalDockerClient()
151+
newJob.Name = newJobsName
152+
if newJob.Hash() != j.Hash() {
153+
c.logger.Debugf("Job %s has changed, restarting", name)
154+
// Remove from the scheduler
155+
c.sh.RemoveJob(j)
156+
// Add the job back to the scheduler
157+
newJob.buildMiddlewares()
158+
c.sh.AddJob(newJob)
159+
// Update the job config
160+
c.ExecJobs[name] = newJob
161+
}
162+
break
163+
}
164+
}
165+
if !found {
166+
c.logger.Debugf("Job %s is not found, Removing", name)
167+
// Remove the job
168+
c.sh.RemoveJob(j)
169+
delete(c.ExecJobs, name)
170+
}
171+
}
172+
173+
// Check for aditions
174+
for newJobsName, newJob := range parsedLabelConfig.ExecJobs {
175+
found := false
176+
for name := range c.ExecJobs {
177+
if name == newJobsName {
178+
found = true
179+
break
180+
}
181+
}
182+
if !found {
183+
defaults.SetDefaults(newJob)
184+
newJob.Client = c.dockerHandler.GetInternalDockerClient()
185+
newJob.Name = newJobsName
186+
newJob.buildMiddlewares()
187+
c.sh.AddJob(newJob)
188+
c.ExecJobs[newJobsName] = newJob
189+
}
190+
}
191+
192+
for name, j := range c.RunJobs {
193+
found := false
194+
for newJobsName, newJob := range parsedLabelConfig.RunJobs {
195+
// Check if the schedule has changed
196+
if name == newJobsName {
197+
found = true
198+
// There is a slight race condition were a job can be canceled / restarted with different params
199+
// so, lets take care of it by simply restarting
200+
// For the hash to work properly, we must fill the fields before calling it
201+
defaults.SetDefaults(newJob)
202+
newJob.Client = c.dockerHandler.GetInternalDockerClient()
203+
newJob.Name = newJobsName
204+
if newJob.Hash() != j.Hash() {
205+
// Remove from the scheduler
206+
c.sh.RemoveJob(j)
207+
// Add the job back to the scheduler
208+
newJob.buildMiddlewares()
209+
c.sh.AddJob(newJob)
210+
// Update the job config
211+
c.RunJobs[name] = newJob
212+
}
213+
break
214+
}
215+
}
216+
if !found {
217+
// Remove the job
218+
c.sh.RemoveJob(j)
219+
delete(c.RunJobs, name)
220+
}
221+
}
222+
223+
// Check for aditions
224+
for newJobsName, newJob := range parsedLabelConfig.RunJobs {
225+
found := false
226+
for name := range c.RunJobs {
227+
if name == newJobsName {
228+
found = true
229+
break
230+
}
231+
}
232+
if !found {
233+
defaults.SetDefaults(newJob)
234+
newJob.Client = c.dockerHandler.GetInternalDockerClient()
235+
newJob.Name = newJobsName
236+
newJob.buildMiddlewares()
237+
c.sh.AddJob(newJob)
238+
c.RunJobs[newJobsName] = newJob
239+
}
240+
}
241+
}
242+
151243
// ExecJobConfig contains all configuration params needed to build a ExecJob
152244
type ExecJobConfig struct {
153245
core.ExecJob `mapstructure:",squash"`

cli/config_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,16 @@ type SuiteConfig struct{}
1717

1818
var _ = Suite(&SuiteConfig{})
1919

20+
type TestLogger struct{}
21+
22+
func (*TestLogger) Criticalf(format string, args ...interface{}) {}
23+
func (*TestLogger) Debugf(format string, args ...interface{}) {}
24+
func (*TestLogger) Errorf(format string, args ...interface{}) {}
25+
func (*TestLogger) Noticef(format string, args ...interface{}) {}
26+
func (*TestLogger) Warningf(format string, args ...interface{}) {}
27+
2028
func (s *SuiteConfig) TestBuildFromString(c *C) {
21-
sh, err := BuildFromString(`
29+
conf, err := BuildFromString(`
2230
[job-exec "foo"]
2331
schedule = @every 10s
2432
@@ -33,10 +41,10 @@ func (s *SuiteConfig) TestBuildFromString(c *C) {
3341
3442
[job-service-run "bob"]
3543
schedule = @every 10s
36-
`)
44+
`, &TestLogger{})
3745

3846
c.Assert(err, IsNil)
39-
c.Assert(sh.Jobs, HasLen, 5)
47+
c.Assert(conf.JobsCount(), Equals, 5)
4048
}
4149

4250
func (s *SuiteConfig) TestJobDefaultsSet(c *C) {

0 commit comments

Comments
 (0)