Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion cmd/nfd-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package main
import (
"flag"
"fmt"
"net"
"os"
"strings"

"k8s.io/klog/v2"

Expand All @@ -32,7 +34,8 @@ import (

const (
// ProgramName is the canonical name of this program
ProgramName = "nfd-worker"
ProgramName = "nfd-worker"
kubeletSecurePort = 10250
)

func main() {
Expand Down Expand Up @@ -82,6 +85,20 @@ func parseArgs(flags *flag.FlagSet, osArgs ...string) *worker.Args {
os.Exit(2)
}

if len(args.KubeletConfigURI) == 0 {
nodeAddress := os.Getenv("NODE_ADDRESS")
if len(nodeAddress) == 0 {
_, _ = fmt.Fprintf(flags.Output(), "unable to determine the default kubelet config endpoint 'https://${NODE_ADDRESS}:%d/configz' due to empty NODE_ADDRESS environment, "+
"please either define the NODE_ADDRESS environment variable or specify endpoint with the -kubelet-config-uri flag\n", kubeletSecurePort)
os.Exit(1)
}
if isIPv6(nodeAddress) {
// With IPv6 we need to wrap the IP address in brackets as we append :port below
nodeAddress = "[" + nodeAddress + "]"
}
args.KubeletConfigURI = fmt.Sprintf("https://%s:%d/configz", nodeAddress, kubeletSecurePort)
}

// Handle overrides
flags.Visit(func(f *flag.Flag) {
switch f.Name {
Expand All @@ -106,6 +123,10 @@ func initFlags(flagset *flag.FlagSet) (*worker.Args, *worker.ConfigOverrideArgs)
"Config file to use.")
flagset.StringVar(&args.Kubeconfig, "kubeconfig", "",
"Kubeconfig to use")
flagset.StringVar(&args.KubeletConfigURI, "kubelet-config-uri", "",
"Kubelet config URI path. Default to kubelet configz endpoint.")
flagset.StringVar(&args.APIAuthTokenFile, "api-auth-token-file", "/var/run/secrets/kubernetes.io/serviceaccount/token",
"API auth token file path. It is used to request kubelet configz endpoint, only takes effect when kubelet-config-uri is https. Default to /var/run/secrets/kubernetes.io/serviceaccount/token.")
flagset.BoolVar(&args.Oneshot, "oneshot", false,
"Do not publish feature labels")
flagset.IntVar(&args.Port, "port", 8080,
Expand Down Expand Up @@ -134,3 +155,8 @@ func initFlags(flagset *flag.FlagSet) (*worker.Args, *worker.ConfigOverrideArgs)

return args, overrides
}

// isIPv6 reports whether addr is an IPv6 address literal, i.e. an address
// that has to be wrapped in brackets before a ":port" suffix is appended.
//
// The decision is made on the textual form of addr rather than on the parsed
// net.IP: net.IP.String() renders IPv4-mapped addresses such as
// "::ffff:10.0.0.1" in dotted-quad form, so counting colons in that output
// would misclassify them and lead to an ambiguous host:port string.
// Anything net.ParseIP rejects (hostnames, empty strings) is not IPv6.
func isIPv6(addr string) bool {
	if net.ParseIP(addr) == nil {
		return false
	}
	// A valid IP literal that contains a colon can only be IPv6 notation.
	return strings.Contains(addr, ":")
}
2 changes: 2 additions & 0 deletions deployment/base/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ resources:
- worker-serviceaccount.yaml
- worker-role.yaml
- worker-rolebinding.yaml
- worker-clusterrole.yaml
- worker-clusterrolebinding.yaml
12 changes: 12 additions & 0 deletions deployment/base/rbac/worker-clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# ClusterRole for the nfd-worker.
# Grants read-only ("get") access to pods and to the nodes/proxy subresource
# in the core ("") API group.
# NOTE(review): nodes/proxy is presumably what authorizes the worker's request
# to the kubelet configz endpoint — confirm against the kubelet authorization docs.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: nfd-worker
rules:
- apiGroups:
- ""
resources: ["pods"]
verbs: ["get"]
- apiGroups: [""]
resources: ["nodes/proxy"]
verbs: ["get"]
12 changes: 12 additions & 0 deletions deployment/base/rbac/worker-clusterrolebinding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Binds the nfd-worker ClusterRole to the nfd-worker ServiceAccount.
# NOTE(review): the subject namespace is hard-coded to "default"; verify that it
# is overridden when the worker is deployed into a different namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: nfd-worker
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: nfd-worker
subjects:
- kind: ServiceAccount
name: nfd-worker
namespace: default
2 changes: 1 addition & 1 deletion deployment/base/rbac/worker-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ rules:
resources:
- pods
verbs:
- get
- get
8 changes: 8 additions & 0 deletions deployment/components/common/env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@
valueFrom:
fieldRef:
fieldPath: metadata.uid
- name: NODE_ADDRESS
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
38 changes: 1 addition & 37 deletions pkg/nfd-topology-updater/nfd-topology-updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package nfdtopologyupdater
import (
"fmt"
"net/http"
"net/url"
"os"
"path/filepath"

Expand Down Expand Up @@ -99,7 +98,7 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (Nf
}
go ntf.Run()

kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
kubeletConfigFunc, err := kubeconf.GetKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -379,38 +378,3 @@ func updateAttributes(lhs *v1alpha2.AttributeList, rhs v1alpha2.AttributeList) {
updateAttribute(lhs, attr)
}
}

// getKubeletConfigFunc returns a function that retrieves the kubelet
// configuration from the location identified by uri.
//
// Supported URI schemes:
//   - "file": the configuration is read from the local file at the URI path.
//   - "https": the configuration is fetched through a rest config built by
//     kubeconf.InsecureConfig from the URI and apiAuthTokenFile.
//
// An error is returned right away if uri cannot be parsed, if its scheme is
// not supported, or if the rest config for an https URI cannot be created.
// The returned function performs the actual retrieval each time it is called.
func getKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) {
	u, err := url.ParseRequestURI(uri)
	if err != nil {
		return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
	}

	switch u.Scheme {
	case "file":
		return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
			// Results are kept local to each call so that invocations of the
			// returned function do not share mutable state.
			klConfig, err := kubeconf.GetKubeletConfigFromLocalFile(u.Path)
			if err != nil {
				return nil, fmt.Errorf("failed to read kubelet config: %w", err)
			}
			return klConfig, nil
		}, nil
	case "https":
		// The rest config is created once and reused by every call.
		restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err)
		}

		return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
			klConfig, err := kubeconf.GetKubeletConfiguration(restConfig)
			if err != nil {
				return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err)
			}
			return klConfig, nil
		}, nil
	}

	return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme)
}
47 changes: 36 additions & 11 deletions pkg/nfd-worker/nfd-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ import (
"k8s.io/apimachinery/pkg/util/validation"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"k8s.io/utils/ptr"
klogutils "sigs.k8s.io/node-feature-discovery/pkg/utils/klog"
"sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf"
"sigs.k8s.io/yaml"

apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -56,7 +58,7 @@ import (
_ "sigs.k8s.io/node-feature-discovery/source/fake"
_ "sigs.k8s.io/node-feature-discovery/source/kernel"
_ "sigs.k8s.io/node-feature-discovery/source/local"
_ "sigs.k8s.io/node-feature-discovery/source/memory"
memory "sigs.k8s.io/node-feature-discovery/source/memory"
_ "sigs.k8s.io/node-feature-discovery/source/network"
_ "sigs.k8s.io/node-feature-discovery/source/pci"
_ "sigs.k8s.io/node-feature-discovery/source/storage"
Expand Down Expand Up @@ -94,13 +96,16 @@ type Labels map[string]string

// Args are the command line arguments of NfdWorker.
type Args struct {
ConfigFile string
Klog map[string]*utils.KlogFlagVal
Kubeconfig string
Oneshot bool
Options string
Port int
NoOwnerRefs bool
ConfigFile string
Klog map[string]*utils.KlogFlagVal
Kubeconfig string
Oneshot bool
Options string
Port int
NoOwnerRefs bool
KubeletConfigPath string
KubeletConfigURI string
APIAuthTokenFile string

Overrides ConfigOverrideArgs
}
Expand All @@ -124,6 +129,7 @@ type nfdWorker struct {
featureSources []source.FeatureSource
labelSources []source.LabelSource
ownerReference []metav1.OwnerReference
kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error)
}

// This ticker can represent infinite and normal intervals.
Expand Down Expand Up @@ -169,12 +175,25 @@ func NewNfdWorker(opts ...NfdWorkerOption) (NfdWorker, error) {
stop: make(chan struct{}),
}

if nfd.args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(nfd.args.ConfigFile)
}

