Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changelog/44044.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:breaking-change
resource/aws_glue_catalog_table_optimizer: `compaction_configuration` is required when setting optimizer type to `compaction`.
```

```release-note:enhancement
resource/aws_glue_catalog_table_optimizer: `sort` and `z-order` strategies supported in compaction configuration. Requires sort order to be defined on existing table.
```
192 changes: 190 additions & 2 deletions internal/service/glue/catalog_table_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package glue

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/glue"
Expand Down Expand Up @@ -93,7 +94,40 @@ func (r *catalogTableOptimizerResource) Schema(ctx context.Context, _ resource.S
Required: true,
},
},
PlanModifiers: []planmodifier.Object{
requireMatchingOptimizerConfiguration{},
},
Blocks: map[string]schema.Block{
"compaction_configuration": schema.ListNestedBlock{
CustomType: fwtypes.NewListNestedObjectTypeOf[compactionConfigurationData](ctx),
Validators: []validator.List{
listvalidator.SizeAtMost(1),
},
NestedObject: schema.NestedBlockObject{
Blocks: map[string]schema.Block{
"iceberg_configuration": schema.ListNestedBlock{
CustomType: fwtypes.NewListNestedObjectTypeOf[icebergCompactionConfigurationData](ctx),
Validators: []validator.List{
listvalidator.SizeAtMost(1),
},
NestedObject: schema.NestedBlockObject{
Attributes: map[string]schema.Attribute{
"strategy": schema.StringAttribute{
CustomType: fwtypes.StringEnumType[awstypes.CompactionStrategy](),
Required: true,
},
"min_input_files": schema.Int32Attribute{
Optional: true,
},
"delete_file_threshold": schema.Int32Attribute{
Optional: true,
},
},
},
},
},
},
},
"retention_configuration": schema.ListNestedBlock{
CustomType: fwtypes.NewListNestedObjectTypeOf[retentionConfigurationData](ctx),
Validators: []validator.List{
Expand Down Expand Up @@ -278,7 +312,6 @@ func (r *catalogTableOptimizerResource) Update(ctx context.Context, request reso
}

_, err := conn.UpdateTableOptimizer(ctx, &input)

if err != nil {
id, _ := flex.FlattenResourceId([]string{
state.CatalogID.ValueString(),
Expand Down Expand Up @@ -344,7 +377,6 @@ func (r *catalogTableOptimizerResource) Delete(ctx context.Context, request reso

func (r *catalogTableOptimizerResource) ImportState(ctx context.Context, request resource.ImportStateRequest, response *resource.ImportStateResponse) {
parts, err := flex.ExpandResourceId(request.ID, idParts, false)

if err != nil {
response.Diagnostics.AddError(
create.ProblemStandardMessage(names.Glue, create.ErrActionImporting, ResNameCatalogTableOptimizer, request.ID, err),
Expand Down Expand Up @@ -373,6 +405,7 @@ type configurationData struct {
RoleARN fwtypes.ARN `tfsdk:"role_arn"`
RetentionConfiguration fwtypes.ListNestedObjectValueOf[retentionConfigurationData] `tfsdk:"retention_configuration"`
OrphanFileDeletionConfiguration fwtypes.ListNestedObjectValueOf[orphanFileDeletionConfigurationData] `tfsdk:"orphan_file_deletion_configuration"`
CompactionConfiguration fwtypes.ListNestedObjectValueOf[compactionConfigurationData] `tfsdk:"compaction_configuration"`
}

type retentionConfigurationData struct {
Expand All @@ -394,6 +427,16 @@ type icebergOrphanFileDeletionConfigurationData struct {
Location types.String `tfsdk:"location"`
}

type compactionConfigurationData struct {
IcebergConfiguration fwtypes.ListNestedObjectValueOf[icebergCompactionConfigurationData] `tfsdk:"iceberg_configuration"`
}

type icebergCompactionConfigurationData struct {
DeleteFileThreshold types.Int32 `tfsdk:"delete_file_threshold"`
MinInputFiles types.Int32 `tfsdk:"min_input_files"`
Strategy fwtypes.StringEnum[awstypes.CompactionStrategy] `tfsdk:"strategy"`
}

func findCatalogTableOptimizer(ctx context.Context, conn *glue.Client, catalogID, dbName, tableName, optimizerType string) (*glue.GetTableOptimizerOutput, error) {
input := &glue.GetTableOptimizerInput{
CatalogId: aws.String(catalogID),
Expand Down Expand Up @@ -421,3 +464,148 @@ func findCatalogTableOptimizer(ctx context.Context, conn *glue.Client, catalogID

return output, nil
}

type requireMatchingOptimizerConfiguration struct{}

func (m requireMatchingOptimizerConfiguration) Description(ctx context.Context) string {
return "Requires the optimizer configuration to match the optimizer type"
}

func (m requireMatchingOptimizerConfiguration) MarkdownDescription(ctx context.Context) string {
return m.Description(ctx)
}

func (m requireMatchingOptimizerConfiguration) PlanModifyObject(ctx context.Context, req planmodifier.ObjectRequest, resp *planmodifier.ObjectResponse) {
if req.State.Raw.IsNull() {
return
}

if req.Plan.Raw.IsNull() {
return
}

if req.PlanValue.Equal(req.StateValue) {
return
}

var plan catalogTableOptimizerResourceModel
diag := req.Plan.Get(ctx, &plan)
if diag.HasError() {
for _, d := range diag.Errors() {
tflog.Error(ctx, fmt.Sprintf("error getting optimizer configuration: %v, %v\n", d.Summary(), d.Detail()))
}
return
}

if plan.Configuration.IsNull() {
if plan.Type.ValueString() == string(awstypes.TableOptimizerTypeCompaction) {
resp.Diagnostics.Append(fwdiag.NewAttributeRequiredWhenError(path.Root(names.AttrConfiguration), path.Root(names.AttrType), string(awstypes.TableOptimizerTypeCompaction)))
}
return
}

configData, diag := plan.Configuration.ToPtr(ctx)
if diag.HasError() {
for _, d := range diag.Errors() {
tflog.Error(ctx, fmt.Sprintf("error getting compaction configuration: %v, %v\n", d.Summary(), d.Detail()))
}
return
}

compactionConfig, diag := configData.CompactionConfiguration.ToPtr(ctx)
if diag.HasError() {
for _, d := range diag.Errors() {
tflog.Error(ctx, fmt.Sprintf("error getting compaction configuration: %v, %v\n", d.Summary(), d.Detail()))
}
}

retentionConfig, diag := configData.RetentionConfiguration.ToPtr(ctx)
if diag.HasError() {
for _, d := range diag.Errors() {
tflog.Error(ctx, fmt.Sprintf("error getting retention configuration: %v, %v\n", d.Summary(), d.Detail()))
}
}

orphanFileDeletionConfig, diag := configData.OrphanFileDeletionConfiguration.ToPtr(ctx)
if diag.HasError() {
for _, d := range diag.Errors() {
tflog.Error(ctx, fmt.Sprintf("error getting orphan file deletion configuration: %v, %v\n", d.Summary(), d.Detail()))
}
}

switch plan.Type.ValueString() {
case string(awstypes.TableOptimizerTypeCompaction):
if compactionConfig == nil {
resp.Diagnostics.Append(fwdiag.NewAttributeRequiredWhenError(
path.Root(names.AttrConfiguration).AtListIndex(0).AtName("compaction_configuration"),
path.Root(names.AttrType),
string(awstypes.TableOptimizerTypeCompaction)))
return
}

if retentionConfig != nil {
resp.Diagnostics.Append(fwdiag.NewAttributeConflictsWhenError(
path.Root(names.AttrConfiguration).AtListIndex(0).AtName("retention_configuration"),
path.Root(names.AttrType),
string(awstypes.TableOptimizerTypeCompaction)))
return
}

if orphanFileDeletionConfig != nil {
resp.Diagnostics.Append(fwdiag.NewAttributeConflictsWhenError(
path.Root(names.AttrConfiguration).AtListIndex(0).AtName("orphan_file_deletion_configuration"),
path.Root(names.AttrType),
string(awstypes.TableOptimizerTypeCompaction)))
return
}

icebergConfig, diag := compactionConfig.IcebergConfiguration.ToPtr(ctx)
if diag.HasError() {
for _, d := range diag.Errors() {
tflog.Error(ctx, fmt.Sprintf("error getting iceberg configuration: %v, %v\n", d.Summary(), d.Detail()))
}
return
}
if icebergConfig == nil {
resp.Diagnostics.Append(fwdiag.NewAttributeRequiredWhenError(
path.Root(names.AttrConfiguration).AtListIndex(0).AtName("compaction_configuration").AtListIndex(0).AtName("iceberg_configuration"),
path.Root(names.AttrType),
string(awstypes.TableOptimizerTypeCompaction)))
return
}

case string(awstypes.TableOptimizerTypeRetention):
if compactionConfig != nil {
resp.Diagnostics.Append(fwdiag.NewAttributeConflictsWhenError(
path.Root(names.AttrConfiguration).AtListIndex(0).AtName("compaction_configuration"),
path.Root(names.AttrType),
string(awstypes.TableOptimizerTypeRetention)))
return
}

if orphanFileDeletionConfig != nil {
resp.Diagnostics.Append(fwdiag.NewAttributeConflictsWhenError(
path.Root(names.AttrConfiguration).AtListIndex(0).AtName("orphan_file_deletion_configuration"),
path.Root(names.AttrType),
string(awstypes.TableOptimizerTypeRetention)))
return
}

case string(awstypes.TableOptimizerTypeOrphanFileDeletion):
if compactionConfig != nil {
resp.Diagnostics.Append(fwdiag.NewAttributeConflictsWhenError(
path.Root(names.AttrConfiguration).AtListIndex(0).AtName("compaction_configuration"),
path.Root(names.AttrType),
string(awstypes.TableOptimizerTypeOrphanFileDeletion)))
return
}

if retentionConfig != nil {
resp.Diagnostics.Append(fwdiag.NewAttributeConflictsWhenError(
path.Root(names.AttrConfiguration).AtListIndex(0).AtName("retention_configuration"),
path.Root(names.AttrType),
string(awstypes.TableOptimizerTypeOrphanFileDeletion)))
return
}
}
}
93 changes: 92 additions & 1 deletion internal/service/glue/catalog_table_optimizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,59 @@ func testAccCatalogTableOptimizer_disappears(t *testing.T) {
})
}

func testAccCatalogTableOptimizer_CompactionConfiguration(t *testing.T) {
ctx := acctest.Context(t)
var catalogTableOptimizer glue.GetTableOptimizerOutput

resourceName := "aws_glue_catalog_table_optimizer.test"

rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)

resource.Test(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, names.GlueServiceID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckCatalogTableOptimizerDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccCatalogTableOptimizerConfig_compactionConfiguration(rName, 100),
Check: resource.ComposeTestCheckFunc(
testAccCheckCatalogTableOptimizerExists(ctx, resourceName, &catalogTableOptimizer),
acctest.CheckResourceAttrAccountID(ctx, resourceName, names.AttrCatalogID),
resource.TestCheckResourceAttr(resourceName, names.AttrDatabaseName, rName),
resource.TestCheckResourceAttr(resourceName, names.AttrTableName, rName),
resource.TestCheckResourceAttr(resourceName, names.AttrType, "compaction"),
resource.TestCheckResourceAttr(resourceName, "configuration.0.enabled", acctest.CtTrue),
resource.TestCheckResourceAttr(resourceName, "configuration.0.compaction_configuration.0.iceberg_configuration.0.strategy", "binpack"),
resource.TestCheckResourceAttr(resourceName, "configuration.0.compaction_configuration.0.iceberg_configuration.0.delete_file_threshold", "100"),
resource.TestCheckResourceAttr(resourceName, "configuration.0.compaction_configuration.0.iceberg_configuration.0.min_input_files", "10"),
),
},
{
ResourceName: resourceName,
ImportStateIdFunc: testAccCatalogTableOptimizerStateIDFunc(resourceName),
ImportStateVerifyIdentifierAttribute: names.AttrTableName,
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccCatalogTableOptimizerConfig_compactionConfiguration(rName, 1000),
Check: resource.ComposeTestCheckFunc(
testAccCheckCatalogTableOptimizerExists(ctx, resourceName, &catalogTableOptimizer),
acctest.CheckResourceAttrAccountID(ctx, resourceName, names.AttrCatalogID),
resource.TestCheckResourceAttr(resourceName, names.AttrDatabaseName, rName),
resource.TestCheckResourceAttr(resourceName, names.AttrTableName, rName),
resource.TestCheckResourceAttr(resourceName, names.AttrType, "compaction"),
resource.TestCheckResourceAttr(resourceName, "configuration.0.enabled", acctest.CtTrue),
resource.TestCheckResourceAttr(resourceName, "configuration.0.compaction_configuration.0.iceberg_configuration.0.strategy", "binpack"),
resource.TestCheckResourceAttr(resourceName, "configuration.0.compaction_configuration.0.iceberg_configuration.0.delete_file_threshold", "1000"),
resource.TestCheckResourceAttr(resourceName, "configuration.0.compaction_configuration.0.iceberg_configuration.0.min_input_files", "10"),
),
},
},
})
}

func testAccCatalogTableOptimizer_RetentionConfiguration(t *testing.T) {
ctx := acctest.Context(t)
var catalogTableOptimizer glue.GetTableOptimizerOutput
Expand Down Expand Up @@ -253,7 +306,6 @@ func testAccCheckCatalogTableOptimizerExists(ctx context.Context, resourceName s
conn := acctest.Provider.Meta().(*conns.AWSClient).GlueClient(ctx)
resp, err := tfglue.FindCatalogTableOptimizer(ctx, conn, rs.Primary.Attributes[names.AttrCatalogID], rs.Primary.Attributes[names.AttrDatabaseName],
rs.Primary.Attributes[names.AttrTableName], rs.Primary.Attributes[names.AttrType])

if err != nil {
return create.Error(names.Glue, create.ErrActionCheckingExistence, tfglue.ResNameCatalogTableOptimizer, rs.Primary.ID, err)
}
Expand Down Expand Up @@ -427,11 +479,18 @@ resource "aws_glue_catalog_table_optimizer" "test" {
configuration {
role_arn = aws_iam_role.test.arn
enabled = true

compaction_configuration {
iceberg_configuration {
strategy = "binpack"
}
}
}
}
`,
)
}

