@@ -13,6 +13,7 @@ import (
13
13
"github.com/operable/go-relay/relay/worker"
14
14
"golang.org/x/net/context"
15
15
"strings"
16
+ "time"
16
17
)
17
18
18
19
const (
@@ -45,6 +46,8 @@ type cogRelay struct {
45
46
catalog * bundle.Catalog
46
47
announcer Announcer
47
48
directivesReplyTo string
49
+ bundleTimer * time.Timer
50
+ cleanTimer * time.Timer
48
51
}
49
52
50
53
// NewRelay constructs a new Relay instance
@@ -84,10 +87,23 @@ func (r *cogRelay) Start() error {
84
87
if err := conn .Connect (r .connOpts ); err != nil {
85
88
return err
86
89
}
90
+ if r .config .DockerEnabled () {
91
+ r .cleanTimer = time .AfterFunc (r .config .Docker .CleanDuration (), r .scheduledDockerCleanup )
92
+ log .Infof ("Cleaning up Docker environment on %d second intervals." , r .config .Docker .CleanDuration ()/ time .Second )
93
+ }
94
+ log .Infof ("Refreshing bundle catalog on %d second intervals." , r .config .RefreshDuration ()/ time .Second )
87
95
return nil
88
96
}
89
97
90
98
func (r * cogRelay ) Stop () error {
99
+ if r .bundleTimer != nil {
100
+ r .bundleTimer .Stop ()
101
+ }
102
+ if r .config .DockerEnabled () {
103
+ if r .bundleTimer != nil {
104
+ r .cleanTimer .Stop ()
105
+ }
106
+ }
91
107
return nil
92
108
}
93
109
@@ -134,9 +150,11 @@ func (r *cogRelay) handleCommand(conn bus.Connection, topic string, message []by
134
150
Payload : message ,
135
151
}
136
152
ctx := context .WithValue (context .Background (), "invoke" , invoke )
137
- log .Debugf ("Queue stopped: %v" , r .queue .IsStopped ())
138
- log .Debugf ("Eqneueud request: %s" , r .queue .Enqueue (ctx ))
139
- log .Debugf ("Enqueued invocation request for %s" , topic )
153
+ if err := r .queue .Enqueue (ctx ); err != nil {
154
+ log .Debugf ("Failed enqueuing invocation request: %s." , err )
155
+ } else {
156
+ log .Debugf ("Enqueued invocation request for %s" , topic )
157
+ }
140
158
}
141
159
142
160
func (r * cogRelay ) handleDirective (conn bus.Connection , topic string , message []byte ) {
@@ -148,7 +166,7 @@ func (r *cogRelay) handleDirective(conn bus.Connection, topic string, message []
148
166
// Dispatch on mesasge type
149
167
switch tm .(type ) {
150
168
case * messages.ListBundlesResponseEnvelope :
151
- log .Info ("Processing bundle list " )
169
+ log .Debug ("Processing bundle catalog updates. " )
152
170
r .updateCatalog (tm .(* messages.ListBundlesResponseEnvelope ))
153
171
}
154
172
}
@@ -174,21 +192,34 @@ func (r *cogRelay) updateCatalog(envelope *messages.ListBundlesResponseEnvelope)
174
192
// TODO: This should be bi-directional sync to catch bundle unassignments too
175
193
r .catalog .AddBatch (bundles )
176
194
if r .catalog .IsChanged () {
177
- r .refreshBundles ()
178
- log .Info ("Changes to bundle assignments detected." )
179
- r .announcer .SendAnnouncement ()
195
+ if err := r .refreshBundles (); err != nil {
196
+ log .Errorf ("Bundle catalog refresh failed: %s." , err )
197
+ } else {
198
+ log .Info ("Changes to bundle catalog detected." )
199
+ r .announcer .SendAnnouncement ()
200
+ }
180
201
}
202
+ r .bundleTimer = time .AfterFunc (r .config .RefreshDuration (), r .scheduledBundleRefresh )
181
203
}
182
204
183
205
func (r * cogRelay ) refreshBundles () error {
184
206
dockerEngine , err := engines .NewDockerEngine (* r .config )
185
207
if err != nil {
186
- return err
208
+ if r .config .DockerEnabled () == false {
209
+ dockerEngine = nil
210
+ } else {
211
+ return err
212
+ }
187
213
}
188
214
for _ , name := range r .catalog .BundleNames () {
189
215
if bundle := r .catalog .FindLatest (name ); bundle != nil {
190
216
if bundle .NeedsRefresh () {
191
217
if bundle .IsDocker () {
218
+ if r .config .DockerEnabled () == false {
219
+ log .Infof ("Skipping Docker-based bundle %s %s." , bundle .Name , bundle .Version )
220
+ bundle .SetAvailable (false )
221
+ continue
222
+ }
192
223
avail , _ := dockerEngine .IsAvailable (bundle .Docker .Image , bundle .Docker .Tag )
193
224
bundle .SetAvailable (avail )
194
225
} else {
@@ -208,10 +239,34 @@ func (r *cogRelay) requestBundles() error {
208
239
},
209
240
}
210
241
raw , _ := json .Marshal (& msg )
211
- log .Info ("Refreshing command bundles ." )
242
+ log .Debug ("Refreshing command catalog ." )
212
243
return r .conn .Publish (infoTopic , raw )
213
244
}
214
245
246
+ func (r * cogRelay ) scheduledBundleRefresh () {
247
+ if err := r .requestBundles (); err != nil {
248
+ log .Errorf ("Scheduled bundle catalog refresh failed: %s." , err )
249
+ r .bundleTimer = time .AfterFunc (r .config .RefreshDuration (), r .scheduledBundleRefresh )
250
+ }
251
+ }
252
+
253
+ func (r * cogRelay ) scheduledDockerCleanup () {
254
+ engine , err := engines .NewDockerEngine (* r .config )
255
+ if err != nil {
256
+ log .Errorf ("Scheduled clean up of Docker environment failed: %s." , err )
257
+ } else {
258
+ cleaned := engine .Clean ()
259
+ container := "containers"
260
+ if cleaned == 1 {
261
+ container = "container"
262
+ }
263
+ if cleaned > 0 {
264
+ log .Infof ("Scheduled Docker clean up removed %d %s." , cleaned , container )
265
+ }
266
+ }
267
+ r .cleanTimer = time .AfterFunc (r .config .Docker .CleanDuration (), r .scheduledDockerCleanup )
268
+ }
269
+
215
270
func verifyDockerConfig (c * config.Config ) error {
216
271
if c .DockerEnabled () == true {
217
272
if err := engines .VerifyDockerConfig (c .Docker ); err != nil {
0 commit comments