@@ -45,18 +45,14 @@ type CDCFlowWorkflowState struct {
4545
4646// returns a new empty PeerFlowState
4747func NewCDCFlowWorkflowState (ctx workflow.Context , logger log.Logger , cfg * protos.FlowConnectionConfigs ) * CDCFlowWorkflowState {
48- tableMappings := make ([]* protos.TableMapping , 0 , len (cfg .TableMappings ))
49- for _ , tableMapping := range cfg .TableMappings {
50- tableMappings = append (tableMappings , proto .CloneOf (tableMapping ))
51- }
5248 state := CDCFlowWorkflowState {
5349 ActiveSignal : model .NoopSignal ,
5450 CurrentFlowStatus : protos .FlowStatus_STATUS_SETUP ,
5551 FlowConfigUpdate : nil ,
5652 SyncFlowOptions : & protos.SyncFlowOptions {
5753 BatchSize : cfg .MaxBatchSize ,
5854 IdleTimeoutSeconds : cfg .IdleTimeoutSeconds ,
59- TableMappings : tableMappings ,
55+ TableMappings : [] * protos. TableMapping {} ,
6056 },
6157 SnapshotNumRowsPerPartition : cfg .SnapshotNumRowsPerPartition ,
6258 SnapshotNumPartitionsOverride : cfg .SnapshotNumPartitionsOverride ,
@@ -129,6 +125,7 @@ func syncStateToConfigProtoInCatalog(
129125 cfg * protos.FlowConnectionConfigs ,
130126 state * CDCFlowWorkflowState ,
131127) * protos.FlowConnectionConfigs {
128+ // TODO: make sure that `cfg` includes table mappings.
132129 cloneCfg := updateFlowConfigWithLatestSettings (cfg , state )
133130 uploadConfigToCatalog (ctx , cloneCfg )
134131 return cloneCfg
@@ -278,7 +275,7 @@ func processTableAdditions(
278275 alterPublicationAddAdditionalTablesFuture := workflow .ExecuteActivity (
279276 alterPublicationAddAdditionalTablesCtx ,
280277 flowable .AddTablesToPublication ,
281- cfg , flowConfigUpdate .AdditionalTables )
278+ internal . MinimizeFlowConfiguration ( cfg ) , flowConfigUpdate .AdditionalTables )
282279
283280 var res * CDCFlowWorkflowResult
284281 var addTablesFlowErr error
@@ -320,6 +317,7 @@ func processTableAdditions(
320317 childAddTablesCDCFlowFuture := workflow .ExecuteChildWorkflow (
321318 childAddTablesCDCFlowCtx ,
322319 CDCFlowWorkflow ,
320+ // TODO: `additionalTablesCfg` cannot be minimized in the main branch, but the limitation is minimal.
323321 additionalTablesCfg ,
324322 nil ,
325323 )
@@ -394,7 +392,7 @@ func processTableRemovals(
394392 rawTableCleanupFuture := workflow .ExecuteActivity (
395393 removeTablesCtx ,
396394 flowable .RemoveTablesFromRawTable ,
397- cfg , state .FlowConfigUpdate .RemovedTables )
395+ internal . MinimizeFlowConfiguration ( cfg ) , state .FlowConfigUpdate .RemovedTables )
398396 removeTablesSelector .AddFuture (rawTableCleanupFuture , func (f workflow.Future ) {
399397 if err := f .Get (ctx , nil ); err != nil {
400398 logger .Error ("failed to clean up raw table for removed tables" , slog .Any ("error" , err ))
@@ -406,7 +404,7 @@ func processTableRemovals(
406404 removeTablesFromCatalogFuture := workflow .ExecuteActivity (
407405 removeTablesCtx ,
408406 flowable .RemoveTablesFromCatalog ,
409- cfg , state .FlowConfigUpdate .RemovedTables )
407+ internal . MinimizeFlowConfiguration ( cfg ) , state .FlowConfigUpdate .RemovedTables )
410408 removeTablesSelector .AddFuture (removeTablesFromCatalogFuture , func (f workflow.Future ) {
411409 if err := f .Get (ctx , nil ); err != nil {
412410 logger .Error ("failed to clean up raw table for removed tables" , slog .Any ("error" , err ))
@@ -576,7 +574,7 @@ func CDCFlowWorkflow(
576574
577575 logger .Info ("mirror resumed" , slog .Duration ("after" , time .Since (startTime )))
578576 state .updateStatus (ctx , logger , protos .FlowStatus_STATUS_RUNNING )
579- return state , workflow .NewContinueAsNewError (ctx , CDCFlowWorkflow , cfg , state )
577+ return state , workflow .NewContinueAsNewError (ctx , CDCFlowWorkflow , internal . MinimizeFlowConfiguration ( cfg ) , state )
580578 }
581579
582580 originalRunID := workflow .GetInfo (ctx ).OriginalRunID
@@ -609,22 +607,6 @@ func CDCFlowWorkflow(
609607 // for safety, rely on the idempotency of SetupFlow instead
610608 // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take effect here.
611609 if state .CurrentFlowStatus != protos .FlowStatus_STATUS_RUNNING {
612- originalTableMappings := make ([]* protos.TableMapping , 0 , len (cfg .TableMappings ))
613- for _ , tableMapping := range cfg .TableMappings {
614- originalTableMappings = append (originalTableMappings , proto .CloneOf (tableMapping ))
615- }
616- // if resync is true, alter the table name schema mapping to temporarily add
617- // a suffix to the table names.
618- if cfg .Resync {
619- for _ , mapping := range state .SyncFlowOptions .TableMappings {
620- if mapping .Engine != protos .TableEngine_CH_ENGINE_NULL {
621- mapping .DestinationTableIdentifier += "_resync"
622- }
623- }
624- // because we have renamed the tables.
625- cfg .TableMappings = state .SyncFlowOptions .TableMappings
626- }
627-
628610 // start the SetupFlow workflow as a child workflow, and wait for it to complete
629611 // it should return the table schema for the source peer
630612 setupFlowID := GetChildWorkflowID ("setup-flow" , cfg .FlowJobName , originalRunID )
@@ -648,7 +630,6 @@ func CDCFlowWorkflow(
648630 state .ActiveSignal = model .ResyncSignal
649631 cfg .Resync = true
650632 cfg .DoInitialSnapshot = true
651- cfg .TableMappings = originalTableMappings
652633 // this is the only place where we can have a resync during a resync
653634 // so we need to NOT sync the tableMappings to catalog to preserve original names
654635 uploadConfigToCatalog (ctx , cfg )
@@ -672,7 +653,10 @@ func CDCFlowWorkflow(
672653 WaitForCancellation : true ,
673654 }
674655 setupFlowCtx := workflow .WithChildOptions (ctx , childSetupFlowOpts )
675- setupFlowFuture := workflow .ExecuteChildWorkflow (setupFlowCtx , SetupFlowWorkflow , cfg )
656+ // Resync will rely on the `cfg.Resync` flag to rename the tables
657+ // during the snapshot process. This is how we're able to also remove the need
658+ // to sync the config back into the DB / not rely on the `state.TableMappings`.
659+ setupFlowFuture := workflow .ExecuteChildWorkflow (setupFlowCtx , SetupFlowWorkflow , internal .MinimizeFlowConfiguration (cfg ))
676660
677661 var setupFlowOutput * protos.SetupFlowOutput
678662 var setupFlowError error
@@ -808,7 +792,7 @@ func CDCFlowWorkflow(
808792 logger .Info ("executed setup flow and snapshot flow, start running" )
809793 state .updateStatus (ctx , logger , protos .FlowStatus_STATUS_RUNNING )
810794 }
811- return state , workflow .NewContinueAsNewError (ctx , CDCFlowWorkflow , cfg , state )
795+ return state , workflow .NewContinueAsNewError (ctx , CDCFlowWorkflow , internal . MinimizeFlowConfiguration ( cfg ) , state )
812796 }
813797
814798 var finished bool
@@ -819,7 +803,8 @@ func CDCFlowWorkflow(
819803 WaitForCancellation : true ,
820804 RetryPolicy : & temporal.RetryPolicy {MaximumAttempts : 1 },
821805 }))
822- syncFlowFuture := workflow .ExecuteActivity (syncCtx , flowable .SyncFlow , cfg , state .SyncFlowOptions )
806+ state .SyncFlowOptions .TableMappings = []* protos.TableMapping {}
807+ syncFlowFuture := workflow .ExecuteActivity (syncCtx , flowable .SyncFlow , internal .MinimizeFlowConfiguration (cfg ), state .SyncFlowOptions )
823808
824809 mainLoopSelector := workflow .NewNamedSelector (ctx , "MainLoop" )
825810 mainLoopSelector .AddReceive (ctx .Done (), func (_ workflow.ReceiveChannel , _ bool ) {
@@ -940,7 +925,7 @@ func CDCFlowWorkflow(
940925 if state .ActiveSignal == model .TerminateSignal || state .ActiveSignal == model .ResyncSignal {
941926 return state , workflow .NewContinueAsNewError (ctx , DropFlowWorkflow , state .DropFlowInput )
942927 }
943- return state , workflow .NewContinueAsNewError (ctx , CDCFlowWorkflow , cfg , state )
928+ return state , workflow .NewContinueAsNewError (ctx , CDCFlowWorkflow , internal . MinimizeFlowConfiguration ( cfg ) , state )
944929 }
945930 }
946931}
0 commit comments