Skip to content

Commit 87e0c39

Browse files
committed
nfd-worker: Watch features.d changes
Signed-off-by: Oleg Zhurakivskyy <[email protected]>
1 parent 04d835d commit 87e0c39

File tree

4 files changed

+132
-17
lines changed

4 files changed

+132
-17
lines changed

pkg/nfd-worker/nfd-worker-internal_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"testing"
2424
"time"
2525

26+
"golang.org/x/net/context"
27+
2628
. "github.com/smartystreets/goconvey/convey"
2729
"github.com/vektra/errors"
2830
fakeclient "k8s.io/client-go/kubernetes/fake"
@@ -102,7 +104,7 @@ func TestConfigParse(t *testing.T) {
102104
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"cpu": {"cpuid": {"attributeBlacklist": ["foo","bar"]}}}}`
103105

104106
Convey("and no core cmdline flags have been specified", func() {
105-
So(worker.configure("non-existing-file", overrides), ShouldBeNil)
107+
So(worker.configure(context.Background(), "non-existing-file", overrides), ShouldBeNil)
106108

107109
Convey("core overrides should be in effect", func() {
108110
So(worker.config.Core.LabelSources, ShouldResemble, []string{"fake"})
@@ -114,7 +116,7 @@ func TestConfigParse(t *testing.T) {
114116
worker.args = Args{Overrides: ConfigOverrideArgs{
115117
LabelSources: &utils.StringSliceVal{"cpu", "kernel", "pci"},
116118
FeatureSources: &utils.StringSliceVal{"cpu"}}}
117-
So(worker.configure("non-existing-file", overrides), ShouldBeNil)
119+
So(worker.configure(context.Background(), "non-existing-file", overrides), ShouldBeNil)
118120

119121
Convey("core cmdline flags should be in effect instead overrides", func() {
120122
So(worker.config.Core.LabelSources, ShouldResemble, []string{"cpu", "kernel", "pci"})
@@ -150,7 +152,7 @@ sources:
150152

151153
Convey("and a proper config file is specified", func() {
152154
worker.args = Args{Overrides: ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"cpu", "kernel", "pci"}}}
153-
So(worker.configure(f.Name(), ""), ShouldBeNil)
155+
So(worker.configure(context.Background(), f.Name(), ""), ShouldBeNil)
154156

155157
Convey("specified configuration should take effect", func() {
156158
// Verify core config
@@ -172,7 +174,7 @@ sources:
172174
Convey("and a proper config file and overrides are given", func() {
173175
worker.args = Args{Overrides: ConfigOverrideArgs{FeatureSources: &utils.StringSliceVal{"cpu"}}}
174176
overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"pci": {"deviceClassWhitelist": ["03"]}}}`
175-
So(worker.configure(f.Name(), overrides), ShouldBeNil)
177+
So(worker.configure(context.Background(), f.Name(), overrides), ShouldBeNil)
176178

