-
Notifications
You must be signed in to change notification settings - Fork 278
nfd-worker: Watch features.d changes #2156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -121,6 +121,7 @@ type nfdWorker struct { | |
k8sClient k8sclient.Interface | ||
nfdClient nfdclient.Interface | ||
stop chan struct{} // channel for signaling stop | ||
sourceEvent chan string // channel for events from sources | ||
featureSources []source.FeatureSource | ||
labelSources []source.LabelSource | ||
ownerReference []metav1.OwnerReference | ||
|
@@ -248,6 +249,36 @@ func (w *nfdWorker) runFeatureDiscovery() error { | |
return nil | ||
} | ||
|
||
// Run feature discovery. | ||
func (w *nfdWorker) runFeatureDiscoveryBySourceName(source string) error { | ||
discoveryStart := time.Now() | ||
for _, s := range w.featureSources { | ||
if s.Name() == source { | ||
currentSourceStart := time.Now() | ||
if err := s.Discover(); err != nil { | ||
klog.ErrorS(err, "feature discovery failed", "source", s.Name()) | ||
} | ||
klog.V(3).InfoS("feature discovery completed", "featureSource", s.Name(), "duration", time.Since(currentSourceStart)) | ||
} | ||
} | ||
|
||
discoveryDuration := time.Since(discoveryStart) | ||
klog.V(2).InfoS("feature discovery of all sources completed", "duration", discoveryDuration) | ||
featureDiscoveryDuration.WithLabelValues(utils.NodeName()).Observe(discoveryDuration.Seconds()) | ||
if w.config.Core.SleepInterval.Duration > 0 && discoveryDuration > w.config.Core.SleepInterval.Duration/2 { | ||
klog.InfoS("feature discovery sources took over half of sleep interval ", "duration", discoveryDuration, "sleepInterval", w.config.Core.SleepInterval.Duration) | ||
} | ||
// Get the set of feature labels. | ||
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp) | ||
|
||
// Update the node with the feature labels. | ||
if !w.config.Core.NoPublish { | ||
return w.advertiseFeatures(labels) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Set owner ref | ||
func (w *nfdWorker) setOwnerReference() error { | ||
ownerReference := []metav1.OwnerReference{} | ||
|
@@ -304,6 +335,15 @@ func (w *nfdWorker) Run() error { | |
labelTrigger.Reset(w.config.Core.SleepInterval.Duration) | ||
defer labelTrigger.Stop() | ||
|
||
w.sourceEvent = make(chan string) | ||
eventSources := source.GetAllEventSources() | ||
for _, s := range eventSources { | ||
Comment on lines
+339
to
+340
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this stuff should be done in the |
||
if err := s.SetNotifyChannel(w.sourceEvent); err != nil { | ||
klog.ErrorS(err, "failed to set notify channel for event source", "source", s.Name()) | ||
return fmt.Errorf("failed to set notify channel for event source %s: %w", s.Name(), err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not error out in this case (just log the error). The design pattern of nfd-worker has been to log errors if we cannot access some data, but not to crash. Let's follow that |
||
} | ||
} | ||
|
||
httpMux := http.NewServeMux() | ||
|
||
// Register to metrics server | ||
|
@@ -341,6 +381,12 @@ func (w *nfdWorker) Run() error { | |
return err | ||
} | ||
|
||
case sourceName := <-w.sourceEvent: | ||
err = w.runFeatureDiscoveryBySourceName(sourceName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we could simplify this quite a bit by having sourceEvent of type |
||
if err != nil { | ||
return err | ||
} | ||
|
||
case <-w.stop: | ||
klog.InfoS("shutting down nfd-worker") | ||
return nil | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -26,6 +26,7 @@ import ( | |||||||||||||
|
||||||||||||||
"k8s.io/klog/v2" | ||||||||||||||
|
||||||||||||||
"github.com/fsnotify/fsnotify" | ||||||||||||||
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1" | ||||||||||||||
"sigs.k8s.io/node-feature-discovery/pkg/utils" | ||||||||||||||
"sigs.k8s.io/node-feature-discovery/source" | ||||||||||||||
|
@@ -65,10 +66,11 @@ var ( | |||||||||||||
featureFilesDir = "/etc/kubernetes/node-feature-discovery/features.d/" | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
// localSource implements the FeatureSource and LabelSource interfaces. | ||||||||||||||
// localSource implements the FeatureSource, LabelSource, EventSource interfaces. | ||||||||||||||
type localSource struct { | ||||||||||||||
features *nfdv1alpha1.Features | ||||||||||||||
config *Config | ||||||||||||||
features *nfdv1alpha1.Features | ||||||||||||||
config *Config | ||||||||||||||
fsWatcher *fsnotify.Watcher | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
type Config struct { | ||||||||||||||
|
@@ -87,6 +89,7 @@ var ( | |||||||||||||
_ source.FeatureSource = &src | ||||||||||||||
_ source.LabelSource = &src | ||||||||||||||
_ source.ConfigurableSource = &src | ||||||||||||||
_ source.EventSource = &src | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
// Name method of the LabelSource interface | ||||||||||||||
|
@@ -318,6 +321,49 @@ func getFileContent(fileName string) ([][]byte, error) { | |||||||||||||
return lines, nil | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (s *localSource) runNotifier(ch chan string) { | ||||||||||||||
for { | ||||||||||||||
select { | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The runNotifier goroutine runs indefinitely without a way to stop it. Consider adding a context or stop channel to allow graceful shutdown and prevent goroutine leaks.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||
case event := <-s.fsWatcher.Events: | ||||||||||||||
opAny := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename | fsnotify.Chmod | ||||||||||||||
if event.Op&opAny != 0 { | ||||||||||||||
klog.V(2).InfoS("fsnotify event", "eventName", event.Name, "eventOp", event.Op) | ||||||||||||||
ch <- s.Name() | ||||||||||||||
} | ||||||||||||||
case err := <-s.fsWatcher.Errors: | ||||||||||||||
klog.ErrorS(err, "failed to watch features.d changes") | ||||||||||||||
} | ||||||||||||||
time.Sleep(1 * time.Second) | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The hardcoded 1-second sleep in the event loop may cause unnecessary delays in event processing. Consider removing this sleep or making it configurable, as fsnotify events should be processed immediately.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ozhuraki I think copilot is onto something here :) If we get a burst of, say, 10 events, we'd read them one-per-second, causing a streak of 10 1-per-second updates in nfd-worker. We'd only want a 1 second total delay, i.e. read events as fast as we can but "group" them into one event that gets sent to the ch. Elsewhere we've used e.g. a pattern of |
||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// SetNotifyChannel method of the EventSource Interface | ||||||||||||||
func (s *localSource) SetNotifyChannel(ch chan string) error { | ||||||||||||||
info, err := os.Stat(featureFilesDir) | ||||||||||||||
if err != nil { | ||||||||||||||
if !os.IsNotExist(err) { | ||||||||||||||
return err | ||||||||||||||
} | ||||||||||||||
ozhuraki marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if info != nil && info.IsDir() { | ||||||||||||||
ozhuraki marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
watcher, err := fsnotify.NewWatcher() | ||||||||||||||
if err != nil { | ||||||||||||||
return err | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
err = watcher.Add(featureFilesDir) | ||||||||||||||
if err != nil { | ||||||||||||||
return fmt.Errorf("unable to access %v: %w", featureFilesDir, err) | ||||||||||||||
} | ||||||||||||||
s.fsWatcher = watcher | ||||||||||||||
|
||||||||||||||
go s.runNotifier(ch) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Starting the goroutine unconditionally even when fsWatcher is nil (when directory doesn't exist) will cause the goroutine to block indefinitely on nil channel reads, leading to a goroutine leak.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback
ozhuraki marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
return nil | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func init() { | ||||||||||||||
source.Register(&src) | ||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't update the metrics here as the metric is for "all enabled feature sources" and we're only doing one here. We can think about per-source metrics in a separate PR