Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ The AWS EBS CSI Driver supports [tagging](tagging.md) through `StorageClass.para
|------------------------------|-------------------------------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| "csi.storage.k8s.io/fstype" | xfs, ext3, ext4 | ext4 | File system type that will be formatted during volume creation. This parameter is case sensitive! |
| "type" | io1, io2, gp2, gp3, sc1, st1, standard | gp3* | EBS volume type. |
| "iopsPerGB" | | | I/O operations per second per GiB. Can be specified for IO1, IO2, and GP3 volumes. |
| "iopsPerGB" | | | I/O operations per second per GiB. Can be specified for IO1, IO2, and GP3. If `iopsPerGB * <volume size>` exceed volume limits, the IOPS will be capped at the maximum allowed for that volume type and the call will succeed. volumes. |
| "allowAutoIOPSPerGBIncrease" | true, false | false | When `"true"`, the CSI driver increases IOPS for a volume when `iopsPerGB * <volume size>` is too low to fit into IOPS range supported by AWS. This allows dynamic provisioning to always succeed, even when user specifies too small PVC capacity or `iopsPerGB` value. On the other hand, it may introduce additional costs, as such volumes have higher IOPS than requested in `iopsPerGB`. |
| "allowAutoIOPSIncreaseOnModify" | true, false | false | When `"true"`, the CSI Driver adjusts IOPS for a volume during resizing if `iopsPerGB` is set. This ensures that the volume maintains the desired IOPS/GiB ratio.
| "iops" | | | I/O operations per second. Can be specified for IO1, IO2, and GP3 volumes. |
| "throughput" | | 125 | Throughput in MiB/s. Only effective when gp3 volume type is specified. If empty, it will set to 125MiB/s as documented [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-volume-types.html). |
| "encrypted" | true, false | false | Whether the volume should be encrypted or not. Valid values are "true" or "false". |
Expand Down
188 changes: 139 additions & 49 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ const (
KubernetesTagKeyPrefix = "kubernetes.io"
// AwsEbsDriverTagKey is the tag to identify if a volume/snapshot is managed by ebs csi driver.
AwsEbsDriverTagKey = util.DriverName + "/cluster"
// AllowAutoIOPSIncreaseOnModifyKey is the tag key for allowing IOPS increase on resizing if IOPSPerGB is set to ensure desired ratio is maintained.
AllowAutoIOPSIncreaseOnModifyKey = util.DriverName + "/AllowAutoIOPSIncreaseOnModify"
// IOPSPerGBKey represents the tag key for IOPS per GB.
IOPSPerGBKey = util.DriverName + "/IOPSPerGb"
)

// Batcher.
Expand Down Expand Up @@ -250,9 +254,11 @@ type DiskOptions struct {

// ModifyDiskOptions represents parameters to modify an EBS volume.
type ModifyDiskOptions struct {
VolumeType string
IOPS int32
Throughput int32
VolumeType string
IOPS int32
Throughput int32
IOPSPerGB int32
AllowIopsIncreaseOnResize bool
}

// iopsLimits represents the IOPS limits set by EBS of a volume dependent on the volume type.
Expand Down Expand Up @@ -905,27 +911,89 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
if err != nil {
return 0, err
}
needsModification, volumeSize, err := c.validateModifyVolume(ctx, volumeID, newSizeGiB, options)
request := &ec2.DescribeVolumesInput{
VolumeIds: []string{volumeID},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
}

needsModification, volumeSize, err := c.validateVolumeState(ctx, volumeID, newSizeGiB, *volume.Size, options)
if err != nil || !needsModification {
return volumeSize, err
}

if options.IOPS > 0 && options.IOPSPerGB > 0 {
return 0, errors.New("invalid VAC parameter; specify either IOPS or IOPSPerGb, not both")
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
}
if newSizeBytes != 0 {
req.Size = aws.Int32(newSizeGiB)
}
if options.IOPS != 0 {
req.Iops = aws.Int32(options.IOPS)
}
volTypeToUse := volume.VolumeType
if options.VolumeType != "" {
req.VolumeType = types.VolumeType(options.VolumeType)
volTypeToUse = req.VolumeType
}
if options.Throughput != 0 {
req.Throughput = aws.Int32(options.Throughput)
}

var sizeToUse int32
if req.Size != nil {
sizeToUse = *req.Size
} else {
sizeToUse = *volume.Size
}

allowautoincreaseIsSet, iopsPerGbVal, err := c.checkIfIopsIncreaseOnExpansion(volume.Tags)
if err != nil {
return 0, err
}
var iopsForModify int32
switch {
case options.IOPS != 0:
iopsForModify = options.IOPS
case options.IOPSPerGB != 0 && (options.AllowIopsIncreaseOnResize || allowautoincreaseIsSet):
iopsForModify = sizeToUse * options.IOPSPerGB
case iopsPerGbVal > 0 && (options.AllowIopsIncreaseOnResize || allowautoincreaseIsSet):
iopsForModify = sizeToUse * iopsPerGbVal
}
if iopsForModify != 0 {
azParams := getVolumeLimitsParams{}

if volume.AvailabilityZone != nil {
azParams.availabilityZone = *volume.AvailabilityZone
}
if volume.AvailabilityZoneId != nil {
azParams.availabilityZoneId = *volume.AvailabilityZoneId
}
// EBS doesn't handle empty outpost arn, so we have to include it only when it's non-empty
if volume.OutpostArn != nil {
azParams.outpostArn = *volume.OutpostArn
}
iopsLimit, err := c.getVolumeLimits(ctx, string(volTypeToUse), azParams)
if err != nil {
return 0, err
}
// Even if volume type does not support IOPS, still pass value and let EC2 handle error.
if iopsLimit.maxIops == 0 {
req.Iops = aws.Int32(iopsForModify)
} else {
req.Iops = aws.Int32(capIOPS(string(volTypeToUse), sizeToUse, iopsForModify, iopsLimit, true))
}
options.IOPS = *req.Iops
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make the code saner if we passed req into validateModifyVolume instead of options?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would also mean we would have to modify validateVolumeState which is called much earlier on before we construct the request input with the options, I would prefer to leave as is unless you feel there is a very strong reason to make the switch

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we have to modify validateVolumeState?

Copy link
Member Author

@mdzraf mdzraf Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it also calls checkDesiredState which would also have to be changed to take in req instead of options if we did this.

}

needsModification, volumeSize, err = c.validateModifyVolume(ctx, volumeID, newSizeGiB, options, *volume)
if err != nil || !needsModification {
return volumeSize, err
}

response, err := c.ec2.ModifyVolume(ctx, req, func(o *ec2.Options) {
o.Retryer = c.rm.modifyVolumeRetryer
})
Expand Down Expand Up @@ -2404,7 +2472,7 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err
return zones, nil
}

func needsVolumeModification(volume types.Volume, newSizeGiB int32, options *ModifyDiskOptions) bool {
func needsVolumeModification(volume types.Volume, newSizeGiB int32, req *ModifyDiskOptions) bool {
oldSizeGiB := *volume.Size
//nolint:staticcheck // staticcheck suggests merging all of the below conditionals into one line,
// but that would be extremely difficult to read
Expand All @@ -2414,74 +2482,107 @@ func needsVolumeModification(volume types.Volume, newSizeGiB int32, options *Mod
needsModification = true
}

if options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != options.IOPS) {
if req.IOPS != 0 && (volume.Iops == nil || *volume.Iops != req.IOPS) {
needsModification = true
}

if options.VolumeType != "" && !strings.EqualFold(string(volume.VolumeType), options.VolumeType) {
if req.VolumeType != "" && !strings.EqualFold(string(volume.VolumeType), req.VolumeType) {
needsModification = true
}

if options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != options.Throughput) {
if req.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != req.Throughput) {
needsModification = true
}

return needsModification
}

func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSizeGiB int32, options *ModifyDiskOptions) (bool, int32, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []string{volumeID},
func getVolumeAttachmentsList(volume types.Volume) []string {
var volumeAttachmentList []string
for _, attachment := range volume.Attachments {
if attachment.State == types.VolumeAttachmentStateAttached {
volumeAttachmentList = append(volumeAttachmentList, aws.ToString(attachment.InstanceId))
}
}

volume, err := c.getVolume(ctx, request)
if err != nil {
return true, 0, err
}
return volumeAttachmentList
}

if volume.Size == nil {
return true, 0, fmt.Errorf("volume %q has no size", volumeID)
// Checks if a volume's IOPS can be increased on expansion to adhere to IopsPerGB ratio.
func (c *cloud) checkIfIopsIncreaseOnExpansion(existingTags []types.Tag) (allowautoincreaseIsSet bool, iopsPerGbVal int32, err error) {
for _, tag := range existingTags {
switch *tag.Key {
case IOPSPerGBKey:
iopsPerGbVal64, err := strconv.ParseInt(*tag.Value, 10, 32)
if err != nil {
return false, 0, fmt.Errorf("%w: %w", ErrInvalidArgument, err)
}
iopsPerGbVal = int32(iopsPerGbVal64)
case AllowAutoIOPSIncreaseOnModifyKey:
allowautoincreaseIsSet, err = strconv.ParseBool(*tag.Value)
if err != nil {
return false, 0, fmt.Errorf("%w: %w", ErrInvalidArgument, err)
}
}
}
oldSizeGiB := *volume.Size
return allowautoincreaseIsSet, iopsPerGbVal, nil
}

// This call must NOT be batched because a missing volume modification will return client error
func (c *cloud) getVolumeModificationState(ctx context.Context, volumeID string) (*types.VolumeModification, error) {
latestMod, err := c.getLatestVolumeModification(ctx, volumeID, false)
if err != nil && !errors.Is(err, ErrVolumeNotBeingModified) {
return true, oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err)
return nil, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err)
}
return latestMod, nil
}

func (c *cloud) validateVolumeState(ctx context.Context, volumeID string, newSizeGiB int32, oldSizeGiB int32, options *ModifyDiskOptions) (bool, int32, error) {
latestMod, err := c.getVolumeModificationState(ctx, volumeID)
if err != nil {
return false, 0, err
}

state := ""
// latestMod can be nil if the volume has never been modified
if latestMod != nil {
state = string(latestMod.ModificationState)
if state == string(types.VolumeModificationStateModifying) {
// If volume is already modifying, detour to waiting for it to modify
klog.V(5).InfoS("[Debug] Watching ongoing modification", "volumeID", volumeID)
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return true, oldSizeGiB, err
}
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
if latestMod != nil && string(latestMod.ModificationState) == string(types.VolumeModificationStateModifying) {
// If volume is already modifying, detour to waiting for it to modify
klog.V(5).InfoS("[Debug] Watching ongoing modification", "volumeID", volumeID)
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return false, oldSizeGiB, err
}
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
}
return true, 0, nil
}

func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSizeGiB int32, options *ModifyDiskOptions, volume types.Volume) (bool, int32, error) {
if volume.Size == nil {
return true, 0, fmt.Errorf("volume %q has no size", volumeID)
}
oldSizeGiB := *volume.Size

// At this point, we know we are starting a new volume modification
// If we're asked to modify a volume to its current state, ignore the request and immediately return a success
// This is because as of March 2024, EC2 ModifyVolume calls that don't change any parameters still modify the volume
if !needsVolumeModification(*volume, newSizeGiB, options) {
if !needsVolumeModification(volume, newSizeGiB, options) {
klog.V(5).InfoS("[Debug] Skipping modification for volume due to matching stats", "volumeID", volumeID)
// Wait for any existing modifications to prevent race conditions where DescribeVolume(s) returns the new
// state before the volume is actually finished modifying
err = c.waitForVolumeModification(ctx, volumeID)
err := c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return true, oldSizeGiB, err
}
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
}

if state == string(types.VolumeModificationStateOptimizing) {
latestMod, err := c.getVolumeModificationState(ctx, volumeID)
if err != nil {
return true, 0, err
}

if latestMod != nil && string(latestMod.ModificationState) == string(types.VolumeModificationStateOptimizing) {
return true, 0, fmt.Errorf("volume %q in OPTIMIZING state, cannot currently modify", volumeID)
}

Expand All @@ -2492,17 +2593,6 @@ func volumeModificationDone(state string) bool {
return state == string(types.VolumeModificationStateCompleted) || state == string(types.VolumeModificationStateOptimizing)
}

func getVolumeAttachmentsList(volume types.Volume) []string {
var volumeAttachmentList []string
for _, attachment := range volume.Attachments {
if attachment.State == types.VolumeAttachmentStateAttached {
volumeAttachmentList = append(volumeAttachmentList, aws.ToString(attachment.InstanceId))
}
}

return volumeAttachmentList
}

// Calculate actual IOPS for a volume and cap it at supported AWS limits.
func capIOPS(volumeType string, requestedCapacityGiB int32, requestedIops int32, iopsLimits iopsLimits, allowIncrease bool) int32 {
// If requestedIops is zero the user did not request a specific amount, and the default will be used instead
Expand Down
Loading