Db retries #146
go 1.22.0

toolchain go1.25.0
Not needed for template
type DBOSError struct {
	Message string        // Human-readable error message
	Code    DBOSErrorCode // Error type code for programmatic handling
	IsBase  bool          // Internal errors that shouldn't be caught by user code
unused
This reverts commit 9127d2c.
- name: Cache Go modules
  uses: actions/cache@v4
  with:
    path: |
      ~/go/pkg/mod
      ~/.cache/go-build
    key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
    restore-keys: |
      ${{ runner.os }}-go-
Doesn't seem very useful
}

logger.Info("Postgres available", "url", fmt.Sprintf("postgres://postgres:%s@localhost:5432", password))
logger.Info("Postgres available", "url", fmt.Sprintf("postgres://postgres:%s@localhost:5432", url.QueryEscape(password)))
fix
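To illustrate why the escaping matters, here is a minimal sketch (the helper name `postgresURL` is hypothetical, not from the PR): a password containing reserved characters such as `@` or `/` would otherwise produce a URL that parses incorrectly.

```go
package main

import (
	"fmt"
	"net/url"
)

// postgresURL is a hypothetical helper mirroring the fixed log line:
// the password is percent-encoded before being placed in the URL.
func postgresURL(password string) string {
	return fmt.Sprintf("postgres://postgres:%s@localhost:5432", url.QueryEscape(password))
}

func main() {
	// "@" and "/" in the password are encoded as %40 and %2F.
	fmt.Println(postgresURL("p@ss/word"))
}
```

Note that `url.QueryEscape` encodes a space as `+`. If passwords may contain spaces, building the URL with `url.URL` and `url.UserPassword` is an alternative worth considering.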
AutoRemove: true,
Binds: []string{
	fmt.Sprintf("%s:%s", hostPgDataVolumeName, pgData),
},
It turns out that an explicit volume binding is required. On macOS, exporting `PGDATA` is enough to instruct Docker Desktop to mount the right thing: Docker Desktop uses a light VM to run Postgres and persists this volume across container restarts.
On the GitHub Actions runner (and likely in other environments too), the mount is not persisted across container restarts.
wfID, err := ctx.GetWorkflowID()
if err != nil {
	return nil, fmt.Errorf("failed to get workflow ID: %w", err)
}
err = ctx.(*dbosContext).systemDB.updateWorkflowOutcome(WithoutCancel(ctx), updateWorkflowOutcomeDBInput{
	workflowID: wfID,
	status:     WorkflowStatusError,
	err:        newWorkflowUnexpectedInputType(fqn, fmt.Sprintf("%T", typedInput), fmt.Sprintf("%T", input)),
})
if err != nil {
	return nil, fmt.Errorf("failed to record unexpected input type error: %w", err)
}
This is not required because `typedErasedWorkflow` is always called within `RunWorkflow`, in a goroutine that updates the workflow outcome.
mockCtx.AssertExpectations(t)
mockChildHandle.AssertExpectations(t)
mockGenericHandle.AssertExpectations(t)
A mock built with mocks.NewMoc... already checks expectations before destroying the object.
dbos/client.go
Outdated
	tx: tx,
}
_, err = dbosCtx.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
err := retry(dbosCtx, func() error {
Why do we need retries here? It's the client, it can just fail, there's no deeper risk
Not necessarily strictly speaking, but nice to have from a reliable library imo. Right now, all the client methods, which are wrapping the context methods, will retry.
Are you suggesting we should do the minimum amount of retries to avoid getting a corrupted Transact process (e.g., queue runner unusable), but should let the user handle connection errors on all other paths? E.g., cancel workflow.
It's not really a huge deal either way--retries in the client are excessive, but not unsafe (and the Python client retries)
I like the idea of doing less. We can easily add retries to client methods (enqueue, but also RetrieveWorkflow, CancelWorkflow, ResumeWorkflow, ListWorkflows, GetWorkflowSteps and ForkWorkflow) later if need be. The most important thing is to avoid having unusable-but-running Transact processes.
queue := dbos.NewWorkflowQueue(dbosCtx, "test_queue")

// Define step functions
stepOne := func(ctx dbos.DBOSContext, x int) (int, error) {
The other thing we could check is if a scheduled workflow keeps ticking after reconnection
Looks good, important tests and fixes!
Address #93
This PR adds a `retry` method that takes another function and retries it until a specific condition is met. `retry` can be configured with functional options and has a `retryWithResult` generic counterpart for functions returning a value.

All system database methods are now called within `retry` or `retryWithResult`.

For example:
retry performs exponential backoff with jitter. It defaults to infinite retries.
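
The behavior described above can be sketched roughly as follows. This is not the PR's code: it takes a plain `context.Context` rather than the DBOS context, the option names (`withMaxRetries`, `withBaseDelay`, `withRetryCondition`) and the default delays are hypothetical, and the default condition here retries on every error as a placeholder.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// retryConfig holds the tunables. All names and defaults are assumptions.
type retryConfig struct {
	maxRetries  int // negative means retry forever
	baseDelay   time.Duration
	maxDelay    time.Duration
	shouldRetry func(error) bool
}

type retryOption func(*retryConfig)

func withMaxRetries(n int) retryOption          { return func(c *retryConfig) { c.maxRetries = n } }
func withBaseDelay(d time.Duration) retryOption { return func(c *retryConfig) { c.baseDelay = d } }
func withRetryCondition(f func(error) bool) retryOption {
	return func(c *retryConfig) { c.shouldRetry = f }
}

// retryWithResult calls fn until it succeeds, returns a non-retryable
// error, exhausts the retry budget, or ctx is done.
func retryWithResult[T any](ctx context.Context, fn func() (T, error), opts ...retryOption) (T, error) {
	cfg := retryConfig{
		maxRetries:  -1, // infinite by default
		baseDelay:   100 * time.Millisecond,
		maxDelay:    30 * time.Second,
		shouldRetry: func(error) bool { return true }, // placeholder condition
	}
	for _, o := range opts {
		o(&cfg)
	}
	delay := cfg.baseDelay
	for attempt := 0; ; attempt++ {
		res, err := fn()
		if err == nil || !cfg.shouldRetry(err) {
			return res, err
		}
		if cfg.maxRetries >= 0 && attempt >= cfg.maxRetries {
			return res, fmt.Errorf("giving up after %d retries: %w", attempt, err)
		}
		// Jitter: sleep a random duration in [delay/2, delay].
		sleep := delay/2 + time.Duration(rand.Int63n(int64(delay/2)+1))
		select {
		case <-ctx.Done():
			var zero T
			return zero, ctx.Err()
		case <-time.After(sleep):
		}
		// Exponential backoff, capped at maxDelay.
		delay *= 2
		if delay > cfg.maxDelay {
			delay = cfg.maxDelay
		}
	}
}

// retry is the counterpart for functions that only return an error.
func retry(ctx context.Context, fn func() error, opts ...retryOption) error {
	_, err := retryWithResult(ctx, func() (struct{}, error) {
		return struct{}{}, fn()
	}, opts...)
	return err
}

// demo simulates a call that fails twice before succeeding.
func demo() (value int, calls int, err error) {
	value, err = retryWithResult(context.Background(), func() (int, error) {
		calls++
		if calls < 3 {
			return 0, errors.New("connection refused")
		}
		return 42, nil
	}, withBaseDelay(time.Millisecond))
	return value, calls, err
}

func main() {
	fmt.Println(demo())
}
```

Implementing `retry` on top of `retryWithResult` keeps the backoff logic in one place; whether the PR does the same is not shown here.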
The default condition for retrying is `isRetryablePGError`, which matches the error returned by the function against a set of connection errors (Postgres codes, pgx connection errors, `net.Error`).

Note: in the particular case of `RunWorkflow`, which does manage transactions, the entire function has to be retried. This is because the transaction object becomes invalid as soon as Commit/Rollback have been called, regardless of whether the operation was successful. See https://github.com/jackc/pgx/blob/61d3c965ad442cc14d6b0e39e0ab3821f3684c03/tx.go#L180
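
As a rough illustration of such a retry condition, the sketch below classifies errors using only the standard library. It is not the PR's `isRetryablePGError`: the function name `isRetryable`, the `sqlStateError` interface, and the chosen subset of SQLSTATE codes are assumptions. pgx's `*pgconn.PgError` exposes a `SQLState()` method, which is what the interface is meant to stand in for.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// sqlStateError is a hypothetical stand-in for errors that carry a
// Postgres SQLSTATE code, so this sketch does not need to import pgx.
type sqlStateError interface {
	error
	SQLState() string
}

// retryableCodes is an illustrative subset of connection-related codes.
var retryableCodes = map[string]bool{
	"08000": true, // connection_exception
	"08003": true, // connection_does_not_exist
	"08006": true, // connection_failure
	"57P01": true, // admin_shutdown
	"57P03": true, // cannot_connect_now
}

// isRetryable reports whether err, or any error it wraps, looks like a
// transient connection problem.
func isRetryable(err error) bool {
	if err == nil {
		return false
	}
	var se sqlStateError
	if errors.As(err, &se) {
		return retryableCodes[se.SQLState()]
	}
	var ne net.Error
	return errors.As(err, &ne)
}

// fakePGError simulates a database error for the demonstration.
type fakePGError struct{ code string }

func (e fakePGError) Error() string    { return "pg error " + e.code }
func (e fakePGError) SQLState() string { return e.code }

func main() {
	wrapped := fmt.Errorf("insert failed: %w", fakePGError{code: "08006"})
	fmt.Println(isRetryable(wrapped))                     // connection failure
	fmt.Println(isRetryable(fakePGError{code: "23505"}))  // unique violation
	fmt.Println(isRetryable(&net.OpError{Op: "dial", Err: errors.New("refused")}))
}
```

Using `errors.As` means the check still works when the database error has been wrapped by intermediate layers.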
This PR also improves `DBOSError` with the ability to unwrap the underlying error, if any.
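
A minimal sketch of what unwrapping enables, assuming a hypothetical `wrapped` field and an `int` underlying type for `DBOSErrorCode` (the PR's actual field names and error formatting may differ): once the type implements `Unwrap`, callers can use `errors.Is` and `errors.As` to reach the underlying error.

```go
package main

import (
	"errors"
	"fmt"
)

// DBOSErrorCode's underlying type is an assumption for this sketch.
type DBOSErrorCode int

// DBOSError mirrors the fields shown in the diff, plus a hypothetical
// field holding the underlying error.
type DBOSError struct {
	Message string
	Code    DBOSErrorCode
	wrapped error // hypothetical name
}

func (e *DBOSError) Error() string {
	if e.wrapped != nil {
		return fmt.Sprintf("DBOS error %d: %s: %v", e.Code, e.Message, e.wrapped)
	}
	return fmt.Sprintf("DBOS error %d: %s", e.Code, e.Message)
}

// Unwrap lets errors.Is and errors.As traverse to the underlying error.
func (e *DBOSError) Unwrap() error { return e.wrapped }

// errConnLost is a sentinel used only for this demonstration.
var errConnLost = errors.New("connection lost")

// wrapsConnLost reports whether err wraps the sentinel error.
func wrapsConnLost(err error) bool { return errors.Is(err, errConnLost) }

func main() {
	err := &DBOSError{Message: "failed to update workflow", Code: 1, wrapped: errConnLost}
	fmt.Println(err)
	fmt.Println(wrapsConnLost(err))
}
```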