Skip to content

Commit 4530ef5

Browse files
committed
feat: add workspace_name to stream connection resource and datasource
1 parent 4d06c1e commit 4530ef5

File tree

4 files changed

+183
-27
lines changed

4 files changed

+183
-27
lines changed

internal/service/streamconnection/data_source_stream_connection.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type streamConnectionDS struct {
2525

2626
func (d *streamConnectionDS) Schema(ctx context.Context, req datasource.SchemaRequest, resp *datasource.SchemaResponse) {
2727
resp.Schema = conversion.DataSourceSchemaFromResource(ResourceSchema(ctx), &conversion.DataSourceSchemaRequest{
28-
RequiredFields: []string{"project_id", "instance_name", "connection_name"},
28+
RequiredFields: []string{"project_id", "connection_name"},
2929
})
3030
}
3131

@@ -38,15 +38,21 @@ func (d *streamConnectionDS) Read(ctx context.Context, req datasource.ReadReques
3838

3939
connV2 := d.Client.AtlasV2
4040
projectID := streamConnectionConfig.ProjectID.ValueString()
41-
instanceName := streamConnectionConfig.InstanceName.ValueString()
41+
effectiveWorkspaceName := getEffectiveWorkspaceName(&streamConnectionConfig)
42+
if effectiveWorkspaceName == "" {
43+
resp.Diagnostics.AddError("validation error", "workspace_name must be provided")
44+
return
45+
}
4246
connectionName := streamConnectionConfig.ConnectionName.ValueString()
43-
apiResp, _, err := connV2.StreamsApi.GetStreamConnection(ctx, projectID, instanceName, connectionName).Execute()
47+
apiResp, _, err := connV2.StreamsApi.GetStreamConnection(ctx, projectID, effectiveWorkspaceName, connectionName).Execute()
4448
if err != nil {
4549
resp.Diagnostics.AddError("error fetching resource", err.Error())
4650
return
4751
}
4852

49-
newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, nil, apiResp)
53+
instanceName := streamConnectionConfig.InstanceName.ValueString()
54+
workspaceName := streamConnectionConfig.WorkspaceName.ValueString()
55+
newStreamConnectionModel, diags := NewTFStreamConnectionWithInstanceName(ctx, projectID, instanceName, workspaceName, nil, apiResp)
5056
if diags.HasError() {
5157
resp.Diagnostics.Append(diags...)
5258
return

internal/service/streamconnection/data_source_stream_connections.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,22 @@ type streamConnectionsDS struct {
2828

2929
func (d *streamConnectionsDS) Schema(ctx context.Context, req datasource.SchemaRequest, resp *datasource.SchemaResponse) {
3030
resp.Schema = conversion.PluralDataSourceSchemaFromResource(ResourceSchema(ctx), &conversion.PluralDataSourceSchemaRequest{
31-
RequiredFields: []string{"project_id", "instance_name"},
31+
RequiredFields: []string{"project_id"},
3232
HasLegacyFields: true,
3333
})
3434
}
3535

36+
// getEffectiveWorkspaceNameForDS returns the workspace name from either instance_name or workspace_name field for datasource model
37+
func getEffectiveWorkspaceNameForDS(model *TFStreamConnectionsDSModel) string {
38+
if !model.WorkspaceName.IsNull() && !model.WorkspaceName.IsUnknown() {
39+
return model.WorkspaceName.ValueString()
40+
}
41+
if !model.InstanceName.IsNull() && !model.InstanceName.IsUnknown() {
42+
return model.InstanceName.ValueString()
43+
}
44+
return ""
45+
}
46+
3647
func (d *streamConnectionsDS) Read(ctx context.Context, req datasource.ReadRequest, resp *datasource.ReadResponse) {
3748
var streamConnectionsConfig TFStreamConnectionsDSModel
3849
resp.Diagnostics.Append(req.Config.Get(ctx, &streamConnectionsConfig)...)
@@ -42,13 +53,17 @@ func (d *streamConnectionsDS) Read(ctx context.Context, req datasource.ReadReque
4253

4354
connV2 := d.Client.AtlasV2
4455
projectID := streamConnectionsConfig.ProjectID.ValueString()
45-
instanceName := streamConnectionsConfig.InstanceName.ValueString()
56+
workspaceName := getEffectiveWorkspaceNameForDS(&streamConnectionsConfig)
57+
if workspaceName == "" {
58+
resp.Diagnostics.AddError("validation error", "workspace_name must be provided")
59+
return
60+
}
4661
itemsPerPage := streamConnectionsConfig.ItemsPerPage.ValueInt64Pointer()
4762
pageNum := streamConnectionsConfig.PageNum.ValueInt64Pointer()
4863

4964
apiResp, _, err := connV2.StreamsApi.ListStreamConnectionsWithParams(ctx, &admin.ListStreamConnectionsApiParams{
5065
GroupId: projectID,
51-
TenantName: instanceName,
66+
TenantName: workspaceName,
5267
ItemsPerPage: conversion.Int64PtrToIntPtr(itemsPerPage),
5368
PageNum: conversion.Int64PtrToIntPtr(pageNum),
5469
}).Execute()
@@ -67,11 +82,12 @@ func (d *streamConnectionsDS) Read(ctx context.Context, req datasource.ReadReque
6782
}
6883

6984
type TFStreamConnectionsDSModel struct {
70-
ID types.String `tfsdk:"id"`
71-
ProjectID types.String `tfsdk:"project_id"`
72-
InstanceName types.String `tfsdk:"instance_name"`
73-
Results []TFStreamConnectionModel `tfsdk:"results"`
74-
PageNum types.Int64 `tfsdk:"page_num"`
75-
ItemsPerPage types.Int64 `tfsdk:"items_per_page"`
76-
TotalCount types.Int64 `tfsdk:"total_count"`
85+
ID types.String `tfsdk:"id"`
86+
ProjectID types.String `tfsdk:"project_id"`
87+
InstanceName types.String `tfsdk:"instance_name"`
88+
WorkspaceName types.String `tfsdk:"workspace_name"`
89+
Results []TFStreamConnectionModel `tfsdk:"results"`
90+
PageNum types.Int64 `tfsdk:"page_num"`
91+
ItemsPerPage types.Int64 `tfsdk:"items_per_page"`
92+
TotalCount types.Int64 `tfsdk:"total_count"`
7793
}

internal/service/streamconnection/resource_stream_connection.go

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type streamConnectionRS struct {
3636
type TFStreamConnectionModel struct {
3737
ID types.String `tfsdk:"id"`
3838
ProjectID types.String `tfsdk:"project_id"`
39+
WorkspaceName types.String `tfsdk:"workspace_name"`
3940
InstanceName types.String `tfsdk:"instance_name"`
4041
ConnectionName types.String `tfsdk:"connection_name"`
4142
Type types.String `tfsdk:"type"`
@@ -128,6 +129,17 @@ func (r *streamConnectionRS) Schema(ctx context.Context, req resource.SchemaRequ
128129
conversion.UpdateSchemaDescription(&resp.Schema)
129130
}
130131

132+
// getEffectiveWorkspaceName returns the workspace name from workspace_name or instance_name field
133+
func getEffectiveWorkspaceName(model *TFStreamConnectionModel) string {
134+
if !model.WorkspaceName.IsNull() && !model.WorkspaceName.IsUnknown() {
135+
return model.WorkspaceName.ValueString()
136+
}
137+
if !model.InstanceName.IsNull() && !model.InstanceName.IsUnknown() {
138+
return model.InstanceName.ValueString()
139+
}
140+
return ""
141+
}
142+
131143
func (r *streamConnectionRS) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) {
132144
var streamConnectionPlan TFStreamConnectionModel
133145
resp.Diagnostics.Append(req.Plan.Get(ctx, &streamConnectionPlan)...)
@@ -137,19 +149,27 @@ func (r *streamConnectionRS) Create(ctx context.Context, req resource.CreateRequ
137149

138150
connV2 := r.Client.AtlasV2
139151
projectID := streamConnectionPlan.ProjectID.ValueString()
140-
instanceName := streamConnectionPlan.InstanceName.ValueString()
152+
effectiveWorkspaceName := getEffectiveWorkspaceName(&streamConnectionPlan)
153+
if effectiveWorkspaceName == "" {
154+
resp.Diagnostics.AddError("validation error", "workspace_name must be provided")
155+
return
156+
}
157+
141158
streamConnectionReq, diags := NewStreamConnectionReq(ctx, &streamConnectionPlan)
142159
if diags.HasError() {
143160
resp.Diagnostics.Append(diags...)
144161
return
145162
}
146-
apiResp, _, err := connV2.StreamsApi.CreateStreamConnection(ctx, projectID, instanceName, streamConnectionReq).Execute()
163+
apiResp, _, err := connV2.StreamsApi.CreateStreamConnection(ctx, projectID, effectiveWorkspaceName, streamConnectionReq).Execute()
147164
if err != nil {
148165
resp.Diagnostics.AddError("error creating resource", err.Error())
149166
return
150167
}
151168

152-
newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, &streamConnectionPlan.Authentication, apiResp)
169+
instanceName := streamConnectionPlan.InstanceName.ValueString()
170+
workspaceName := streamConnectionPlan.WorkspaceName.ValueString()
171+
172+
newStreamConnectionModel, diags := NewTFStreamConnectionWithInstanceName(ctx, projectID, instanceName, workspaceName, &streamConnectionPlan.Authentication, apiResp)
153173
if diags.HasError() {
154174
resp.Diagnostics.Append(diags...)
155175
return
@@ -166,9 +186,13 @@ func (r *streamConnectionRS) Read(ctx context.Context, req resource.ReadRequest,
166186

167187
connV2 := r.Client.AtlasV2
168188
projectID := streamConnectionState.ProjectID.ValueString()
169-
instanceName := streamConnectionState.InstanceName.ValueString()
189+
effectiveWorkspaceName := getEffectiveWorkspaceName(&streamConnectionState)
190+
if effectiveWorkspaceName == "" {
191+
resp.Diagnostics.AddError("validation error", "workspace_name must be provided")
192+
return
193+
}
170194
connectionName := streamConnectionState.ConnectionName.ValueString()
171-
apiResp, getResp, err := connV2.StreamsApi.GetStreamConnection(ctx, projectID, instanceName, connectionName).Execute()
195+
apiResp, getResp, err := connV2.StreamsApi.GetStreamConnection(ctx, projectID, effectiveWorkspaceName, connectionName).Execute()
172196
if err != nil {
173197
if validate.StatusNotFound(getResp) {
174198
resp.State.RemoveResource(ctx)
@@ -178,7 +202,9 @@ func (r *streamConnectionRS) Read(ctx context.Context, req resource.ReadRequest,
178202
return
179203
}
180204

181-
newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, &streamConnectionState.Authentication, apiResp)
205+
instanceName := streamConnectionState.InstanceName.ValueString()
206+
workspaceName := streamConnectionState.WorkspaceName.ValueString()
207+
newStreamConnectionModel, diags := NewTFStreamConnectionWithInstanceName(ctx, projectID, instanceName, workspaceName, &streamConnectionState.Authentication, apiResp)
182208
if diags.HasError() {
183209
resp.Diagnostics.Append(diags...)
184210
return
@@ -195,20 +221,26 @@ func (r *streamConnectionRS) Update(ctx context.Context, req resource.UpdateRequ
195221

196222
connV2 := r.Client.AtlasV2
197223
projectID := streamConnectionPlan.ProjectID.ValueString()
198-
instanceName := streamConnectionPlan.InstanceName.ValueString()
224+
effectiveWorkspaceName := getEffectiveWorkspaceName(&streamConnectionPlan)
225+
if effectiveWorkspaceName == "" {
226+
resp.Diagnostics.AddError("validation error", "workspace_name must be provided")
227+
return
228+
}
199229
connectionName := streamConnectionPlan.ConnectionName.ValueString()
200230
streamConnectionReq, diags := NewStreamConnectionUpdateReq(ctx, &streamConnectionPlan)
201231
if diags.HasError() {
202232
resp.Diagnostics.Append(diags...)
203233
return
204234
}
205-
apiResp, _, err := connV2.StreamsApi.UpdateStreamConnection(ctx, projectID, instanceName, connectionName, streamConnectionReq).Execute()
235+
apiResp, _, err := connV2.StreamsApi.UpdateStreamConnection(ctx, projectID, effectiveWorkspaceName, connectionName, streamConnectionReq).Execute()
206236
if err != nil {
207237
resp.Diagnostics.AddError("error updating resource", err.Error())
208238
return
209239
}
210240

211-
newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, &streamConnectionPlan.Authentication, apiResp)
241+
instanceName := streamConnectionPlan.InstanceName.ValueString()
242+
workspaceName := streamConnectionPlan.WorkspaceName.ValueString()
243+
newStreamConnectionModel, diags := NewTFStreamConnectionWithInstanceName(ctx, projectID, instanceName, workspaceName, &streamConnectionPlan.Authentication, apiResp)
212244
if diags.HasError() {
213245
resp.Diagnostics.Append(diags...)
214246
return
@@ -225,7 +257,11 @@ func (r *streamConnectionRS) Delete(ctx context.Context, req resource.DeleteRequ
225257

226258
connV2 := r.Client.AtlasV2
227259
projectID := streamConnectionState.ProjectID.ValueString()
228-
instanceName := streamConnectionState.InstanceName.ValueString()
260+
instanceName := getEffectiveWorkspaceName(streamConnectionState)
261+
if instanceName == "" {
262+
resp.Diagnostics.AddError("validation error", "workspace_name must be provided")
263+
return
264+
}
229265
connectionName := streamConnectionState.ConnectionName.ValueString()
230266
if err := DeleteStreamConnection(ctx, connV2.StreamsApi, projectID, instanceName, connectionName, 10*time.Minute); err != nil {
231267
resp.Diagnostics.AddError("error deleting resource", err.Error())

internal/service/streamconnection/resource_stream_connection_test.go

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,21 @@ const (
1919
dataSourceConfig = `
2020
data "mongodbatlas_stream_connection" "test" {
2121
project_id = mongodbatlas_stream_connection.test.project_id
22-
instance_name = mongodbatlas_stream_connection.test.instance_name
22+
workspace_name = mongodbatlas_stream_connection.test.workspace_name
2323
connection_name = mongodbatlas_stream_connection.test.connection_name
2424
}
2525
`
2626

2727
dataSourcePluralConfig = `
2828
data "mongodbatlas_stream_connections" "test" {
2929
project_id = mongodbatlas_stream_connection.test.project_id
30-
instance_name = mongodbatlas_stream_connection.test.instance_name
30+
workspace_name = mongodbatlas_stream_connection.test.workspace_name
3131
}
3232
`
3333
dataSourcePluralConfigWithPage = `
3434
data "mongodbatlas_stream_connections" "test" {
3535
project_id = mongodbatlas_stream_connection.test.project_id
36-
instance_name = mongodbatlas_stream_connection.test.instance_name
36+
workspace_name = mongodbatlas_stream_connection.test.workspace_name
3737
page_num = 2 # no specific reason for 2, just to test pagination
3838
items_per_page = 1
3939
}
@@ -400,6 +400,57 @@ func TestAccStreamRSStreamConnection_AWSLambda(t *testing.T) {
400400
})
401401
}
402402

403+
func TestAccStreamRSStreamConnection_workspaceName(t *testing.T) {
404+
var (
405+
projectID, instanceName = acc.ProjectIDExecutionWithStreamInstance(t)
406+
connectionName = "workspace-name-test"
407+
)
408+
409+
resource.ParallelTest(t, resource.TestCase{
410+
PreCheck: func() { acc.PreCheckBasic(t) },
411+
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
412+
CheckDestroy: CheckDestroyStreamConnection,
413+
Steps: []resource.TestStep{
414+
{
415+
Config: configureKafkaWithWorkspaceName(projectID, instanceName, connectionName, "user", "password", "localhost:9092"),
416+
Check: resource.ComposeAggregateTestCheckFunc(
417+
checkStreamConnectionExists(),
418+
resource.TestCheckResourceAttr(resourceName, "workspace_name", instanceName),
419+
resource.TestCheckResourceAttr(resourceName, "connection_name", connectionName),
420+
resource.TestCheckResourceAttr(resourceName, "type", "Kafka"),
421+
resource.TestCheckNoResourceAttr(resourceName, "instance_name"),
422+
),
423+
},
424+
{
425+
ResourceName: resourceName,
426+
ImportStateIdFunc: checkStreamConnectionImportStateIDFunc(resourceName),
427+
ImportState: true,
428+
ImportStateVerify: true,
429+
ImportStateVerifyIgnore: []string{"authentication.password"},
430+
},
431+
},
432+
})
433+
}
434+
435+
func TestAccStreamRSStreamConnection_conflictingFields(t *testing.T) {
436+
var (
437+
projectID, instanceName = acc.ProjectIDExecutionWithStreamInstance(t)
438+
connectionName = "conflict-test"
439+
)
440+
441+
resource.ParallelTest(t, resource.TestCase{
442+
PreCheck: func() { acc.PreCheckBasic(t) },
443+
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
444+
CheckDestroy: CheckDestroyStreamConnection,
445+
Steps: []resource.TestStep{
446+
{
447+
Config: configureKafkaWithInstanceAndWorkspaceName(projectID, instanceName, connectionName, "user", "password", "localhost:9092"),
448+
ExpectError: regexp.MustCompile("Attribute \"instance_name\" cannot be specified when \"workspace_name\" is specified"),
449+
},
450+
},
451+
})
452+
}
453+
403454
func getKafkaAuthenticationConfig(mechanism, username, password, tokenEndpointURL, clientID, clientSecret, scope, saslOauthbearerExtensions, httpsCaPem string) string {
404455
if mechanism == "PLAIN" {
405456
return fmt.Sprintf(`authentication = {
@@ -464,6 +515,53 @@ func configureSampleStream(projectID, instanceName, sampleName string) string {
464515
`, streamInstanceConfig, sampleName)
465516
}
466517

518+
func configureKafkaWithWorkspaceName(projectID, instanceName, connectionName, username, password, bootstrapServers string) string {
519+
return fmt.Sprintf(`
520+
resource "mongodbatlas_stream_connection" "test" {
521+
project_id = %[1]q
522+
workspace_name = %[2]q
523+
connection_name = %[3]q
524+
type = "Kafka"
525+
authentication = {
526+
mechanism = "PLAIN"
527+
username = %[4]q
528+
password = %[5]q
529+
}
530+
bootstrap_servers = %[6]q
531+
config = {
532+
"auto.offset.reset": "earliest"
533+
}
534+
security = {
535+
protocol = "SASL_PLAINTEXT"
536+
}
537+
}
538+
`, projectID, instanceName, connectionName, username, password, bootstrapServers)
539+
}
540+
541+
func configureKafkaWithInstanceAndWorkspaceName(projectID, instanceName, connectionName, username, password, bootstrapServers string) string {
542+
return fmt.Sprintf(`
543+
resource "mongodbatlas_stream_connection" "test" {
544+
project_id = %[1]q
545+
instance_name = %[2]q
546+
workspace_name = %[2]q
547+
connection_name = %[3]q
548+
type = "Kafka"
549+
authentication = {
550+
mechanism = "PLAIN"
551+
username = %[4]q
552+
password = %[5]q
553+
}
554+
bootstrap_servers = %[6]q
555+
config = {
556+
"auto.offset.reset": "earliest"
557+
}
558+
security = {
559+
protocol = "SASL_PLAINTEXT"
560+
}
561+
}
562+
`, projectID, instanceName, connectionName, username, password, bootstrapServers)
563+
}
564+
467565
func checkSampleStreamAttributes(
468566
resourceName, instanceName, sampleName string) resource.TestCheckFunc {
469567
resourceChecks := []resource.TestCheckFunc{

0 commit comments

Comments
 (0)