func testAccCatalogTableOptimizerConfig_update(rName string, enabled bool) string {
return acctest.ConfigCompose(
testAccCatalogTableOptimizerConfig_baseConfig(rName),
Expand All @@ -445,11 +504,43 @@ resource "aws_glue_catalog_table_optimizer" "test" {
configuration {
role_arn = aws_iam_role.test.arn
enabled = %[1]t

compaction_configuration {
iceberg_configuration {
strategy = "binpack"
}
}
}
}
`, enabled))
}

func testAccCatalogTableOptimizerConfig_compactionConfiguration(rName string, deleteFileThreshold int) string {
return acctest.ConfigCompose(
testAccCatalogTableOptimizerConfig_baseConfig(rName),
fmt.Sprintf(`
resource "aws_glue_catalog_table_optimizer" "test" {
catalog_id = data.aws_caller_identity.current.account_id
database_name = aws_glue_catalog_database.test.name
table_name = aws_glue_catalog_table.test.name
type = "compaction"

configuration {
role_arn = aws_iam_role.test.arn
enabled = true

compaction_configuration {
iceberg_configuration {
strategy = "binpack"
delete_file_threshold = %[1]d
min_input_files = 10
}
}
}
}
`, deleteFileThreshold))
}

func testAccCatalogTableOptimizerConfig_retentionConfiguration(rName string, retentionPeriod int) string {
return acctest.ConfigCompose(
testAccCatalogTableOptimizerConfig_baseConfig(rName),
Expand Down
1 change: 1 addition & 0 deletions internal/service/glue/glue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestAccGlue_serial(t *testing.T) {
testCases := map[string]map[string]func(t *testing.T){
"CatalogTableOptimizer": {
acctest.CtBasic: testAccCatalogTableOptimizer_basic,
"compactionConfiguration": testAccCatalogTableOptimizer_CompactionConfiguration,
"deleteOrphanFileConfiguration": testAccCatalogTableOptimizer_DeleteOrphanFileConfiguration,
acctest.CtDisappears: testAccCatalogTableOptimizer_disappears,
"retentionConfiguration": testAccCatalogTableOptimizer_RetentionConfiguration,
Expand Down
Loading
Loading