for _, o := range opts {
o.apply(nfd)
}

if nfd.args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(nfd.args.ConfigFile)
kubeletConfigFunc, err := kubeconf.GetKubeletConfigFunc(nfd.args.KubeletConfigURI, nfd.args.APIAuthTokenFile)
if err != nil {
return nil, err
}

nfd = &nfdWorker{
kubeletConfigFunc: kubeletConfigFunc,
}

for _, o := range opts {
o.apply(nfd)
}

Comment on lines +191 to 198
Copy link
Preview

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nfd variable is being reassigned to a new nfdWorker struct, which discards all the previous initialization including the stop channel and other fields set earlier in the function. This will cause the worker to lose its previous configuration.

Suggested change
nfd = &nfdWorker{
kubeletConfigFunc: kubeletConfigFunc,
}
for _, o := range opts {
o.apply(nfd)
}
nfd.kubeletConfigFunc = kubeletConfigFunc

Copilot uses AI. Check for mistakes.

// k8sClient might've been set via opts by tests
Expand Down Expand Up @@ -312,6 +331,12 @@ func (w *nfdWorker) Run() error {
httpMux.Handle("/metrics", promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}))
registerVersion(version.Get())

klConfig, err := w.kubeletConfigFunc()
if err != nil {
return err
}
memory.SetSwapMode(klConfig.MemorySwap.SwapBehavior)

