Skip to content

Conversation

alexstoick
Copy link
Contributor

No longer passing the entire configuration object to the Temporal job as we can hit the 2MB hard limit imposed by Temporal. Instead, pass flowJobName and fetch the config from the DB in all the places required.

There are some additional changes that were required to be made, as some of the jobs are used from multiple contexts - and I had to adapt some of the information passed to ensure that they continue to work as expected.

@alexstoick alexstoick force-pushed the feat/fetch-config-from-catalog branch 2 times, most recently from 7f140e4 to 00660b8 Compare August 28, 2025 13:27
if flowConfigUpdate.SnapshotNumTablesInParallel > 0 {
state.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel
}
cfg = syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this > 0 behavior was added recently in #3435, this probably regresses

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that we no longer have or rely on state, this should be a non-issue.

@serprex serprex self-requested a review September 9, 2025 14:57
ctx workflow.Context,
cfg *protos.FlowConnectionConfigs,
flowJobName string,
// cfg *protos.FlowConnectionConfigs,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this breaks backwards compatibility. Should still take config & pull name from that to maintain function signature

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with keeing the protos.FlowConnectionConfig here, is that in cases where this is too large - it will still fail.

Is the suggestion to pass both and then remove the cfg from here?

Copy link
Member

@serprex serprex Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, only pass cfg. new code should pass an empty cfg with only FlowJobName present

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK understood, I'll go in this direction then :)

}
// because we have renamed the tables.
cfg.TableMappings = state.SyncFlowOptions.TableMappings
return nil, errors.New("cannot start CDCFlow with Resync enabled, please drop the flow and start again")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

@alexstoick alexstoick force-pushed the feat/fetch-config-from-catalog branch 2 times, most recently from 20f7fd4 to 0b42cca Compare September 15, 2025 18:01
Comment on lines 21 to 24
func FetchConfigFromDB(flowName string) (*protos.FlowConnectionConfigs, error) {
var configBytes sql.RawBytes
dbCtx := context.Background()
pool, _ := GetCatalogConnectionPoolFromEnv(dbCtx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should take context as param

) (*protos.SetupFlowOutput, error) {
s.Info("executing setup flow")
// gotta fetch the config from the catalog.
config, err := internal.FetchConfigFromDB(flowJobName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't call this in workflows

could maybe run as side effect, but then it ends up in history

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can this not be called in workflows? 🤔

@alexstoick alexstoick force-pushed the feat/fetch-config-from-catalog branch from 0b42cca to e7712b1 Compare October 6, 2025 16:08
No longer passing the entire configuration object to the Temporal job as
we can hit the 2MB hard limit imposed by Temporal. Instead, pass
flowJobName and fetch the config from the DB in all the places required.

There are some additional changes that were required to be made, as some
of the jobs are used from multiple contexts - and I had to adapt some of
the information passed to ensure that they continue to work as expected.
@alexstoick alexstoick force-pushed the feat/fetch-config-from-catalog branch from e7712b1 to 962a495 Compare October 10, 2025 16:43
@alexstoick alexstoick requested a deployment to external-contributor October 10, 2025 16:43 — with GitHub Actions Waiting
@alexstoick alexstoick requested a deployment to external-contributor October 10, 2025 16:43 — with GitHub Actions Waiting
@alexstoick alexstoick requested a deployment to external-contributor October 10, 2025 16:43 — with GitHub Actions Waiting
@alexstoick alexstoick requested a deployment to external-contributor October 10, 2025 16:43 — with GitHub Actions Waiting
alexstoick added a commit to alexstoick/peerdb that referenced this pull request Oct 10, 2025
This uses a new helper in `internal.FetchConfigFromDB` to fetch a fully
hydrated `protos.FlowConnectionConfigs`. When passing this config to
Temporal we strip the `tableMappings` array element which can cause it
to go over the 2MB limit.

This contains a subset of the changes which were originally proposed in:
PeerDB-io#3407
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants