Skip to content

Commit d30fdd3

Browse files
committed
nfd-worker: Watch features.d changes
Signed-off-by: Oleg Zhurakivskyy <[email protected]>
1 parent 04d835d commit d30fdd3

File tree

3 files changed

+108
-9
lines changed

3 files changed

+108
-9
lines changed

pkg/nfd-worker/nfd-worker.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ type nfdWorker struct {
120120
kubernetesNamespace string
121121
k8sClient k8sclient.Interface
122122
nfdClient nfdclient.Interface
123-
stop chan struct{} // channel for signaling stop
123+
stop chan struct{} // channel for signaling stop
124+
sourceEvent chan *source.FeatureSource // channel for events from sources
124125
featureSources []source.FeatureSource
125126
labelSources []source.LabelSource
126127
ownerReference []metav1.OwnerReference
@@ -220,6 +221,19 @@ func (i *infiniteTicker) Reset(d time.Duration) {
220221
}
221222
}
222223

224+
// Publish labels.
225+
func (w *nfdWorker) publishLabels() error {
226+
// Get the set of feature labels.
227+
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)
228+
229+
// Update the node with the feature labels.
230+
if !w.config.Core.NoPublish {
231+
return w.advertiseFeatures(labels)
232+
}
233+
234+
return nil
235+
}
236+
223237
// Run feature discovery.
224238
func (w *nfdWorker) runFeatureDiscovery() error {
225239
discoveryStart := time.Now()
@@ -237,12 +251,9 @@ func (w *nfdWorker) runFeatureDiscovery() error {
237251
if w.config.Core.SleepInterval.Duration > 0 && discoveryDuration > w.config.Core.SleepInterval.Duration/2 {
238252
klog.InfoS("feature discovery sources took over half of sleep interval ", "duration", discoveryDuration, "sleepInterval", w.config.Core.SleepInterval.Duration)
239253
}
240-
// Get the set of feature labels.
241-
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)
242254

243-
// Update the node with the feature labels.
244-
if !w.config.Core.NoPublish {
245-
return w.advertiseFeatures(labels)
255+
if err := w.publishLabels(); err != nil {
256+
return err
246257
}
247258

248259
return nil
@@ -341,6 +352,13 @@ func (w *nfdWorker) Run() error {
341352
return err
342353
}
343354

355+
case s := <-w.sourceEvent:
356+
(*s).Discover()
357+
err = w.publishLabels()
358+
if err != nil {
359+
return err
360+
}
361+
344362
case <-w.stop:
345363
klog.InfoS("shutting down nfd-worker")
346364
return nil
@@ -525,6 +543,14 @@ func (w *nfdWorker) configure(filepath string, overrides string) error {
525543
s.SetConfig(c.Sources[s.Name()])
526544
}
527545

546+
w.sourceEvent = make(chan *source.FeatureSource)
547+
eventSources := source.GetAllEventSources()
548+
for _, s := range eventSources {
549+
if err := s.SetNotifyChannel(w.sourceEvent); err != nil {
550+
klog.ErrorS(err, "failed to set notify channel for event source", "source", s.Name())
551+
}
552+
}
553+
528554
klog.InfoS("configuration successfully updated", "configuration", w.config)
529555
return nil
530556
}

source/local/local.go

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"k8s.io/klog/v2"
2828

29+
"github.com/fsnotify/fsnotify"
2930
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
3031
"sigs.k8s.io/node-feature-discovery/pkg/utils"
3132
"sigs.k8s.io/node-feature-discovery/source"
@@ -65,10 +66,11 @@ var (
6566
featureFilesDir = "/etc/kubernetes/node-feature-discovery/features.d/"
6667
)
6768

68-
// localSource implements the FeatureSource and LabelSource interfaces.
69+
// localSource implements the FeatureSource, LabelSource, EventSource interfaces.
6970
type localSource struct {
70-
features *nfdv1alpha1.Features
71-
config *Config
71+
features *nfdv1alpha1.Features
72+
config *Config
73+
fsWatcher *fsnotify.Watcher
7274
}
7375

7476
type Config struct {
@@ -87,6 +89,7 @@ var (
8789
_ source.FeatureSource = &src
8890
_ source.LabelSource = &src
8991
_ source.ConfigurableSource = &src
92+
_ source.EventSource = &src
9093
)
9194

9295
// Name method of the LabelSource interface
@@ -318,6 +321,57 @@ func getFileContent(fileName string) ([][]byte, error) {
318321
return lines, nil
319322
}
320323

324+
func (s *localSource) runNotifier(ch chan *source.FeatureSource) {
325+
rateLimit := time.After(time.Second)
326+
limit := false
327+
for {
328+
select {
329+
case event := <-s.fsWatcher.Events:
330+
opAny := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename | fsnotify.Chmod
331+
if event.Op&opAny != 0 {
332+
klog.V(2).InfoS("fsnotify event", "eventName", event.Name, "eventOp", event.Op)
333+
if !limit {
334+
fs := source.FeatureSource(s)
335+
ch <- &fs
336+
limit = true
337+
}
338+
}
339+
case err := <-s.fsWatcher.Errors:
340+
klog.ErrorS(err, "failed to watch features.d changes")
341+
case <-rateLimit:
342+
rateLimit = time.After(time.Second)
343+
limit = false
344+
}
345+
}
346+
}
347+
348+
// SetNotifyChannel method of the EventSource Interface
349+
func (s *localSource) SetNotifyChannel(ch chan *source.FeatureSource) error {
350+
info, err := os.Stat(featureFilesDir)
351+
if err != nil {
352+
if !os.IsNotExist(err) {
353+
return err
354+
}
355+
}
356+
357+
if info != nil && info.IsDir() {
358+
watcher, err := fsnotify.NewWatcher()
359+
if err != nil {
360+
return err
361+
}
362+
363+
err = watcher.Add(featureFilesDir)
364+
if err != nil {
365+
return fmt.Errorf("unable to access %v: %w", featureFilesDir, err)
366+
}
367+
s.fsWatcher = watcher
368+
369+
go s.runNotifier(ch)
370+
}
371+
372+
return nil
373+
}
374+
321375
func init() {
322376
source.Register(&src)
323377
}

source/source.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ type SupplementalSource interface {
7777
DisableByDefault() bool
7878
}
7979

80+
// EventSource is an interface for a source that can send events
81+
type EventSource interface {
82+
FeatureSource
83+
84+
// SetNotifyChannel sets the channel
85+
SetNotifyChannel(chan *FeatureSource) error
86+
}
87+
8088
// FeatureLabelValue represents the value of one feature label
8189
type FeatureLabelValue interface{}
8290

@@ -155,6 +163,17 @@ func GetAllConfigurableSources() map[string]ConfigurableSource {
155163
return all
156164
}
157165

166+
// GetAllEventSources returns all registered event sources
167+
func GetAllEventSources() map[string]EventSource {
168+
all := make(map[string]EventSource)
169+
for k, v := range sources {
170+
if s, ok := v.(EventSource); ok {
171+
all[k] = s
172+
}
173+
}
174+
return all
175+
}
176+
158177
// GetAllFeatures returns a combined set of all features from all feature
159178
// sources.
160179
func GetAllFeatures() *nfdv1alpha1.Features {

0 commit comments

Comments
 (0)