From 5e80117b737b0506e5570728c727ff20f49602d9 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Tue, 20 May 2025 17:28:24 -0700 Subject: [PATCH 01/23] Cache devices and their symlinks in node driver, periodically noting changes and printing the full list. example: periodic symlink cache read: /dev/disk/by-id/google-persistent-disk-0 -> /dev/sda; /dev/disk/by-id/google-pvc-f5418f78-dc07-4d69-9487-6c4a7232dd67 -> /dev/sdb; /dev/disk/by-id/scsi-0Google_PersistentDisk_persistent-disk-0 -> /dev/sda; /dev/disk/by-id/scsi-0Google_PersistentDisk_pvc-f5418f78-dc07-4d69-9487-6c4a7232dd67 -> /dev/sdb --- cmd/gce-pd-csi-driver/main.go | 5 +- pkg/gce-pd-csi-driver/cache.go | 9 +- pkg/linkcache/cache.go | 173 +++++++++++++++++++++++++++++++++ 3 files changed, 183 insertions(+), 4 deletions(-) create mode 100644 pkg/linkcache/cache.go diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index fee76338a..56f50c02b 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -34,6 +34,7 @@ import ( gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -297,9 +298,11 @@ func handle() { if err := setupDataCache(ctx, *nodeName, nodeServer.MetadataService.GetName()); err != nil { klog.Errorf("Data Cache setup failed: %v", err) } - go driver.StartWatcher(*nodeName) + go driver.StartWatcher(ctx, *nodeName) } } + + go linkcache.NewListingCache(1*time.Minute, "/dev/disk/by-id/").Run(ctx) } err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer) diff --git a/pkg/gce-pd-csi-driver/cache.go b/pkg/gce-pd-csi-driver/cache.go index b2ccd5e70..1dc486690 100644 --- a/pkg/gce-pd-csi-driver/cache.go +++ b/pkg/gce-pd-csi-driver/cache.go @@ -615,7 +615,7 @@ func InitializeDataCacheNode(nodeId string) error { return nil } -func StartWatcher(nodeName string) { +func StartWatcher(ctx context.Context, nodeName string) { dirToWatch := "/dev/" watcher, err := fsnotify.NewWatcher() if err != nil { @@ -630,7 +630,7 @@ func StartWatcher(nodeName string) { } errorCh := make(chan error, 1) // Handle the error received from the watcher goroutine - go watchDiskDetaches(watcher, nodeName, errorCh) + go watchDiskDetaches(ctx, watcher, nodeName, errorCh) select { case err := <-errorCh: @@ -638,9 +638,12 @@ func StartWatcher(nodeName string) { } } -func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error { +func watchDiskDetaches(ctx context.Context, watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error { for { select { + case <-ctx.Done(): + klog.Infof("Context done, stopping watcher") + return nil // watch for errors case err := <-watcher.Errors: errorCh <- fmt.Errorf("disk update event errored: %v", err) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go new file mode 100644 index 000000000..ee8e70b73 --- /dev/null +++ b/pkg/linkcache/cache.go @@ -0,0 +1,173 @@ +package linkcache + +import ( + "context" + "fmt" + "os" + "path/filepath" + "regexp" + "strings" + "time" + + "k8s.io/klog/v2" +) + +var partitionNameRegex = regexp.MustCompile(`-part[0-9]+$`) + +// ListingCache polls the filesystem at the specified directory once per +// periodand checks each non-directory entry for a symlink. The results are +// cached. Changes to the cache are logged, as well as the full contents of the +// cache. The cache's Run() method is expected to be called in a goroutine. +// Its cancellation is controlled via the context argument. +type ListingCache struct { + period time.Duration + dir string + links *linkCache +} + +func NewListingCache(period time.Duration, dir string) *ListingCache { + return &ListingCache{ + period: period, + dir: dir, + links: newLinkCache(), + } +} + +// Run starts the cache's background loop. The filesystem is listed and the +// cache updated according to the frequency specified by the period. It will run +// until the context is cancelled. +func (l *ListingCache) Run(ctx context.Context) { + // Start the loop that runs every minute + ticker := time.NewTicker(l.period) + defer ticker.Stop() + + // Initial list and update so we don't wait for the first tick. + err := l.listAndUpdate() + if err != nil { + klog.Warningf("Error listing and updating symlinks: %v", err) + } + + for { + select { + case <-ctx.Done(): + klog.Infof("Context done, stopping watcher") + return + case <-ticker.C: + err := l.listAndUpdate() + if err != nil { + klog.Warningf("Error listing and updating symlinks: %v", err) + continue + } + + klog.Infof("periodic symlink cache read: %s", l.links.String()) + } + } +} + +func (l *ListingCache) listAndUpdate() error { + visited := make(map[string]struct{}) + + entries, err := os.ReadDir(l.dir) + if err != nil { + return fmt.Errorf("failed to read directory %s: %w", l.dir, err) + } + + var errs []error + for _, entry := range entries { + if entry.IsDir() { + continue + } + + // TODO(juliankatz): To have certainty this works for all edge cases, we + // need to test this with a manually partitioned disk. + if partitionNameRegex.MatchString(entry.Name()) { + continue + } + + diskByIdPath := filepath.Join(l.dir, entry.Name()) + + // Add the device to the map regardless of successful symlink eval. + // Otherwise, a broken symlink will lead us to remove it from the cache. + visited[diskByIdPath] = struct{}{} + + realFSPath, err := filepath.EvalSymlinks(diskByIdPath) + if err != nil { + errs = append(errs, fmt.Errorf("failed to evaluate symlink for %s: %w", diskByIdPath, err)) + l.links.BrokenSymlink(diskByIdPath) + continue + } + + l.links.AddOrUpdateDevice(diskByIdPath, realFSPath) + } + + for _, id := range l.links.DeviceIDs() { + if _, found := visited[id]; !found { + l.links.RemoveDevice(id) + } + } + + if len(errs) > 0 { + return fmt.Errorf("failed to evaluate symlinks for %d devices: %v", len(errs), errs) + } + return nil +} + +// linkCache is a structure that maintains a cache of symlinks between +// /dev/disk/by-id and /dev/sd* paths. It provides methods to add/update, +// retrieve, and remove device symlinks from the cache. +type linkCache struct { + devices map[string]linkCacheEntry +} + +type linkCacheEntry struct { + path string + // If true, the symlink is known to be broken. + brokenSymlink bool +} + +func newLinkCache() *linkCache { + return &linkCache{ + devices: make(map[string]linkCacheEntry), + } +} + +func (d *linkCache) AddOrUpdateDevice(symlink, realPath string) { + prevEntry, exists := d.devices[symlink] + if !exists || prevEntry.path != realPath { + klog.Infof("Symlink updated for link %s, previous value: %s, new value: %s", symlink, prevEntry.path, realPath) + } + d.devices[symlink] = linkCacheEntry{path: realPath, brokenSymlink: false} +} + +// BrokenSymlink marks a symlink as broken. If the symlink is not in the cache, +// it is ignored. +func (d *linkCache) BrokenSymlink(symlink string) { + if entry, ok := d.devices[symlink]; ok { + entry.brokenSymlink = true + d.devices[symlink] = entry + } +} + +func (d *linkCache) RemoveDevice(symlink string) { + delete(d.devices, symlink) +} + +func (d *linkCache) DeviceIDs() []string { + ids := make([]string, 0, len(d.devices)) + for id := range d.devices { + ids = append(ids, id) + } + return ids +} + +func (d *linkCache) String() string { + var sb strings.Builder + for symlink, entry := range d.devices { + if entry.brokenSymlink { + sb.WriteString(fmt.Sprintf("%s -> broken symlink... last known value: %s; ", symlink, entry.path)) + } else { + sb.WriteString(fmt.Sprintf("%s -> %s; ", symlink, entry.path)) + } + } + return strings.TrimSuffix(sb.String(), "; ") +} From 0a5f453fce943a0b983f90f27ccc4bc8aba5564b Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 11:02:59 -0700 Subject: [PATCH 02/23] Some doc comment updates --- pkg/linkcache/cache.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index ee8e70b73..f62d7eb40 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -15,10 +15,10 @@ import ( var partitionNameRegex = regexp.MustCompile(`-part[0-9]+$`) // ListingCache polls the filesystem at the specified directory once per -// periodand checks each non-directory entry for a symlink. The results are -// cached. Changes to the cache are logged, as well as the full contents of the -// cache. The cache's Run() method is expected to be called in a goroutine. -// Its cancellation is controlled via the context argument. +// period and checks each non-directory entry for a symlink. The results are +// cached. Changes to the cache are logged, as well as the full contents of the +// cache. The cache's Run() method is expected to be called in a goroutine. Its +// cancellation is controlled via the context argument. type ListingCache struct { period time.Duration dir string @@ -33,9 +33,9 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { } } -// Run starts the cache's background loop. The filesystem is listed and the -// cache updated according to the frequency specified by the period. It will run -// until the context is cancelled. +// Run starts the cache's background loop. The filesystem is listed and the cache +// updated according to the frequency specified by the period. It will run until +// the context is cancelled. func (l *ListingCache) Run(ctx context.Context) { // Start the loop that runs every minute ticker := time.NewTicker(l.period) From 6c5fb86aeaf095ba83ea8168692392fbe01f5f88 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 15:33:41 -0700 Subject: [PATCH 03/23] Add unit tests --- pkg/linkcache/cache.go | 31 +++-- pkg/linkcache/cache_test.go | 238 ++++++++++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+), 7 deletions(-) create mode 100644 pkg/linkcache/cache_test.go diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index f62d7eb40..7f3040725 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -3,9 +3,11 @@ package linkcache import ( "context" "fmt" + "maps" "os" "path/filepath" "regexp" + "slices" "strings" "time" @@ -14,6 +16,23 @@ import ( var partitionNameRegex = regexp.MustCompile(`-part[0-9]+$`) +// fsInterface defines the filesystem operations needed by ListingCache +type fsInterface interface { + ReadDir(name string) ([]os.DirEntry, error) + EvalSymlinks(path string) (string, error) +} + +// realFS implements fsInterface using the real filesystem +type realFS struct{} + +func (f *realFS) ReadDir(name string) ([]os.DirEntry, error) { + return os.ReadDir(name) +} + +func (f *realFS) EvalSymlinks(path string) (string, error) { + return filepath.EvalSymlinks(path) +} + // ListingCache polls the filesystem at the specified directory once per // period and checks each non-directory entry for a symlink. The results are // cached. Changes to the cache are logged, as well as the full contents of the @@ -23,6 +42,7 @@ type ListingCache struct { period time.Duration dir string links *linkCache + fs fsInterface } func NewListingCache(period time.Duration, dir string) *ListingCache { @@ -30,6 +50,7 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { period: period, dir: dir, links: newLinkCache(), + fs: &realFS{}, } } @@ -67,7 +88,7 @@ func (l *ListingCache) Run(ctx context.Context) { func (l *ListingCache) listAndUpdate() error { visited := make(map[string]struct{}) - entries, err := os.ReadDir(l.dir) + entries, err := l.fs.ReadDir(l.dir) if err != nil { return fmt.Errorf("failed to read directory %s: %w", l.dir, err) } @@ -90,7 +111,7 @@ func (l *ListingCache) listAndUpdate() error { // Otherwise, a broken symlink will lead us to remove it from the cache. visited[diskByIdPath] = struct{}{} - realFSPath, err := filepath.EvalSymlinks(diskByIdPath) + realFSPath, err := l.fs.EvalSymlinks(diskByIdPath) if err != nil { errs = append(errs, fmt.Errorf("failed to evaluate symlink for %s: %w", diskByIdPath, err)) l.links.BrokenSymlink(diskByIdPath) @@ -153,11 +174,7 @@ func (d *linkCache) RemoveDevice(symlink string) { } func (d *linkCache) DeviceIDs() []string { - ids := make([]string, 0, len(d.devices)) - for id := range d.devices { - ids = append(ids, id) - } - return ids + return slices.Collect(maps.Keys(d.devices)) } func (d *linkCache) String() string { diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go new file mode 100644 index 000000000..65c86fcbd --- /dev/null +++ b/pkg/linkcache/cache_test.go @@ -0,0 +1,238 @@ +package linkcache + +import ( + "os" + "testing" + "testing/fstest" +) + +const ( + // Test disk names in /dev/disk/by-id format + gcpPersistentDiskID = "google-persistent-disk-0" + gcpPVCID = "google-pvc-f5418f78-dc07-4d69-9487-6c4a7232dd67" + gcpPersistentDiskPartitionID = "google-persistent-disk-0-part1" + + // Test device paths in /dev format + devicePathSDA = "/dev/sda" + devicePathSDB = "/dev/sdb" +) + +// mockFS implements fsInterface for testing +type mockFS struct { + fstest.MapFS + symlinks map[string]string +} + +func newMockFS() *mockFS { + return &mockFS{ + MapFS: make(fstest.MapFS), + symlinks: make(map[string]string), + } +} + +func (m *mockFS) ReadDir(name string) ([]os.DirEntry, error) { + entries, err := m.MapFS.ReadDir(name) + if err != nil { + return nil, err + } + return entries, nil +} + +func (m *mockFS) EvalSymlinks(path string) (string, error) { + if target, ok := m.symlinks[path]; ok { + return target, nil + } + return "", os.ErrNotExist +} + +func TestListAndUpdate(t *testing.T) { + tests := []struct { + name string + setupFS func(*mockFS) + expectedLinks map[string]string + expectError bool + }{ + { + name: "valid symlinks", + setupFS: func(m *mockFS) { + // Create some device files + m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} + m.MapFS[gcpPVCID] = &fstest.MapFile{} + // Create symlinks + m.symlinks[gcpPersistentDiskID] = devicePathSDA + m.symlinks[gcpPVCID] = devicePathSDB + }, + expectedLinks: map[string]string{ + gcpPersistentDiskID: devicePathSDA, + gcpPVCID: devicePathSDB, + }, + expectError: false, + }, + { + name: "broken symlink not added to cache", + setupFS: func(m *mockFS) { + m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} + // No symlink target for gcpPersistentDiskID + }, + expectedLinks: map[string]string{}, + expectError: true, + }, + { + name: "partition files ignored", + setupFS: func(m *mockFS) { + m.MapFS[gcpPersistentDiskPartitionID] = &fstest.MapFile{} + m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} + m.symlinks[gcpPersistentDiskID] = devicePathSDA + }, + expectedLinks: map[string]string{ + gcpPersistentDiskID: devicePathSDA, + }, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mock := newMockFS() + tt.setupFS(mock) + + cache := NewListingCache(0, ".") + cache.fs = mock // Inject our mock filesystem + err := cache.listAndUpdate() + + if tt.expectError { + if err == nil { + t.Error("expected error but got none") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + } + + // Verify the cache contents + for symlink, expectedTarget := range tt.expectedLinks { + entry, exists := cache.links.devices[symlink] + if !exists { + t.Errorf("symlink %s should exist in cache", symlink) + continue + } + if entry.path != expectedTarget { + t.Errorf("symlink %s should point to %s, got %s", symlink, expectedTarget, entry.path) + } + if entry.brokenSymlink { + t.Errorf("symlink %s should not be marked as broken", symlink) + } + } + }) + } +} + +func TestListAndUpdateWithChanges(t *testing.T) { + mock := newMockFS() + cache := NewListingCache(0, ".") + cache.fs = mock + + // Initial state: one disk with a valid symlink + mock.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} + mock.symlinks[gcpPersistentDiskID] = devicePathSDA + + // First listAndUpdate should add the disk to cache + err := cache.listAndUpdate() + if err != nil { + t.Fatalf("unexpected error in first listAndUpdate: %v", err) + } + + // Verify initial state + entry, exists := cache.links.devices[gcpPersistentDiskID] + if !exists { + t.Fatal("gcpPersistentDiskID should exist in cache after first listAndUpdate") + } + if entry.path != devicePathSDA { + t.Errorf("gcpPersistentDiskID should point to %s, got %s", devicePathSDA, entry.path) + } + + // Add a new disk and update the symlink target + mock.MapFS[gcpPVCID] = &fstest.MapFile{} + mock.symlinks[gcpPVCID] = devicePathSDB + mock.symlinks[gcpPersistentDiskID] = devicePathSDB // Update existing disk's target + + // Second listAndUpdate should update the cache + err = cache.listAndUpdate() + if err != nil { + t.Fatalf("unexpected error in second listAndUpdate: %v", err) + } + + // Verify both disks are in cache with correct paths + entry, exists = cache.links.devices[gcpPersistentDiskID] + if !exists { + t.Fatal("gcpPersistentDiskID should still exist in cache") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPersistentDiskID should now point to %s, got %s", devicePathSDB, entry.path) + } + + entry, exists = cache.links.devices[gcpPVCID] + if !exists { + t.Fatal("gcpPVCID should exist in cache after second listAndUpdate") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPVCID should point to %s, got %s", devicePathSDB, entry.path) + } + + // Break the symlink for gcpPersistentDiskID but keep the file + delete(mock.symlinks, gcpPersistentDiskID) + + // Third listAndUpdate should mark the disk as broken but keep its last known value + err = cache.listAndUpdate() + if err == nil { + t.Error("expected error for broken symlink") + } + + // Verify gcpPersistentDiskID is marked as broken but maintains its last known value + entry, exists = cache.links.devices[gcpPersistentDiskID] + if !exists { + t.Fatal("gcpPersistentDiskID should still exist in cache") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPersistentDiskID should maintain its last known value %s, got %s", devicePathSDB, entry.path) + } + if !entry.brokenSymlink { + t.Error("gcpPersistentDiskID should be marked as broken") + } + + // Verify gcpPVCID is still valid + entry, exists = cache.links.devices[gcpPVCID] + if !exists { + t.Fatal("gcpPVCID should still exist in cache") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPVCID should still point to %s, got %s", devicePathSDB, entry.path) + } + if entry.brokenSymlink { + t.Error("gcpPVCID should not be marked as broken") + } + + // Remove one disk + delete(mock.MapFS, gcpPersistentDiskID) + delete(mock.symlinks, gcpPersistentDiskID) + + // Fourth listAndUpdate should remove the deleted disk + err = cache.listAndUpdate() + if err != nil { + t.Fatalf("unexpected error in fourth listAndUpdate: %v", err) + } + + // Verify only gcpPVCID remains + if _, exists := cache.links.devices[gcpPersistentDiskID]; exists { + t.Error("gcpPersistentDiskID should be removed from cache") + } + + entry, exists = cache.links.devices[gcpPVCID] + if !exists { + t.Fatal("gcpPVCID should still exist in cache") + } + if entry.path != devicePathSDB { + t.Errorf("gcpPVCID should still point to %s, got %s", devicePathSDB, entry.path) + } +} From 67c90d00355a7bbd4df2909b55acaac1d527d9de Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 15:35:20 -0700 Subject: [PATCH 04/23] improve partition unit test --- pkg/linkcache/cache_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go index 65c86fcbd..e43799bc7 100644 --- a/pkg/linkcache/cache_test.go +++ b/pkg/linkcache/cache_test.go @@ -83,6 +83,7 @@ func TestListAndUpdate(t *testing.T) { m.MapFS[gcpPersistentDiskPartitionID] = &fstest.MapFile{} m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} m.symlinks[gcpPersistentDiskID] = devicePathSDA + m.symlinks[gcpPersistentDiskPartitionID] = devicePathSDA + "1" }, expectedLinks: map[string]string{ gcpPersistentDiskID: devicePathSDA, From f13477d8713fa2900849b926f159482aac9fb730 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 15:37:48 -0700 Subject: [PATCH 05/23] Log on removal as well --- pkg/linkcache/cache.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index 7f3040725..c2ceb4ab1 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -170,7 +170,10 @@ func (d *linkCache) BrokenSymlink(symlink string) { } func (d *linkCache) RemoveDevice(symlink string) { - delete(d.devices, symlink) + if entry, ok := d.devices[symlink]; ok { + klog.Infof("Removing device %s with path %s from cache, brokenSymlink: %t", symlink, entry.path, entry.brokenSymlink) + delete(d.devices, symlink) + } } func (d *linkCache) DeviceIDs() []string { From 682267c703b382bc5c9960a3f85c788530f02e4e Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 16:03:48 -0700 Subject: [PATCH 06/23] Updated unit tests to be clearer, relying on asserting linkCache --- pkg/linkcache/cache.go | 9 ++ pkg/linkcache/cache_test.go | 194 +++++++++++++----------------------- 2 files changed, 79 insertions(+), 124 deletions(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index c2ceb4ab1..006eed27d 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -54,6 +54,15 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { } } +func NewMockListingCache(period time.Duration, dir string) *ListingCache { + return &ListingCache{ + period: period, + dir: dir, + links: newLinkCache(), + fs: &mockFS{}, + } +} + // Run starts the cache's background loop. The filesystem is listed and the cache // updated according to the frequency specified by the period. It will run until // the context is cancelled. diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go index e43799bc7..7683ed258 100644 --- a/pkg/linkcache/cache_test.go +++ b/pkg/linkcache/cache_test.go @@ -4,8 +4,12 @@ import ( "os" "testing" "testing/fstest" + + "github.com/google/go-cmp/cmp" ) +var allowUnexportedLinkCache = cmp.AllowUnexported(linkCache{}, linkCacheEntry{}) + const ( // Test disk names in /dev/disk/by-id format gcpPersistentDiskID = "google-persistent-disk-0" @@ -49,7 +53,7 @@ func TestListAndUpdate(t *testing.T) { tests := []struct { name string setupFS func(*mockFS) - expectedLinks map[string]string + expectedCache *linkCache expectError bool }{ { @@ -62,9 +66,11 @@ func TestListAndUpdate(t *testing.T) { m.symlinks[gcpPersistentDiskID] = devicePathSDA m.symlinks[gcpPVCID] = devicePathSDB }, - expectedLinks: map[string]string{ - gcpPersistentDiskID: devicePathSDA, - gcpPVCID: devicePathSDB, + expectedCache: &linkCache{ + devices: map[string]linkCacheEntry{ + gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, + gcpPVCID: {path: devicePathSDB, brokenSymlink: false}, + }, }, expectError: false, }, @@ -74,8 +80,10 @@ func TestListAndUpdate(t *testing.T) { m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} // No symlink target for gcpPersistentDiskID }, - expectedLinks: map[string]string{}, - expectError: true, + expectedCache: &linkCache{ + devices: map[string]linkCacheEntry{}, + }, + expectError: true, }, { name: "partition files ignored", @@ -85,8 +93,10 @@ func TestListAndUpdate(t *testing.T) { m.symlinks[gcpPersistentDiskID] = devicePathSDA m.symlinks[gcpPersistentDiskPartitionID] = devicePathSDA + "1" }, - expectedLinks: map[string]string{ - gcpPersistentDiskID: devicePathSDA, + expectedCache: &linkCache{ + devices: map[string]linkCacheEntry{ + gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, + }, }, expectError: false, }, @@ -111,129 +121,65 @@ func TestListAndUpdate(t *testing.T) { } } - // Verify the cache contents - for symlink, expectedTarget := range tt.expectedLinks { - entry, exists := cache.links.devices[symlink] - if !exists { - t.Errorf("symlink %s should exist in cache", symlink) - continue - } - if entry.path != expectedTarget { - t.Errorf("symlink %s should point to %s, got %s", symlink, expectedTarget, entry.path) - } - if entry.brokenSymlink { - t.Errorf("symlink %s should not be marked as broken", symlink) - } + // Compare the entire cache state + if diff := cmp.Diff(tt.expectedCache, cache.links, allowUnexportedLinkCache); diff != "" { + t.Errorf("linkCache mismatch (-expected +got):\n%s", diff) } }) } } -func TestListAndUpdateWithChanges(t *testing.T) { - mock := newMockFS() - cache := NewListingCache(0, ".") - cache.fs = mock - - // Initial state: one disk with a valid symlink - mock.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} - mock.symlinks[gcpPersistentDiskID] = devicePathSDA - - // First listAndUpdate should add the disk to cache - err := cache.listAndUpdate() - if err != nil { - t.Fatalf("unexpected error in first listAndUpdate: %v", err) - } - - // Verify initial state - entry, exists := cache.links.devices[gcpPersistentDiskID] - if !exists { - t.Fatal("gcpPersistentDiskID should exist in cache after first listAndUpdate") - } - if entry.path != devicePathSDA { - t.Errorf("gcpPersistentDiskID should point to %s, got %s", devicePathSDA, entry.path) - } - - // Add a new disk and update the symlink target - mock.MapFS[gcpPVCID] = &fstest.MapFile{} - mock.symlinks[gcpPVCID] = devicePathSDB - mock.symlinks[gcpPersistentDiskID] = devicePathSDB // Update existing disk's target - - // Second listAndUpdate should update the cache - err = cache.listAndUpdate() - if err != nil { - t.Fatalf("unexpected error in second listAndUpdate: %v", err) - } - - // Verify both disks are in cache with correct paths - entry, exists = cache.links.devices[gcpPersistentDiskID] - if !exists { - t.Fatal("gcpPersistentDiskID should still exist in cache") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPersistentDiskID should now point to %s, got %s", devicePathSDB, entry.path) - } - - entry, exists = cache.links.devices[gcpPVCID] - if !exists { - t.Fatal("gcpPVCID should exist in cache after second listAndUpdate") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPVCID should point to %s, got %s", devicePathSDB, entry.path) - } - - // Break the symlink for gcpPersistentDiskID but keep the file - delete(mock.symlinks, gcpPersistentDiskID) - - // Third listAndUpdate should mark the disk as broken but keep its last known value - err = cache.listAndUpdate() - if err == nil { - t.Error("expected error for broken symlink") - } - - // Verify gcpPersistentDiskID is marked as broken but maintains its last known value - entry, exists = cache.links.devices[gcpPersistentDiskID] - if !exists { - t.Fatal("gcpPersistentDiskID should still exist in cache") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPersistentDiskID should maintain its last known value %s, got %s", devicePathSDB, entry.path) - } - if !entry.brokenSymlink { - t.Error("gcpPersistentDiskID should be marked as broken") - } - - // Verify gcpPVCID is still valid - entry, exists = cache.links.devices[gcpPVCID] - if !exists { - t.Fatal("gcpPVCID should still exist in cache") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPVCID should still point to %s, got %s", devicePathSDB, entry.path) - } - if entry.brokenSymlink { - t.Error("gcpPVCID should not be marked as broken") - } - - // Remove one disk - delete(mock.MapFS, gcpPersistentDiskID) - delete(mock.symlinks, gcpPersistentDiskID) - - // Fourth listAndUpdate should remove the deleted disk - err = cache.listAndUpdate() - if err != nil { - t.Fatalf("unexpected error in fourth listAndUpdate: %v", err) +func TestLinkCache(t *testing.T) { + tests := []struct { + name string + setupCache func(*linkCache) + expected *linkCache + }{ + { + name: "AddOrUpdateDevice", + setupCache: func(lc *linkCache) { + lc.AddOrUpdateDevice("symlink1", "/dev/sda") + lc.AddOrUpdateDevice("symlink2", "/dev/sdb") + }, + expected: &linkCache{ + devices: map[string]linkCacheEntry{ + "symlink1": {path: "/dev/sda", brokenSymlink: false}, + "symlink2": {path: "/dev/sdb", brokenSymlink: false}, + }, + }, + }, + { + name: "BrokenSymlink", + setupCache: func(lc *linkCache) { + lc.AddOrUpdateDevice("symlink1", "/dev/sda") + lc.BrokenSymlink("symlink1") + }, + expected: &linkCache{ + devices: map[string]linkCacheEntry{ + "symlink1": {path: "/dev/sda", brokenSymlink: true}, + }, + }, + }, + { + name: "RemoveDevice", + setupCache: func(lc *linkCache) { + lc.AddOrUpdateDevice("symlink1", "/dev/sda") + lc.RemoveDevice("symlink1") + }, + expected: &linkCache{ + devices: map[string]linkCacheEntry{}, + }, + }, } - // Verify only gcpPVCID remains - if _, exists := cache.links.devices[gcpPersistentDiskID]; exists { - t.Error("gcpPersistentDiskID should be removed from cache") - } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cache := newLinkCache() + tt.setupCache(cache) - entry, exists = cache.links.devices[gcpPVCID] - if !exists { - t.Fatal("gcpPVCID should still exist in cache") - } - if entry.path != devicePathSDB { - t.Errorf("gcpPVCID should still point to %s, got %s", devicePathSDB, entry.path) + if diff := cmp.Diff(tt.expected, cache, allowUnexportedLinkCache); diff != "" { + t.Errorf("linkCache mismatch (-expected +got):\n%s", diff) + } + }) } } From 4237b80fa942395f2579d8198ad7af81fbc4c611 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 21 May 2025 16:30:07 -0700 Subject: [PATCH 07/23] Remove unused broken function --- pkg/linkcache/cache.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index 006eed27d..c2ceb4ab1 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -54,15 +54,6 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { } } -func NewMockListingCache(period time.Duration, dir string) *ListingCache { - return &ListingCache{ - period: period, - dir: dir, - links: newLinkCache(), - fs: &mockFS{}, - } -} - // Run starts the cache's background loop. The filesystem is listed and the cache // updated according to the frequency specified by the period. It will run until // the context is cancelled. From ab44f988ef5345eb4cf17b1a340a10a588e974e9 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 23 May 2025 14:04:02 -0700 Subject: [PATCH 08/23] Move partition checking into the inner linkcache type. This makes it easier to unit test. --- pkg/linkcache/cache.go | 14 ++++++++++---- pkg/linkcache/cache_test.go | 14 ++++++++++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index c2ceb4ab1..034cba99d 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -99,14 +99,12 @@ func (l *ListingCache) listAndUpdate() error { continue } - // TODO(juliankatz): To have certainty this works for all edge cases, we - // need to test this with a manually partitioned disk. + diskByIdPath := filepath.Join(l.dir, entry.Name()) + if partitionNameRegex.MatchString(entry.Name()) { continue } - diskByIdPath := filepath.Join(l.dir, entry.Name()) - // Add the device to the map regardless of successful symlink eval. // Otherwise, a broken symlink will lead us to remove it from the cache. visited[diskByIdPath] = struct{}{} @@ -153,6 +151,14 @@ func newLinkCache() *linkCache { } func (d *linkCache) AddOrUpdateDevice(symlink, realPath string) { + // Ignore partitions, which are noise as far as our logging is concerned. + // Expression: -part[0-9]+$ + if partitionNameRegex.MatchString(symlink) { + // TODO(juliankatz): To have certainty this works for all edge cases, we + // need to test this with a manually partitioned disk. + return + } + prevEntry, exists := d.devices[symlink] if !exists || prevEntry.path != realPath { klog.Infof("Symlink updated for link %s, previous value: %s, new value: %s", symlink, prevEntry.path, realPath) diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go index 7683ed258..0b811e833 100644 --- a/pkg/linkcache/cache_test.go +++ b/pkg/linkcache/cache_test.go @@ -170,6 +170,20 @@ func TestLinkCache(t *testing.T) { devices: map[string]linkCacheEntry{}, }, }, + { + name: "PartitionIgnored", + setupCache: func(lc *linkCache) { + lc.AddOrUpdateDevice(gcpPersistentDiskPartitionID, devicePathSDA+"1") + lc.AddOrUpdateDevice(gcpPersistentDiskID, devicePathSDA) + lc.AddOrUpdateDevice(gcpPVCID, devicePathSDB) + }, + expected: &linkCache{ + devices: map[string]linkCacheEntry{ + gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, + gcpPVCID: {path: devicePathSDB, brokenSymlink: false}, + }, + }, + }, } for _, tt := range tests { From 2ad5fcdcd8f24466e6af42880a88de35dc7f2c89 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 23 May 2025 14:14:36 -0700 Subject: [PATCH 09/23] Log when linkcache Run is triggered --- pkg/linkcache/cache.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go index 034cba99d..d4ec426ba 100644 --- a/pkg/linkcache/cache.go +++ b/pkg/linkcache/cache.go @@ -58,6 +58,8 @@ func NewListingCache(period time.Duration, dir string) *ListingCache { // updated according to the frequency specified by the period. It will run until // the context is cancelled. func (l *ListingCache) Run(ctx context.Context) { + klog.Infof("Starting symlink cache watcher for directory %s with period %s", l.dir, l.period) + // Start the loop that runs every minute ticker := time.NewTicker(l.period) defer ticker.Stop() @@ -150,6 +152,9 @@ func newLinkCache() *linkCache { } } +// AddOrUpdateDevice adds a new device or updates an existing device in the cache. +// It ignores partition symlinks as they are considered noise for logging purposes. +// If the symlink already exists and the real path has changed, it logs the update. func (d *linkCache) AddOrUpdateDevice(symlink, realPath string) { // Ignore partitions, which are noise as far as our logging is concerned. // Expression: -part[0-9]+$ From 4fc1f45747a64a6ccb3be891330685dd30ced623 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 6 Jun 2025 14:38:56 -0700 Subject: [PATCH 10/23] New implementation that is hooked into nodestage/unstage. Just linux right now. --- cmd/gce-pd-csi-driver/main.go | 10 +- .../base/controller/controller.yaml | 1 + .../images/stable-master/image.yaml | 5 + pkg/gce-pd-csi-driver/cache.go | 42 +--- pkg/gce-pd-csi-driver/gce-pd-driver.go | 3 +- pkg/gce-pd-csi-driver/node.go | 25 ++- pkg/k8sclient/node.go | 49 +++++ pkg/linkcache/cache.go | 204 ------------------ pkg/linkcache/cache_test.go | 198 ----------------- pkg/linkcache/devices.go | 136 ++++++++++++ 10 files changed, 219 insertions(+), 454 deletions(-) create mode 100644 pkg/k8sclient/node.go delete mode 100644 pkg/linkcache/cache.go create mode 100644 pkg/linkcache/devices.go diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 56f50c02b..7d9d03482 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -276,6 +276,12 @@ func handle() { klog.Fatalf("Failed to get node info from API server: %v", err.Error()) } + deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName) + if err != nil { + klog.Fatalf("Failed to create device cache: %v", err.Error()) + } + go deviceCache.Run(ctx) + // TODO(2042): Move more of the constructor args into this struct nsArgs := &driver.NodeServerArgs{ EnableDeviceInUseCheck: *enableDeviceInUseCheck, @@ -284,6 +290,7 @@ func handle() { DataCacheEnabledNodePool: isDataCacheEnabledNodePool, SysfsPath: "/sys", MetricsManager: metricsManager, + DeviceCache: deviceCache, } nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs) @@ -302,9 +309,10 @@ func handle() { } } - go linkcache.NewListingCache(1*time.Minute, "/dev/disk/by-id/").Run(ctx) } + klog.Infof("NOT BLOCKED") + err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer) if err != nil { klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error()) diff --git a/deploy/kubernetes/base/controller/controller.yaml b/deploy/kubernetes/base/controller/controller.yaml index e15b8593a..302874f82 100644 --- a/deploy/kubernetes/base/controller/controller.yaml +++ b/deploy/kubernetes/base/controller/controller.yaml @@ -145,6 +145,7 @@ spec: - "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme" - "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml" - --enable-data-cache + - --run-node-service=false - --enable-multitenancy command: - /gce-pd-csi-driver diff --git a/deploy/kubernetes/images/stable-master/image.yaml b/deploy/kubernetes/images/stable-master/image.yaml index 592b248ff..ea85d22c3 100644 --- a/deploy/kubernetes/images/stable-master/image.yaml +++ b/deploy/kubernetes/images/stable-master/image.yaml @@ -44,9 +44,14 @@ metadata: name: imagetag-gcepd-driver imageTag: name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver +<<<<<<< HEAD # Don't change stable image without changing pdImagePlaceholder in # test/k8s-integration/main.go newName: us-central1-docker.pkg.dev/enginakdemir-gke-dev/csi-dev/gcp-compute-persistent-disk-csi-driver +======= + # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag + newName: us-central1-docker.pkg.dev/juliankatz-joonix/csi-dev/gcp-compute-persistent-disk-csi-driver +>>>>>>> 264b82ed (New implementation that is hooked into nodestage/unstage. Just linux) newTag: "latest" --- diff --git a/pkg/gce-pd-csi-driver/cache.go b/pkg/gce-pd-csi-driver/cache.go index 1dc486690..1b894d53c 100644 --- a/pkg/gce-pd-csi-driver/cache.go +++ b/pkg/gce-pd-csi-driver/cache.go @@ -7,19 +7,14 @@ import ( "regexp" "strconv" "strings" - "time" csi "github.com/container-storage-interface/spec/lib/go/csi" fsnotify "github.com/fsnotify/fsnotify" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/klog/v2" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" ) const ( @@ -257,18 +252,11 @@ func ValidateDataCacheConfig(dataCacheMode string, dataCacheSize string, ctx con } func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, error) { - cfg, err := rest.InClusterConfig() - if err != nil { - return 0, err - } - kubeClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - return 0, err - } - node, err := getNodeWithRetry(ctx, kubeClient, nodeName) + node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) if err != nil { return 0, err } + if val, found := node.GetLabels()[fmt.Sprintf(common.NodeLabelPrefix, common.DataCacheLssdCountLabel)]; found { dataCacheCount, err := strconv.Atoi(val) if err != nil { @@ -280,30 +268,6 @@ func GetDataCacheCountFromNodeLabel(ctx context.Context, nodeName string) (int, return 0, nil } -func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) { - var nodeObj *v1.Node - backoff := wait.Backoff{ - Duration: 1 * time.Second, - Factor: 2.0, - Steps: 5, - } - err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) { - node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) - if err != nil { - klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err) - return false, nil - } - nodeObj = node - klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName) - return true, nil - }) - - if err != nil { - klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err) - } - return nodeObj, err -} - func FetchRaidedLssdCountForDatacache() (int, error) { raidedPath, err := fetchRAIDedLocalSsdPath() if err != nil { diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go index 76aa38381..83ff9767a 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go @@ -159,6 +159,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi DataCacheEnabledNodePool: args.DataCacheEnabledNodePool, SysfsPath: args.SysfsPath, metricsManager: args.MetricsManager, + DeviceCache: args.DeviceCache, } } @@ -184,7 +185,7 @@ func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelT maxLogChar = grpcLogCharCap klog.V(4).Infof("Driver: %v", gceDriver.name) - //Start the nonblocking GRPC + // Start the nonblocking GRPC s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager) // TODO(#34): Only start specific servers based on a flag. // In the future have this only run specific combinations of servers depending on which version this is. diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index cb430355c..b46072d66 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -32,14 +32,14 @@ import ( csi "github.com/container-storage-interface/spec/lib/go/csi" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/klog/v2" "k8s.io/mount-utils" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/resizefs" @@ -80,6 +80,8 @@ type GCENodeServer struct { csi.UnimplementedNodeServer metricsManager *metrics.MetricsManager + // A cache of the device paths for the volumes that are attached to the node. + DeviceCache *linkcache.DeviceCache } type NodeServerArgs struct { @@ -97,6 +99,7 @@ type NodeServerArgs struct { SysfsPath string MetricsManager *metrics.MetricsManager + DeviceCache *linkcache.DeviceCache } var _ csi.NodeServer = &GCENodeServer{} @@ -507,6 +510,11 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage } } + err = ns.DeviceCache.AddVolume(volumeID) + if err != nil { + klog.Warningf("Error adding volume %s to cache: %v", volumeID, err) + } + klog.V(4).Infof("NodeStageVolume succeeded on %v to %s", volumeID, stagingTargetPath) return &csi.NodeStageVolumeResponse{}, nil } @@ -620,6 +628,9 @@ func (ns *GCENodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns return nil, status.Errorf(codes.DataLoss, "Failed to cleanup cache for volume %s: %v", volumeID, err) } } + + ns.DeviceCache.RemoveVolume(volumeID) + klog.V(4).Infof("NodeUnstageVolume succeeded on %v from %s", volumeID, stagingTargetPath) return &csi.NodeUnstageVolumeResponse{}, nil } @@ -875,15 +886,7 @@ func (ns *GCENodeServer) GetVolumeLimits(ctx context.Context) (int64, error) { } func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string) (int64, error) { - cfg, err := rest.InClusterConfig() - if err != nil { - return 0, err - } - kubeClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - return 0, err - } - node, err := getNodeWithRetry(ctx, kubeClient, nodeName) + node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) if err != nil { return 0, err } diff --git a/pkg/k8sclient/node.go b/pkg/k8sclient/node.go new file mode 100644 index 000000000..1c4fa9b8b --- /dev/null +++ b/pkg/k8sclient/node.go @@ -0,0 +1,49 @@ +package k8sclient + +import ( + "context" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" +) + +func GetNodeWithRetry(ctx context.Context, nodeName string) (*v1.Node, error) { + cfg, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, err + } + return getNodeWithRetry(ctx, kubeClient, nodeName) +} + +func getNodeWithRetry(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) { + var nodeObj *v1.Node + backoff := wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2.0, + Steps: 5, + } + err := wait.ExponentialBackoffWithContext(ctx, backoff, func(_ context.Context) (bool, error) { + node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + klog.Warningf("Error getting node %s: %v, retrying...\n", nodeName, err) + return false, nil + } + nodeObj = node + klog.V(4).Infof("Successfully retrieved node info %s\n", nodeName) + return true, nil + }) + + if err != nil { + klog.Errorf("Failed to get node %s after retries: %v\n", nodeName, err) + } + return nodeObj, err +} diff --git a/pkg/linkcache/cache.go b/pkg/linkcache/cache.go deleted file mode 100644 index d4ec426ba..000000000 --- a/pkg/linkcache/cache.go +++ /dev/null @@ -1,204 +0,0 @@ -package linkcache - -import ( - "context" - "fmt" - "maps" - "os" - "path/filepath" - "regexp" - "slices" - "strings" - "time" - - "k8s.io/klog/v2" -) - -var partitionNameRegex = regexp.MustCompile(`-part[0-9]+$`) - -// fsInterface defines the filesystem operations needed by ListingCache -type fsInterface interface { - ReadDir(name string) ([]os.DirEntry, error) - EvalSymlinks(path string) (string, error) -} - -// realFS implements fsInterface using the real filesystem -type realFS struct{} - -func (f *realFS) ReadDir(name string) ([]os.DirEntry, error) { - return os.ReadDir(name) -} - -func (f *realFS) EvalSymlinks(path string) (string, error) { - return filepath.EvalSymlinks(path) -} - -// ListingCache polls the filesystem at the specified directory once per -// period and checks each non-directory entry for a symlink. The results are -// cached. Changes to the cache are logged, as well as the full contents of the -// cache. The cache's Run() method is expected to be called in a goroutine. Its -// cancellation is controlled via the context argument. -type ListingCache struct { - period time.Duration - dir string - links *linkCache - fs fsInterface -} - -func NewListingCache(period time.Duration, dir string) *ListingCache { - return &ListingCache{ - period: period, - dir: dir, - links: newLinkCache(), - fs: &realFS{}, - } -} - -// Run starts the cache's background loop. The filesystem is listed and the cache -// updated according to the frequency specified by the period. It will run until -// the context is cancelled. -func (l *ListingCache) Run(ctx context.Context) { - klog.Infof("Starting symlink cache watcher for directory %s with period %s", l.dir, l.period) - - // Start the loop that runs every minute - ticker := time.NewTicker(l.period) - defer ticker.Stop() - - // Initial list and update so we don't wait for the first tick. - err := l.listAndUpdate() - if err != nil { - klog.Warningf("Error listing and updating symlinks: %v", err) - } - - for { - select { - case <-ctx.Done(): - klog.Infof("Context done, stopping watcher") - return - case <-ticker.C: - err := l.listAndUpdate() - if err != nil { - klog.Warningf("Error listing and updating symlinks: %v", err) - continue - } - - klog.Infof("periodic symlink cache read: %s", l.links.String()) - } - } -} - -func (l *ListingCache) listAndUpdate() error { - visited := make(map[string]struct{}) - - entries, err := l.fs.ReadDir(l.dir) - if err != nil { - return fmt.Errorf("failed to read directory %s: %w", l.dir, err) - } - - var errs []error - for _, entry := range entries { - if entry.IsDir() { - continue - } - - diskByIdPath := filepath.Join(l.dir, entry.Name()) - - if partitionNameRegex.MatchString(entry.Name()) { - continue - } - - // Add the device to the map regardless of successful symlink eval. - // Otherwise, a broken symlink will lead us to remove it from the cache. - visited[diskByIdPath] = struct{}{} - - realFSPath, err := l.fs.EvalSymlinks(diskByIdPath) - if err != nil { - errs = append(errs, fmt.Errorf("failed to evaluate symlink for %s: %w", diskByIdPath, err)) - l.links.BrokenSymlink(diskByIdPath) - continue - } - - l.links.AddOrUpdateDevice(diskByIdPath, realFSPath) - } - - for _, id := range l.links.DeviceIDs() { - if _, found := visited[id]; !found { - l.links.RemoveDevice(id) - } - } - - if len(errs) > 0 { - return fmt.Errorf("failed to evaluate symlinks for %d devices: %v", len(errs), errs) - } - return nil -} - -// linkCache is a structure that maintains a cache of symlinks between -// /dev/disk/by-id and /dev/sd* paths. It provides methods to add/update, -// retrieve, and remove device symlinks from the cache. -type linkCache struct { - devices map[string]linkCacheEntry -} - -type linkCacheEntry struct { - path string - // If true, the symlink is known to be broken. - brokenSymlink bool -} - -func newLinkCache() *linkCache { - return &linkCache{ - devices: make(map[string]linkCacheEntry), - } -} - -// AddOrUpdateDevice adds a new device or updates an existing device in the cache. -// It ignores partition symlinks as they are considered noise for logging purposes. -// If the symlink already exists and the real path has changed, it logs the update. -func (d *linkCache) AddOrUpdateDevice(symlink, realPath string) { - // Ignore partitions, which are noise as far as our logging is concerned. - // Expression: -part[0-9]+$ - if partitionNameRegex.MatchString(symlink) { - // TODO(juliankatz): To have certainty this works for all edge cases, we - // need to test this with a manually partitioned disk. - return - } - - prevEntry, exists := d.devices[symlink] - if !exists || prevEntry.path != realPath { - klog.Infof("Symlink updated for link %s, previous value: %s, new value: %s", symlink, prevEntry.path, realPath) - } - d.devices[symlink] = linkCacheEntry{path: realPath, brokenSymlink: false} -} - -// BrokenSymlink marks a symlink as broken. If the symlink is not in the cache, -// it is ignored. -func (d *linkCache) BrokenSymlink(symlink string) { - if entry, ok := d.devices[symlink]; ok { - entry.brokenSymlink = true - d.devices[symlink] = entry - } -} - -func (d *linkCache) RemoveDevice(symlink string) { - if entry, ok := d.devices[symlink]; ok { - klog.Infof("Removing device %s with path %s from cache, brokenSymlink: %t", symlink, entry.path, entry.brokenSymlink) - delete(d.devices, symlink) - } -} - -func (d *linkCache) DeviceIDs() []string { - return slices.Collect(maps.Keys(d.devices)) -} - -func (d *linkCache) String() string { - var sb strings.Builder - for symlink, entry := range d.devices { - if entry.brokenSymlink { - sb.WriteString(fmt.Sprintf("%s -> broken symlink... last known value: %s; ", symlink, entry.path)) - } else { - sb.WriteString(fmt.Sprintf("%s -> %s; ", symlink, entry.path)) - } - } - return strings.TrimSuffix(sb.String(), "; ") -} diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go index 0b811e833..aeb2416bf 100644 --- a/pkg/linkcache/cache_test.go +++ b/pkg/linkcache/cache_test.go @@ -1,199 +1 @@ package linkcache - -import ( - "os" - "testing" - "testing/fstest" - - "github.com/google/go-cmp/cmp" -) - -var allowUnexportedLinkCache = cmp.AllowUnexported(linkCache{}, linkCacheEntry{}) - -const ( - // Test disk names in /dev/disk/by-id format - gcpPersistentDiskID = "google-persistent-disk-0" - gcpPVCID = "google-pvc-f5418f78-dc07-4d69-9487-6c4a7232dd67" - gcpPersistentDiskPartitionID = "google-persistent-disk-0-part1" - - // Test device paths in /dev format - devicePathSDA = "/dev/sda" - devicePathSDB = "/dev/sdb" -) - -// mockFS implements fsInterface for testing -type mockFS struct { - fstest.MapFS - symlinks map[string]string -} - -func newMockFS() *mockFS { - return &mockFS{ - MapFS: make(fstest.MapFS), - symlinks: make(map[string]string), - } -} - -func (m *mockFS) ReadDir(name string) ([]os.DirEntry, error) { - entries, err := m.MapFS.ReadDir(name) - if err != nil { - return nil, err - } - return entries, nil -} - -func (m *mockFS) EvalSymlinks(path string) (string, error) { - if target, ok := m.symlinks[path]; ok { - return target, nil - } - return "", os.ErrNotExist -} - -func TestListAndUpdate(t *testing.T) { - tests := []struct { - name string - setupFS func(*mockFS) - expectedCache *linkCache - expectError bool - }{ - { - name: "valid symlinks", - setupFS: func(m *mockFS) { - // Create some device files - m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} - m.MapFS[gcpPVCID] = &fstest.MapFile{} - // Create symlinks - m.symlinks[gcpPersistentDiskID] = devicePathSDA - m.symlinks[gcpPVCID] = devicePathSDB - }, - expectedCache: &linkCache{ - devices: map[string]linkCacheEntry{ - gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, - gcpPVCID: {path: devicePathSDB, brokenSymlink: false}, - }, - }, - expectError: false, - }, - { - name: "broken symlink not added to cache", - setupFS: func(m *mockFS) { - m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} - // No symlink target for gcpPersistentDiskID - }, - expectedCache: &linkCache{ - devices: map[string]linkCacheEntry{}, - }, - expectError: true, - }, - { - name: "partition files ignored", - setupFS: func(m *mockFS) { - m.MapFS[gcpPersistentDiskPartitionID] = &fstest.MapFile{} - m.MapFS[gcpPersistentDiskID] = &fstest.MapFile{} - m.symlinks[gcpPersistentDiskID] = devicePathSDA - m.symlinks[gcpPersistentDiskPartitionID] = devicePathSDA + "1" - }, - expectedCache: &linkCache{ - devices: map[string]linkCacheEntry{ - gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, - }, - }, - expectError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mock := newMockFS() - tt.setupFS(mock) - - cache := NewListingCache(0, ".") - cache.fs = mock // Inject our mock filesystem - err := cache.listAndUpdate() - - if tt.expectError { - if err == nil { - t.Error("expected error but got none") - } - } else { - if err != nil { - t.Errorf("unexpected error: %v", err) - } - } - - // Compare the entire cache state - if diff := cmp.Diff(tt.expectedCache, cache.links, allowUnexportedLinkCache); diff != "" { - t.Errorf("linkCache mismatch (-expected +got):\n%s", diff) - } - }) - } -} - -func TestLinkCache(t *testing.T) { - tests := []struct { - name string - setupCache func(*linkCache) - expected *linkCache - }{ - { - name: "AddOrUpdateDevice", - setupCache: func(lc *linkCache) { - lc.AddOrUpdateDevice("symlink1", "/dev/sda") - lc.AddOrUpdateDevice("symlink2", "/dev/sdb") - }, - expected: &linkCache{ - devices: map[string]linkCacheEntry{ - "symlink1": {path: "/dev/sda", brokenSymlink: false}, - "symlink2": {path: "/dev/sdb", brokenSymlink: false}, - }, - }, - }, - { - name: "BrokenSymlink", - setupCache: func(lc *linkCache) { - lc.AddOrUpdateDevice("symlink1", "/dev/sda") - lc.BrokenSymlink("symlink1") - }, - expected: &linkCache{ - devices: map[string]linkCacheEntry{ - "symlink1": {path: "/dev/sda", brokenSymlink: true}, - }, - }, - }, - { - name: "RemoveDevice", - setupCache: func(lc *linkCache) { - lc.AddOrUpdateDevice("symlink1", "/dev/sda") - lc.RemoveDevice("symlink1") - }, - expected: &linkCache{ - devices: map[string]linkCacheEntry{}, - }, - }, - { - name: "PartitionIgnored", - setupCache: func(lc *linkCache) { - lc.AddOrUpdateDevice(gcpPersistentDiskPartitionID, devicePathSDA+"1") - lc.AddOrUpdateDevice(gcpPersistentDiskID, devicePathSDA) - lc.AddOrUpdateDevice(gcpPVCID, devicePathSDB) - }, - expected: &linkCache{ - devices: map[string]linkCacheEntry{ - gcpPersistentDiskID: {path: devicePathSDA, brokenSymlink: false}, - gcpPVCID: {path: devicePathSDB, brokenSymlink: false}, - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cache := newLinkCache() - tt.setupCache(cache) - - if diff := cmp.Diff(tt.expected, cache, allowUnexportedLinkCache); diff != "" { - t.Errorf("linkCache mismatch (-expected +got):\n%s", diff) - } - }) - } -} diff --git a/pkg/linkcache/devices.go b/pkg/linkcache/devices.go new file mode 100644 index 000000000..963ba43a9 --- /dev/null +++ b/pkg/linkcache/devices.go @@ -0,0 +1,136 @@ +package linkcache + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient" +) + +const byIdDir = "/dev/disk/by-id" + +type deviceMapping struct { + symlink string + realPath string +} + +type DeviceCache struct { + volumes map[string]deviceMapping + period time.Duration + // dir is the directory to look for device symlinks + dir string +} + +func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { + node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) + if err != nil { + return nil, fmt.Errorf("failed to get node %s: %w", nodeName, err) + } + + return newDeviceCacheForNode(ctx, period, node) +} + +func newDeviceCacheForNode(ctx context.Context, period time.Duration, node *v1.Node) (*DeviceCache, error) { + deviceCache := &DeviceCache{ + volumes: make(map[string]deviceMapping), + period: period, + dir: byIdDir, + } + + // Look at the status.volumesInUse field. For each, take the last section + // of the string (after the last "/") and call AddVolume for that + for _, volume := range node.Status.VolumesInUse { + klog.Infof("Adding volume %s to cache", string(volume)) + volumeID := strings.Split(string(volume), "^")[1] + deviceCache.AddVolume(volumeID) + } + + return deviceCache, nil +} + +// Run since it needs an infinite loop to keep itself up to date +func (d *DeviceCache) Run(ctx context.Context) { + klog.Infof("Starting device cache watcher for directory %s with period %s", d.dir, d.period) + + // Start the loop that runs every minute + ticker := time.NewTicker(d.period) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + klog.Infof("Context done, stopping watcher") + return + case <-ticker.C: + d.listAndUpdate() + + klog.Infof("Cache contents: %+v", d.volumes) + } + } +} + +// Add a volume. This will yield a call to the filesystem to find a +// /dev/disk/by-id symlink and an evaluation of that symlink. +func (d *DeviceCache) AddVolume(volumeID string) error { + klog.Infof("Adding volume %s to cache", volumeID) + + _, volumeKey, err := common.VolumeIDToKey(volumeID) + if err != nil { + return fmt.Errorf("error converting volume ID to key: %w", err) + } + deviceName, err := common.GetDeviceName(volumeKey) + if err != nil { + return fmt.Errorf("error getting device name: %w", err) + } + + // Look at the dir for a symlink that matches the pvName + symlink := filepath.Join(d.dir, "google-"+deviceName) + klog.Infof("Looking for symlink %s", symlink) + + realPath, err := filepath.EvalSymlinks(symlink) + if err != nil { + klog.Warningf("Error evaluating symlink for volume %s: %v", volumeID, err) + return nil + } + + klog.Infof("Found real path %s for volume %s", realPath, volumeID) + + d.volumes[volumeID] = deviceMapping{ + symlink: symlink, + realPath: realPath, + } + + return nil +} + +// Remove the volume from the cache. +func (d *DeviceCache) RemoveVolume(volumeID string) { + klog.Infof("Removing volume %s from cache", volumeID) + delete(d.volumes, volumeID) +} + +func (d *DeviceCache) listAndUpdate() { + for volumeID, device := range d.volumes { + // Evaluate the symlink + realPath, err := filepath.EvalSymlinks(device.symlink) + if err != nil { + klog.Warningf("Error evaluating symlink for volume %s: %v", volumeID, err) + continue + } + + // Check if the realPath has changed + if realPath != device.realPath { + klog.Warningf("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s", volumeID, device.symlink, device.realPath, realPath) + + // Update the cache with the new realPath + device.realPath = realPath + d.volumes[volumeID] = device + } + } +} From 6c599dd752d1dc29292382bc8196041a0f9f2feb Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 28 May 2025 12:59:51 -0700 Subject: [PATCH 11/23] Revert stable-master/image.yaml to master version --- deploy/kubernetes/images/stable-master/image.yaml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/deploy/kubernetes/images/stable-master/image.yaml b/deploy/kubernetes/images/stable-master/image.yaml index ea85d22c3..54ebb4759 100644 --- a/deploy/kubernetes/images/stable-master/image.yaml +++ b/deploy/kubernetes/images/stable-master/image.yaml @@ -44,14 +44,8 @@ metadata: name: imagetag-gcepd-driver imageTag: name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver -<<<<<<< HEAD - # Don't change stable image without changing pdImagePlaceholder in - # test/k8s-integration/main.go - newName: us-central1-docker.pkg.dev/enginakdemir-gke-dev/csi-dev/gcp-compute-persistent-disk-csi-driver -======= # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag newName: us-central1-docker.pkg.dev/juliankatz-joonix/csi-dev/gcp-compute-persistent-disk-csi-driver ->>>>>>> 264b82ed (New implementation that is hooked into nodestage/unstage. Just linux) newTag: "latest" --- From b8f2da00406a72e2ec5a716d2063e72f66db9ac7 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 28 May 2025 13:11:52 -0700 Subject: [PATCH 12/23] Made a no-op windows implementation of the linkcache package --- manifest_osversion.sh | 4 +-- pkg/linkcache/cache_test.go | 1 - .../{devices.go => devices_linux.go} | 12 --------- pkg/linkcache/devices_windows.go | 25 +++++++++++++++++++ pkg/linkcache/types.go | 15 +++++++++++ 5 files changed, 42 insertions(+), 15 deletions(-) delete mode 100644 pkg/linkcache/cache_test.go rename pkg/linkcache/{devices.go => devices_linux.go} (94%) create mode 100644 pkg/linkcache/devices_windows.go create mode 100644 pkg/linkcache/types.go diff --git a/manifest_osversion.sh b/manifest_osversion.sh index 4feb4d5c6..37f61e3f2 100755 --- a/manifest_osversion.sh +++ b/manifest_osversion.sh @@ -4,7 +4,7 @@ set -o xtrace # The following is a workaround for issue https://github.com/moby/moby/issues/41417 # to manually inserver os.version information into docker manifest file -# TODO: once docker manifest annotation for os.versions is availabler for the installed docker here, +# TODO: once docker manifest annotation for os.versions is available for the installed docker here, # replace the following with annotation approach. https://github.com/docker/cli/pull/2578 export DOCKER_CLI_EXPERIMENTAL=enabled @@ -14,7 +14,7 @@ IFS=', ' read -r -a imagetags <<< "$WINDOWS_IMAGE_TAGS" IFS=', ' read -r -a baseimages <<< "$WINDOWS_BASE_IMAGES" MANIFEST_TAG=${STAGINGIMAGE}:${STAGINGVERSION} -# translate from image tag to docker manifest foler format +# translate from image tag to docker manifest folder format # e.g., gcr.io_k8s-staging-csi_gce-pd-windows-v2 manifest_folder=$(echo "${MANIFEST_TAG}" | sed "s|/|_|g" | sed "s/:/-/") echo ${manifest_folder} diff --git a/pkg/linkcache/cache_test.go b/pkg/linkcache/cache_test.go deleted file mode 100644 index aeb2416bf..000000000 --- a/pkg/linkcache/cache_test.go +++ /dev/null @@ -1 +0,0 @@ -package linkcache diff --git a/pkg/linkcache/devices.go b/pkg/linkcache/devices_linux.go similarity index 94% rename from pkg/linkcache/devices.go rename to pkg/linkcache/devices_linux.go index 963ba43a9..a6f61edff 100644 --- a/pkg/linkcache/devices.go +++ b/pkg/linkcache/devices_linux.go @@ -15,18 +15,6 @@ import ( const byIdDir = "/dev/disk/by-id" -type deviceMapping struct { - symlink string - realPath string -} - -type DeviceCache struct { - volumes map[string]deviceMapping - period time.Duration - // dir is the directory to look for device symlinks - dir string -} - func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { node, err := k8sclient.GetNodeWithRetry(ctx, nodeName) if err != nil { diff --git a/pkg/linkcache/devices_windows.go b/pkg/linkcache/devices_windows.go new file mode 100644 index 000000000..59e8eac73 --- /dev/null +++ b/pkg/linkcache/devices_windows.go @@ -0,0 +1,25 @@ +//go:build windows + +package linkcache + +import ( + "context" + "fmt" + "time" +) + +func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { + return nil, fmt.Errorf("NewDeviceCacheForNode is not implemented for Windows") +} + +func (d *DeviceCache) Run(ctx context.Context) { + // Not implemented for Windows +} + +func (d *DeviceCache) AddVolume(volumeID string) error { + return fmt.Errorf("AddVolume is not implemented for Windows") +} + +func (d *DeviceCache) RemoveVolume(volumeID string) { + // Not implemented for Windows +} diff --git a/pkg/linkcache/types.go b/pkg/linkcache/types.go new file mode 100644 index 000000000..b8dcae3fb --- /dev/null +++ b/pkg/linkcache/types.go @@ -0,0 +1,15 @@ +package linkcache + +import "time" + +type deviceMapping struct { + symlink string + realPath string +} + +type DeviceCache struct { + volumes map[string]deviceMapping + period time.Duration + // dir is the directory to look for device symlinks + dir string +} From b241423d7ae5578ebe5229ea00ce23c4db8c0715 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 6 Jun 2025 15:05:05 -0700 Subject: [PATCH 13/23] Made test device caches in node_test.go --- pkg/gce-pd-csi-driver/node_test.go | 17 +++++++++---- pkg/linkcache/devices_linux.go | 39 ++++++++++++++++++++++++++---- 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go index 4d2883ee8..5c105e2f7 100644 --- a/pkg/gce-pd-csi-driver/node_test.go +++ b/pkg/gce-pd-csi-driver/node_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/mount-utils" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -44,11 +45,13 @@ const ( ) func getTestGCEDriver(t *testing.T) *GCEDriver { - return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{}) + return getCustomTestGCEDriver(t, mountmanager.NewFakeSafeMounter(), deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{ + DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{defaultVolumeID})), + }) } -func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount) *GCEDriver { - return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), &NodeServerArgs{}) +func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAndMount, args *NodeServerArgs) *GCEDriver { + return getCustomTestGCEDriver(t, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), args) } func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService, args *NodeServerArgs) *GCEDriver { @@ -188,7 +191,9 @@ func TestNodeGetVolumeStats(t *testing.T) { } mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList}) - gceDriver := getTestGCEDriverWithCustomMounter(t, mounter) + gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{ + DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{tc.volumeID})), + }) ns := gceDriver.ns req := &csi.NodeGetVolumeStatsRequest{ @@ -1147,7 +1152,9 @@ func TestNodeStageVolume(t *testing.T) { )) } mounter := mountmanager.NewFakeSafeMounterWithCustomExec(&testingexec.FakeExec{CommandScript: actionList, ExactOrder: true}) - gceDriver := getTestGCEDriverWithCustomMounter(t, mounter) + gceDriver := getTestGCEDriverWithCustomMounter(t, mounter, &NodeServerArgs{ + DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{volumeID})), + }) ns := gceDriver.ns ns.SysfsPath = tempDir + "/sys" _, err := ns.NodeStageVolume(context.Background(), tc.req) diff --git a/pkg/linkcache/devices_linux.go b/pkg/linkcache/devices_linux.go index a6f61edff..96e4732b8 100644 --- a/pkg/linkcache/devices_linux.go +++ b/pkg/linkcache/devices_linux.go @@ -21,10 +21,27 @@ func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName s return nil, fmt.Errorf("failed to get node %s: %w", nodeName, err) } - return newDeviceCacheForNode(ctx, period, node) + return newDeviceCacheForNode(period, node), nil } -func newDeviceCacheForNode(ctx context.Context, period time.Duration, node *v1.Node) (*DeviceCache, error) { +func TestDeviceCache(period time.Duration, node *v1.Node) *DeviceCache { + return newDeviceCacheForNode(period, node) +} + +func TestNodeWithVolumes(volumes []string) *v1.Node { + volumesInUse := make([]v1.UniqueVolumeName, len(volumes)) + for i, volume := range volumes { + volumesInUse[i] = v1.UniqueVolumeName("kubernetes.io/csi/pd.csi.storage.gke.io^" + volume) + } + + return &v1.Node{ + Status: v1.NodeStatus{ + VolumesInUse: volumesInUse, + }, + } +} + +func newDeviceCacheForNode(period time.Duration, node *v1.Node) *DeviceCache { deviceCache := &DeviceCache{ volumes: make(map[string]deviceMapping), period: period, @@ -35,11 +52,23 @@ func newDeviceCacheForNode(ctx context.Context, period time.Duration, node *v1.N // of the string (after the last "/") and call AddVolume for that for _, volume := range node.Status.VolumesInUse { klog.Infof("Adding volume %s to cache", string(volume)) - volumeID := strings.Split(string(volume), "^")[1] - deviceCache.AddVolume(volumeID) + vID, err := pvNameFromVolumeID(string(volume)) + if err != nil { + klog.Warningf("failure to retrieve name, skipping volume %q: %v", string(volume), err) + continue + } + deviceCache.AddVolume(vID) } - return deviceCache, nil + return deviceCache +} + +func pvNameFromVolumeID(volumeID string) (string, error) { + tokens := strings.Split(volumeID, "^") + if len(tokens) != 2 { + return "", fmt.Errorf("invalid volume ID, split on `^` returns %d tokens, expected 2", len(tokens)) + } + return tokens[1], nil } // Run since it needs an infinite loop to keep itself up to date From f1d1be0e684481a417a1d2c46de8672349ad6f45 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Fri, 6 Jun 2025 15:16:32 -0700 Subject: [PATCH 14/23] Fix sanity test --- test/sanity/sanity_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index 89192f150..408dc047e 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -33,6 +33,7 @@ import ( gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/linkcache" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -77,6 +78,7 @@ func TestSanity(t *testing.T) { EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0, EnableDataCache: enableDataCache, + DeviceCache: linkcache.TestDeviceCache(1*time.Minute, linkcache.TestNodeWithVolumes([]string{})), } // Initialize GCE Driver From d55470aa9462f3d261fe1f5f37eaceb637087a93 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 15:35:40 -0700 Subject: [PATCH 15/23] Only warn on failure to create cache --- cmd/gce-pd-csi-driver/main.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 7d9d03482..dc93a1bf3 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -278,7 +278,7 @@ func handle() { deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName) if err != nil { - klog.Fatalf("Failed to create device cache: %v", err.Error()) + klog.Warningf("Failed to create device cache: %v", err.Error()) } go deviceCache.Run(ctx) @@ -311,8 +311,6 @@ func handle() { } - klog.Infof("NOT BLOCKED") - err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer) if err != nil { klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error()) From 2f4ba1296c02f42ce78445af89b33720c5fb4088 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 15:37:45 -0700 Subject: [PATCH 16/23] Only warn on windows instantiation --- pkg/linkcache/devices_windows.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/linkcache/devices_windows.go b/pkg/linkcache/devices_windows.go index 59e8eac73..bfed9158d 100644 --- a/pkg/linkcache/devices_windows.go +++ b/pkg/linkcache/devices_windows.go @@ -6,10 +6,13 @@ import ( "context" "fmt" "time" + + "k8s.io/klog/v2" ) func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { - return nil, fmt.Errorf("NewDeviceCacheForNode is not implemented for Windows") + klog.Warningf("NewDeviceCacheForNode is not implemented for Windows") + return nil, nil } func (d *DeviceCache) Run(ctx context.Context) { From a7b3bbb40eb85e8feb085387b187ed3ae07bb03e Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 16:02:19 -0700 Subject: [PATCH 17/23] Make non-implemented on windows an info --- pkg/linkcache/devices_windows.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/linkcache/devices_windows.go b/pkg/linkcache/devices_windows.go index bfed9158d..b767556ea 100644 --- a/pkg/linkcache/devices_windows.go +++ b/pkg/linkcache/devices_windows.go @@ -4,14 +4,13 @@ package linkcache import ( "context" - "fmt" "time" "k8s.io/klog/v2" ) func NewDeviceCacheForNode(ctx context.Context, period time.Duration, nodeName string) (*DeviceCache, error) { - klog.Warningf("NewDeviceCacheForNode is not implemented for Windows") + klog.Infof("NewDeviceCacheForNode is not implemented for Windows") return nil, nil } @@ -20,7 +19,8 @@ func (d *DeviceCache) Run(ctx context.Context) { } func (d *DeviceCache) AddVolume(volumeID string) error { - return fmt.Errorf("AddVolume is not implemented for Windows") + klog.Infof("AddVolume is not implemented for Windows") + return nil } func (d *DeviceCache) RemoveVolume(volumeID string) { From a64ceeda8bd696b69ea6f998bb60a16865ba4be9 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 16:05:47 -0700 Subject: [PATCH 18/23] Fix stable-master/image.yaml --- deploy/kubernetes/images/stable-master/image.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deploy/kubernetes/images/stable-master/image.yaml b/deploy/kubernetes/images/stable-master/image.yaml index 54ebb4759..e436c2579 100644 --- a/deploy/kubernetes/images/stable-master/image.yaml +++ b/deploy/kubernetes/images/stable-master/image.yaml @@ -45,7 +45,7 @@ metadata: imageTag: name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver # pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag - newName: us-central1-docker.pkg.dev/juliankatz-joonix/csi-dev/gcp-compute-persistent-disk-csi-driver - newTag: "latest" + newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver + newTag: "v1.17.2" --- From 1f6ace0d00438e183375591c6678c154137a5fc7 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 16:44:19 -0700 Subject: [PATCH 19/23] Improved some error messages to provide better test failure feedback --- test/remote/setup-teardown.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/remote/setup-teardown.go b/test/remote/setup-teardown.go index 3026b28b0..14cbbbf19 100644 --- a/test/remote/setup-teardown.go +++ b/test/remote/setup-teardown.go @@ -73,7 +73,7 @@ func SetupNewDriverAndClient(instance *InstanceInfo, config *ClientConfig) (*Tes archiveName := fmt.Sprintf("e2e_driver_binaries_%s.tar.gz", uuid.NewUUID()) archivePath, err := CreateDriverArchive(archiveName, instance.cfg.Architecture, config.PkgPath, config.BinPath) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create driver archive: %v", err.Error()) } defer func() { err = os.Remove(archivePath) @@ -92,7 +92,7 @@ func SetupNewDriverAndClient(instance *InstanceInfo, config *ClientConfig) (*Tes // Upload archive to instance and run binaries driverPID, err := instance.UploadAndRun(archivePath, config.WorkspaceDir, config.RunDriverCmd) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to upload and run driver: %v", err.Error()) } // Create an SSH tunnel from port to port From 833ef6ea4865574563cf959214eecbcb6f162df0 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 16:47:18 -0700 Subject: [PATCH 20/23] Always print helpful logs in failing area --- test/remote/runner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/remote/runner.go b/test/remote/runner.go index 860646e71..87543e433 100644 --- a/test/remote/runner.go +++ b/test/remote/runner.go @@ -29,7 +29,7 @@ import ( func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd string) (int, error) { // Create the temp staging directory - klog.V(4).Infof("Staging test binaries on %q", i.cfg.Name) + klog.Infof("Staging test binaries on %q", i.cfg.Name) // Do not sudo here, so that we can use scp to copy test archive to the directdory. if output, err := i.SSHNoSudo("mkdir", remoteWorkspace); err != nil { @@ -49,7 +49,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s fmt.Sprintf("cd %s", remoteWorkspace), fmt.Sprintf("tar -xzvf ./%s", archiveName), ) - klog.V(4).Infof("Extracting tar on %q", i.cfg.Name) + klog.Infof("Extracting tar on %q", i.cfg.Name) // Do not use sudo here, because `sudo tar -x` will recover the file ownership inside the tar ball, but // we want the extracted files to be owned by the current user. if output, err := i.SSHNoSudo("sh", "-c", cmd); err != nil { @@ -57,7 +57,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s return -1, fmt.Errorf("failed to extract test archive: %v, output: %q", err.Error(), output) } - klog.V(4).Infof("Starting driver on %q", i.cfg.Name) + klog.Infof("Starting driver on %q", i.cfg.Name) // When the process is killed the driver should close the TCP endpoint, then we want to download the logs output, err := i.SSH(driverRunCmd) if err != nil { From b6a601bfc1736344a0442d72cbb1d5325f47b280 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 17:11:28 -0700 Subject: [PATCH 21/23] Remove now unnecessary corp-helper when running from cloudtop --- test/remote/instance.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/remote/instance.go b/test/remote/instance.go index 554e7612e..84c56625f 100644 --- a/test/remote/instance.go +++ b/test/remote/instance.go @@ -235,7 +235,7 @@ func (i *InstanceInfo) CreateOrGetInstance(localSSDCount int) error { } if i.cfg.CloudtopHost { - output, err := exec.Command("gcloud", "compute", "ssh", i.cfg.Name, "--zone", i.cfg.Zone, "--project", i.cfg.Project, "--", "-o", "ProxyCommand=corp-ssh-helper %h %p", "--", "echo").CombinedOutput() + output, err := exec.Command("gcloud", "compute", "ssh", i.cfg.Name, "--zone", i.cfg.Zone, "--project", i.cfg.Project).CombinedOutput() if err != nil { klog.Errorf("Failed to bootstrap ssh (%v): %s", err, string(output)) return false, nil @@ -257,9 +257,8 @@ func (i *InstanceInfo) CreateOrGetInstance(localSSDCount int) error { return true, nil }) - // If instance didn't reach running state in time, return with error now. if err != nil { - return err + return fmt.Errorf("instance %v did not reach running state in time: %v", i.cfg.Name, err.Error()) } // Instance reached running state in time, make sure that cloud-init is complete From 99d9ff3e154fc175bfa3e4a27559c606338c8609 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 17:24:41 -0700 Subject: [PATCH 22/23] Only run device cache if successfully created --- cmd/gce-pd-csi-driver/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index dc93a1bf3..e181aff3a 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -279,8 +279,9 @@ func handle() { deviceCache, err := linkcache.NewDeviceCacheForNode(ctx, 1*time.Minute, *nodeName) if err != nil { klog.Warningf("Failed to create device cache: %v", err.Error()) + } else { + go deviceCache.Run(ctx) } - go deviceCache.Run(ctx) // TODO(2042): Move more of the constructor args into this struct nsArgs := &driver.NodeServerArgs{ From 5832ea4dd2ef1e2dacadfd4c6b101bb725dfb659 Mon Sep 17 00:00:00 2001 From: juliankatz Date: Wed, 2 Jul 2025 17:27:39 -0700 Subject: [PATCH 23/23] Replace verbosities --- test/remote/runner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/remote/runner.go b/test/remote/runner.go index 87543e433..860646e71 100644 --- a/test/remote/runner.go +++ b/test/remote/runner.go @@ -29,7 +29,7 @@ import ( func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd string) (int, error) { // Create the temp staging directory - klog.Infof("Staging test binaries on %q", i.cfg.Name) + klog.V(4).Infof("Staging test binaries on %q", i.cfg.Name) // Do not sudo here, so that we can use scp to copy test archive to the directdory. if output, err := i.SSHNoSudo("mkdir", remoteWorkspace); err != nil { @@ -49,7 +49,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s fmt.Sprintf("cd %s", remoteWorkspace), fmt.Sprintf("tar -xzvf ./%s", archiveName), ) - klog.Infof("Extracting tar on %q", i.cfg.Name) + klog.V(4).Infof("Extracting tar on %q", i.cfg.Name) // Do not use sudo here, because `sudo tar -x` will recover the file ownership inside the tar ball, but // we want the extracted files to be owned by the current user. if output, err := i.SSHNoSudo("sh", "-c", cmd); err != nil { @@ -57,7 +57,7 @@ func (i *InstanceInfo) UploadAndRun(archivePath, remoteWorkspace, driverRunCmd s return -1, fmt.Errorf("failed to extract test archive: %v, output: %q", err.Error(), output) } - klog.Infof("Starting driver on %q", i.cfg.Name) + klog.V(4).Infof("Starting driver on %q", i.cfg.Name) // When the process is killed the driver should close the TCP endpoint, then we want to download the logs output, err := i.SSH(driverRunCmd) if err != nil {