err = w.runFeatureDiscovery()
if err != nil {
return err
Expand Down Expand Up @@ -624,7 +649,7 @@ func (m *nfdWorker) updateNodeFeatureObject(labels Labels) error {
return err
}
nodename := utils.NodeName()
namespace := m.kubernetesNamespace
namespace := os.Getenv("POD_NAMESPACE")

features := source.GetAllFeatures()

Expand Down
36 changes: 36 additions & 0 deletions pkg/utils/kubeconf/kubelet_config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kubeconf
import (
"context"
"fmt"
"net/url"

kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
kubeletconfigscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
Expand Down Expand Up @@ -55,3 +56,38 @@ func GetKubeletConfigFromLocalFile(kubeletConfigPath string) (*kubeletconfigv1be

return kubeletConfig, nil
}

// GetKubeletConfigFunc returns a function that retrieves the kubelet
// configuration from the location identified by uri.
//
// Supported URI schemes:
//   - "file": the configuration is read from the local file at the URI path
//     with GetKubeletConfigFromLocalFile.
//   - "https": the configuration is fetched with GetKubeletConfiguration
//     through a rest config built by InsecureConfig from the URI and
//     apiAuthTokenFile.
//
// An error is returned right away if uri cannot be parsed, if its scheme is
// not supported, or if the rest config for an https URI cannot be created.
// The returned function performs the actual retrieval each time it is called.
func GetKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) {
	u, err := url.ParseRequestURI(uri)
	if err != nil {
		return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
	}

	switch u.Scheme {
	case "file":
		return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
			// Results are kept local to each call so that invocations of the
			// returned function do not share mutable state.
			klConfig, err := GetKubeletConfigFromLocalFile(u.Path)
			if err != nil {
				return nil, fmt.Errorf("failed to read kubelet config: %w", err)
			}
			return klConfig, nil
		}, nil
	case "https":
		// The rest config is created once and reused by every call.
		restConfig, err := InsecureConfig(u.String(), apiAuthTokenFile)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err)
		}

		return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
			klConfig, err := GetKubeletConfiguration(restConfig)
			if err != nil {
				return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err)
			}
			return klConfig, nil
		}, nil
	}

	return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme)
}
21 changes: 16 additions & 5 deletions source/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,17 @@ type memorySource struct {

// Singleton source instance
var (
src memorySource
_ source.FeatureSource = &src
_ source.LabelSource = &src
src memorySource
_ source.FeatureSource = &src
_ source.LabelSource = &src
defaultSwapBehavior = "NoSwap"
swapBehavior string
)

// SetSwapMode records the swap behavior to report for this node. The value is
// stored in the package-level swapBehavior variable; when it is non-empty,
// Discover uses it for the "behavior" element of the swap attributes instead
// of defaultSwapBehavior.
// NOTE(review): the variable is written without synchronization — presumably
// this is called once before discovery runs; confirm against callers.
func SetSwapMode(behavior string) {
swapBehavior = behavior
}

// Name returns an identifier string for this feature source.
func (s *memorySource) Name() string { return Name }

Expand All @@ -80,6 +86,7 @@ func (s *memorySource) GetLabels() (source.FeatureLabels, error) {
// Swap
if isSwap, ok := features.Attributes[SwapFeature].Elements["enabled"]; ok && isSwap == "true" {
labels["swap"] = true
labels["swap.behavior"] = features.Attributes[SwapFeature].Elements["behavior"]
}

// NVDIMM
Expand All @@ -106,12 +113,16 @@ func (s *memorySource) Discover() error {
} else {
s.features.Attributes[NumaFeature] = nfdv1alpha1.AttributeFeatureSet{Elements: numa}
}

// Detect Swap
// Detect Swap and Swap Behavior
if swap, err := detectSwap(); err != nil {
klog.ErrorS(err, "failed to detect Swap nodes")
} else {
s.features.Attributes[SwapFeature] = nfdv1alpha1.AttributeFeatureSet{Elements: swap}
swap["behavior"] = defaultSwapBehavior
if swapBehavior != "" {
swap["behavior"] = swapBehavior
}

}

// Detect NVDIMM
Expand Down