-
Notifications
You must be signed in to change notification settings - Fork 134
Fetch configuration from DB #3407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Fetch configuration from DB #3407
Conversation
7f140e4
to
00660b8
Compare
if flowConfigUpdate.SnapshotNumTablesInParallel > 0 { | ||
state.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel | ||
} | ||
cfg = syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this > 0
behavior was added recently in #3435, this probably regresses
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given that we no longer have or rely on state, this should be a non-issue.
flow/workflows/cdc_flow.go
Outdated
ctx workflow.Context, | ||
cfg *protos.FlowConnectionConfigs, | ||
flowJobName string, | ||
// cfg *protos.FlowConnectionConfigs, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this breaks backwards compatibility. Should still take config & pull name from that to maintain function signature
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with keeing the protos.FlowConnectionConfig
here, is that in cases where this is too large - it will still fail.
Is the suggestion to pass both and then remove the cfg
from here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, only pass cfg. new code should pass an empty cfg with only FlowJobName present
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK understood, I'll go in this direction then :)
} | ||
// because we have renamed the tables. | ||
cfg.TableMappings = state.SyncFlowOptions.TableMappings | ||
return nil, errors.New("cannot start CDCFlow with Resync enabled, please drop the flow and start again") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO
20f7fd4
to
0b42cca
Compare
func FetchConfigFromDB(flowName string) (*protos.FlowConnectionConfigs, error) { | ||
var configBytes sql.RawBytes | ||
dbCtx := context.Background() | ||
pool, _ := GetCatalogConnectionPoolFromEnv(dbCtx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should take context as param
) (*protos.SetupFlowOutput, error) { | ||
s.Info("executing setup flow") | ||
// gotta fetch the config from the catalog. | ||
config, err := internal.FetchConfigFromDB(flowJobName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't call this in workflows
could maybe run as side effect, but then it ends up in history
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can this not be called in workflows? 🤔
0b42cca
to
e7712b1
Compare
No longer passing the entire configuration object to the Temporal job as we can hit the 2MB hard limit imposed by Temporal. Instead, pass flowJobName and fetch the config from the DB in all the places required. There are some additional changes that were required to be made, as some of the jobs are used from multiple contexts - and I had to adapt some of the information passed to ensure that they continue to work as expected.
e7712b1
to
962a495
Compare
This uses a new helper in `internal.FetchConfigFromDB` to fetch a fully hydrated `protos.FlowConnectionConfigs`. When passing this config to Temporal we strip the `tableMappings` array element which can cause it to go over the 2MB limit. This contains a subset of the changes which were originally proposed in: PeerDB-io#3407
No longer passing the entire configuration object to the Temporal job as we can hit the 2MB hard limit imposed by Temporal. Instead, pass flowJobName and fetch the config from the DB in all the places required.
There are some additional changes that were required to be made, as some of the jobs are used from multiple contexts - and I had to adapt some of the information passed to ensure that they continue to work as expected.