Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions pkg/nfd-worker/nfd-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type nfdWorker struct {
k8sClient k8sclient.Interface
nfdClient nfdclient.Interface
stop chan struct{} // channel for signaling stop
sourceEvent chan string // channel for events from sources
featureSources []source.FeatureSource
labelSources []source.LabelSource
ownerReference []metav1.OwnerReference
Expand Down Expand Up @@ -248,6 +249,36 @@ func (w *nfdWorker) runFeatureDiscovery() error {
return nil
}

// Run feature discovery.
func (w *nfdWorker) runFeatureDiscoveryBySourceName(source string) error {
discoveryStart := time.Now()
for _, s := range w.featureSources {
if s.Name() == source {
currentSourceStart := time.Now()
if err := s.Discover(); err != nil {
klog.ErrorS(err, "feature discovery failed", "source", s.Name())
}
klog.V(3).InfoS("feature discovery completed", "featureSource", s.Name(), "duration", time.Since(currentSourceStart))
}
}

discoveryDuration := time.Since(discoveryStart)
klog.V(2).InfoS("feature discovery of all sources completed", "duration", discoveryDuration)
featureDiscoveryDuration.WithLabelValues(utils.NodeName()).Observe(discoveryDuration.Seconds())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't update the metrics here as the metric is for "all enabled feature sources" and we're only doing one here. We can think about per-source metrics in a separate PR

if w.config.Core.SleepInterval.Duration > 0 && discoveryDuration > w.config.Core.SleepInterval.Duration/2 {
klog.InfoS("feature discovery sources took over half of sleep interval ", "duration", discoveryDuration, "sleepInterval", w.config.Core.SleepInterval.Duration)
}
// Get the set of feature labels.
labels := createFeatureLabels(w.labelSources, w.config.Core.LabelWhiteList.Regexp)

// Update the node with the feature labels.
if !w.config.Core.NoPublish {
return w.advertiseFeatures(labels)
}

return nil
}

// Set owner ref
func (w *nfdWorker) setOwnerReference() error {
ownerReference := []metav1.OwnerReference{}
Expand Down Expand Up @@ -304,6 +335,15 @@ func (w *nfdWorker) Run() error {
labelTrigger.Reset(w.config.Core.SleepInterval.Duration)
defer labelTrigger.Stop()

w.sourceEvent = make(chan string)
eventSources := source.GetAllEventSources()
for _, s := range eventSources {
Comment on lines +339 to +340
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this stuff should be done in the configure() function. The other similar stuff is there, too.

if err := s.SetNotifyChannel(w.sourceEvent); err != nil {
klog.ErrorS(err, "failed to set notify channel for event source", "source", s.Name())
return fmt.Errorf("failed to set notify channel for event source %s: %w", s.Name(), err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not error out in this case (just log the error). The design pattern of nfd-worker has been that log errors if we cannot access some data but don't crash. Let's follow that

}
}

httpMux := http.NewServeMux()

// Register to metrics server
Expand Down Expand Up @@ -341,6 +381,12 @@ func (w *nfdWorker) Run() error {
return err
}

case sourceName := <-w.sourceEvent:
err = w.runFeatureDiscoveryBySourceName(sourceName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could simplify this quite bit by having sourceEvent of type chan FeatureSource. We could just do s.Discover() here and ditch runFeatureDiscoveryBySourceName altogether. Need to split runFeatureDiscovery() into two parts to avoid copy-pasting code: feature-discovery and feature-advertisement (create labels etc). Then call the feature-advertisement part here.

if err != nil {
return err
}

case <-w.stop:
klog.InfoS("shutting down nfd-worker")
return nil
Expand Down
52 changes: 49 additions & 3 deletions source/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"k8s.io/klog/v2"

"github.com/fsnotify/fsnotify"
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/source"
Expand Down Expand Up @@ -65,10 +66,11 @@ var (
featureFilesDir = "/etc/kubernetes/node-feature-discovery/features.d/"
)

// localSource implements the FeatureSource and LabelSource interfaces.
// localSource implements the FeatureSource, LabelSource, EventSource interfaces.
type localSource struct {
features *nfdv1alpha1.Features
config *Config
features *nfdv1alpha1.Features
config *Config
fsWatcher *fsnotify.Watcher
}

type Config struct {
Expand All @@ -87,6 +89,7 @@ var (
_ source.FeatureSource = &src
_ source.LabelSource = &src
_ source.ConfigurableSource = &src
_ source.EventSource = &src
)

// Name method of the LabelSource interface
Expand Down Expand Up @@ -318,6 +321,49 @@ func getFileContent(fileName string) ([][]byte, error) {
return lines, nil
}

func (s *localSource) runNotifier(ch chan string) {
for {
select {
Copy link
Preview

Copilot AI Aug 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runNotifier goroutine runs indefinitely without a way to stop it. Consider adding a context or stop channel to allow graceful shutdown and prevent goroutine leaks.

Suggested change
select {
func (s *localSource) runNotifier(ctx context.Context, ch chan struct{}) {
for {
select {
case <-ctx.Done():
return

Copilot uses AI. Check for mistakes.

case event := <-s.fsWatcher.Events:
opAny := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename | fsnotify.Chmod
if event.Op&opAny != 0 {
klog.V(2).InfoS("fsnotify event", "eventName", event.Name, "eventOp", event.Op)
ch <- s.Name()
}
case err := <-s.fsWatcher.Errors:
klog.ErrorS(err, "failed to watch features.d changes")
}
time.Sleep(1 * time.Second)
Copy link
Preview

Copilot AI Aug 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded 1-second sleep in the event loop may cause unnecessary delays in event processing. Consider removing this sleep or making it configurable, as fsnotify events should be processed immediately.

Suggested change
time.Sleep(1 * time.Second)

Copilot uses AI. Check for mistakes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ozhuraki I think copilot is up to something here :) If we get a burst of say 10 events, we'd read them one-per-second, causing a streak of 10 1-per-second updates in nfd-worker. We'd only want a 1 second total delay, i.e. read events as fast as we can but "group" them into one event that gets sent to the ch.

Elsewhere we've used e.g. a patter of rateLimit := time.After(time.Second)..., you could come up with something better, thoug...

}
}

// SetNotifyChannel method of the EventSource Interface
func (s *localSource) SetNotifyChannel(ch chan string) error {
info, err := os.Stat(featureFilesDir)
if err != nil {
if !os.IsNotExist(err) {
return err
}
}

if info != nil && info.IsDir() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}

err = watcher.Add(featureFilesDir)
if err != nil {
return fmt.Errorf("unable to access %v: %w", featureFilesDir, err)
}
s.fsWatcher = watcher

go s.runNotifier(ch)
}

Copy link
Preview

Copilot AI Aug 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting the goroutine unconditionally even when fsWatcher is nil (when directory doesn't exist) will cause the goroutine to block indefinitely on nil channel reads, leading to a goroutine leak.

Suggested change
go s.runNotifier(ch)
}

Copilot uses AI. Check for mistakes.

return nil
}

func init() {
source.Register(&src)
}
19 changes: 19 additions & 0 deletions source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ type SupplementalSource interface {
DisableByDefault() bool
}

// EventSource is an interface for a source that can send events
type EventSource interface {
Source

// SetNotifyChannel sets the channel
SetNotifyChannel(chan string) error
}

// FeatureLabelValue represents the value of one feature label
type FeatureLabelValue interface{}

Expand Down Expand Up @@ -155,6 +163,17 @@ func GetAllConfigurableSources() map[string]ConfigurableSource {
return all
}

// GetAllEventSources returns all registered event sources
func GetAllEventSources() map[string]EventSource {
all := make(map[string]EventSource)
for k, v := range sources {
if s, ok := v.(EventSource); ok {
all[k] = s
}
}
return all
}

// GetAllFeatures returns a combined set of all features from all feature
// sources.
func GetAllFeatures() *nfdv1alpha1.Features {
Expand Down