177179
Convey("overrides should take precedence over the config file", func() {
178180
// Verify core config
@@ -205,7 +207,7 @@ func TestNewNfdWorker(t *testing.T) {
205207
So(err, ShouldBeNil)
206208
})
207209
worker := w.(*nfdWorker)
208-
So(worker.configure("", ""), ShouldBeNil)
210+
So(worker.configure(context.Background(), "", ""), ShouldBeNil)
209211
Convey("all sources should be enabled and the whitelist regexp should be empty", func() {
210212
So(len(worker.featureSources), ShouldEqual, len(source.GetAllFeatureSources())-1)
211213
So(len(worker.labelSources), ShouldEqual, len(source.GetAllLabelSources())-1)
@@ -223,7 +225,7 @@ func TestNewNfdWorker(t *testing.T) {
223225
So(err, ShouldBeNil)
224226
})
225227
worker := w.(*nfdWorker)
226-
So(worker.configure("", ""), ShouldBeNil)
228+
So(worker.configure(context.Background(), "", ""), ShouldBeNil)
227229
Convey("proper sources should be enabled", func() {
228230
So(len(worker.featureSources), ShouldEqual, 1)
229231
So(worker.featureSources[0].Name(), ShouldEqual, "cpu")

pkg/nfd-worker/nfd-worker.go

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ type nfdWorker struct {
120120
kubernetesNamespace string
121121
k8sClient k8sclient.Interface
122122
nfdClient nfdclient.Interface
123-
stop chan struct{} // channel for signaling stop
123+
stop chan struct{} // channel for signaling stop
124+
sourceEvent chan *source.FeatureSource // channel for events from sources
124125
featureSources []source.FeatureSource
125126
labelSources []source.LabelSource
126127
ownerReference []metav1.OwnerReference
@@ -220,6 +221,19 @@ func (i *infiniteTicker) Reset(d time.Duration) {
220221
}
221222
}
222223

224+
// Publish labels.
225+
func (w *nfdWorker) publishLabels() error {
226+
// Get the set of feature labels.
227+
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)
228+
229+
// Update the node with the feature labels.
230+
if !w.config.Core.NoPublish {
231+
return w.advertiseFeatures(labels)
232+
}
233+
234+
return nil
235+
}
236+
223237
// Run feature discovery.
224238
func (w *nfdWorker) runFeatureDiscovery() error {
225239
discoveryStart := time.Now()
@@ -237,12 +251,9 @@ func (w *nfdWorker) runFeatureDiscovery() error {
237251
if w.config.Core.SleepInterval.Duration > 0 && discoveryDuration > w.config.Core.SleepInterval.Duration/2 {
238252
klog.InfoS("feature discovery sources took over half of sleep interval ", "duration", discoveryDuration, "sleepInterval", w.config.Core.SleepInterval.Duration)
239253
}
240-
// Get the set of feature labels.
241-
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)
242254

243-
// Update the node with the feature labels.
244-
if !w.config.Core.NoPublish {
245-
return w.advertiseFeatures(labels)
255+
if err := w.publishLabels(); err != nil {
256+
return err
246257
}
247258

248259
return nil
@@ -293,8 +304,11 @@ func (w *nfdWorker) setOwnerReference() error {
293304
func (w *nfdWorker) Run() error {
294305
klog.InfoS("Node Feature Discovery Worker", "version", version.Get(), "nodeName", utils.NodeName(), "namespace", w.kubernetesNamespace)
295306

307+
ctx, cancel := context.WithCancel(context.Background())
308+
defer cancel()
309+
296310
// Read configuration file
297-
err := w.configure(w.configFilePath, w.args.Options)
311+
err := w.configure(ctx, w.configFilePath, w.args.Options)
298312
if err != nil {
299313
return err
300314
}
@@ -341,6 +355,15 @@ func (w *nfdWorker) Run() error {
341355
return err
342356
}
343357

358+
case s := <-w.sourceEvent:
359+
if err := (*s).Discover(); err != nil {
360+
klog.ErrorS(err, "feature discovery failed", "source", (*s).Name())
361+
break
362+
}
363+
if err = w.publishLabels(); err != nil {
364+
return err
365+
}
366+
344367
case <-w.stop:
345368
klog.InfoS("shutting down nfd-worker")
346369
return nil
@@ -461,7 +484,7 @@ func (w *nfdWorker) configureCore(c coreConfig) error {
461484
}
462485

463486
// Parse configuration options
464-
func (w *nfdWorker) configure(filepath string, overrides string) error {
487+
func (w *nfdWorker) configure(ctx context.Context, filepath string, overrides string) error {
465488
// Create a new default config
466489
c := newDefaultConfig()
467490
confSources := source.GetAllConfigurableSources()
@@ -525,6 +548,14 @@ func (w *nfdWorker) configure(filepath string, overrides string) error {
525548
s.SetConfig(c.Sources[s.Name()])
526549
}
527550

551+
w.sourceEvent = make(chan *source.FeatureSource)
552+
eventSources := source.GetAllEventSources()
553+
for _, s := range eventSources {
554+
if err := s.SetNotifyChannel(ctx, w.sourceEvent); err != nil {
555+
klog.ErrorS(err, "failed to set notify channel for event source", "source", s.Name())
556+
}
557+
}
558+
528559
klog.InfoS("configuration successfully updated", "configuration", w.config)
529560
return nil
530561
}

source/local/local.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import (
2424
"strings"
2525
"time"
2626

27+
"golang.org/x/net/context"
2728
"k8s.io/klog/v2"
2829

30+
"github.com/fsnotify/fsnotify"
2931
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
3032
"sigs.k8s.io/node-feature-discovery/pkg/utils"
3133
"sigs.k8s.io/node-feature-discovery/source"
@@ -65,10 +67,11 @@ var (
6567
featureFilesDir = "/etc/kubernetes/node-feature-discovery/features.d/"
6668
)
6769

68-
// localSource implements the FeatureSource and LabelSource interfaces.
70+
// localSource implements the FeatureSource, LabelSource, EventSource interfaces.
6971
type localSource struct {
70-
features *nfdv1alpha1.Features
71-
config *Config
72+
features *nfdv1alpha1.Features
73+
config *Config
74+
fsWatcher *fsnotify.Watcher
7275
}
7376

7477
type Config struct {
@@ -87,6 +90,7 @@ var (
8790
_ source.FeatureSource = &src
8891
_ source.LabelSource = &src
8992
_ source.ConfigurableSource = &src
93+
_ source.EventSource = &src
9094
)
9195

9296
// Name method of the LabelSource interface
@@ -318,6 +322,63 @@ func getFileContent(fileName string) ([][]byte, error) {
318322
return lines, nil
319323
}
320324

325+
func (s *localSource) runNotifier(ctx context.Context, ch chan *source.FeatureSource) {
326+
rateLimit := time.After(time.Second)
327+
limit := false
328+
for {
329+
select {
330+
case event := <-s.fsWatcher.Events:
331+
opAny := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename | fsnotify.Chmod
332+
if event.Op&opAny != 0 {
333+
klog.V(2).InfoS("fsnotify event", "eventName", event.Name, "eventOp", event.Op)
334+
if !limit {
335+
fs := source.FeatureSource(s)
336+
ch <- &fs
337+
limit = true
338+
}
339+
}
340+
case err := <-s.fsWatcher.Errors:
341+
klog.ErrorS(err, "failed to watch features.d changes")
342+
case <-rateLimit:
343+
rateLimit = time.After(time.Second)
344+
limit = false
345+
case <-ctx.Done():
346+
return
347+
}
348+
}
349+
}
350+
351+
// SetNotifyChannel method of the EventSource Interface
352+
func (s *localSource) SetNotifyChannel(ctx context.Context, ch chan *source.FeatureSource) error {
353+
info, err := os.Stat(featureFilesDir)
354+
if err != nil {
355+
if !os.IsNotExist(err) {
356+
return err
357+
}
358+
}
359+
360+
if info != nil && info.IsDir() {
361+
watcher, err := fsnotify.NewWatcher()
362+
if err != nil {
363+
return err
364+
}
365+
366+
err = watcher.Add(featureFilesDir)
367+
if err != nil {
368+
errWatcher := watcher.Close()
369+
if errWatcher != nil {
370+
klog.ErrorS(errWatcher, "failed to close fsnotify watcher")
371+
}
372+
return fmt.Errorf("unable to access %v: %w", featureFilesDir, err)
373+
}
374+
s.fsWatcher = watcher
375+
376+
go s.runNotifier(ctx, ch)
377+
}
378+
379+
return nil
380+
}
381+
321382
func init() {
322383
source.Register(&src)
323384
}

source/source.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ package source
2121
import (
2222
"fmt"
2323

24+
"golang.org/x/net/context"
25+
2426
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
2527
)
2628

@@ -77,6 +79,14 @@ type SupplementalSource interface {
7779
DisableByDefault() bool
7880
}
7981

82+
// EventSource is an interface for a source that can send events
83+
type EventSource interface {
84+
FeatureSource
85+
86+
// SetNotifyChannel sets the notification channel used to send updates about feature changes.
87+
SetNotifyChannel(ctx context.Context, ch chan *FeatureSource) error
88+
}
89+
8090
// FeatureLabelValue represents the value of one feature label
8191
type FeatureLabelValue interface{}
8292

@@ -155,6 +165,17 @@ func GetAllConfigurableSources() map[string]ConfigurableSource {
155165
return all
156166
}
157167

168+
// GetAllEventSources returns all registered event sources
169+
func GetAllEventSources() map[string]EventSource {
170+
all := make(map[string]EventSource)
171+
for k, v := range sources {
172+
if s, ok := v.(EventSource); ok {
173+
all[k] = s
174+
}
175+
}
176+
return all
177+
}
178+
158179
// GetAllFeatures returns a combined set of all features from all feature
159180
// sources.
160181
func GetAllFeatures() *nfdv1alpha1.Features {

0 commit comments

Comments
 (0)