Skip to content

Conversation

maxdml
Copy link
Collaborator

@maxdml maxdml commented Sep 23, 2025

Address #93

This PR adds a retry method that can take in another function for retry until a specific condition is met. retry can be configured with functional options and has a retryWithResult generic counterpart for functions returning a value.

All system database methods are now called within retry or retryWithResult.

For example

dequeuedWorkflows, err := retryWithResult(ctx, func() ([]dequeuedWorkflow, error) {
return ctx.systemDB.dequeueWorkflows(ctx, dequeueWorkflowsInput{
	queue:              queue,
	executorID:         ctx.executorID,
	applicationVersion: ctx.applicationVersion,
})
}, withRetrierLogger(qr.logger))

retry performs exponential backoff with jitter. It defaults to infinite retries.

The default condition for retrying is isRetryablePGError, which matches the function returned error with a set of connection errors (postgres codes, pgx connection errors, net.Error)

Note: in the particular case of RunWorkflow, which does manage transactions, the entire function has to be retried. This is because the transaction object becomes invalid as soon as Commit/Rollback have been called, regardless of whether the operation was successful. See https://github.com/jackc/pgx/blob/61d3c965ad442cc14d6b0e39e0ab3821f3684c03/tx.go#L180

This PR also improves DBOSError with the ability to unwrap the underlying error, if any.


go 1.22.0

toolchain go1.25.0
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed for template

type DBOSError struct {
Message string // Human-readable error message
Code DBOSErrorCode // Error type code for programmatic handling
IsBase bool // Internal errors that shouldn't be caught by user code
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

Comment on lines -47 to -56
- name: Cache Go modules
uses: actions/cache@v4
with:
path: |
~/go/pkg/mod
~/.cache/go-build
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem very useful

}

logger.Info("Postgres available", "url", fmt.Sprintf("postgres://postgres:%s@localhost:5432", password))
logger.Info("Postgres available", "url", fmt.Sprintf("postgres://postgres:%s@localhost:5432", url.QueryEscape(password)))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix

AutoRemove: true,
Binds: []string{
fmt.Sprintf("%s:%s", hostPgDataVolumeName, pgData),
},
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out that an explicit volume binding is required. On MacOS, exporting PGDATA is enough to instruct Docker desktop to mount the right thing. Docker desktop uses a light VM to run postgres, and persist this volume across container restarts.

On the github action runner -- and likely on other environments too? The mount is not persisted across containers restarts.

Comment on lines -440 to -451
wfID, err := ctx.GetWorkflowID()
if err != nil {
return nil, fmt.Errorf("failed to get workflow ID: %w", err)
}
err = ctx.(*dbosContext).systemDB.updateWorkflowOutcome(WithoutCancel(ctx), updateWorkflowOutcomeDBInput{
workflowID: wfID,
status: WorkflowStatusError,
err: newWorkflowUnexpectedInputType(fqn, fmt.Sprintf("%T", typedInput), fmt.Sprintf("%T", input)),
})
if err != nil {
return nil, fmt.Errorf("failed to record unexpected input type error: %w", err)
}
Copy link
Collaborator Author

@maxdml maxdml Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not required because typedErasedWorkflow is always call within RunWorkflow, in a goroutine that update the workflow outcome.

Comment on lines -194 to -196
mockCtx.AssertExpectations(t)
mockChildHandle.AssertExpectations(t)
mockGenericHandle.AssertExpectations(t)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A mock built with mocks.NewMoc... already checks expectations before destroying the object.

@maxdml maxdml marked this pull request as ready for review September 25, 2025 15:33
dbos/client.go Outdated
tx: tx,
}
_, err = dbosCtx.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
err := retry(dbosCtx, func() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need retries here? It's the client, it can just fail, there's no deeper risk

Copy link
Collaborator Author

@maxdml maxdml Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily strictly speaking, but nice to have from a reliable library imo. Right now, all the client methods, which are wrapping the context methods, will retry.

Are you suggesting we should do the minimum amount of retries to avoid getting a corrupted Transact process (e.g., queue runner unusable), but should let the user handle connection errors on all other paths? E.g., cancel workflow.

Copy link
Member

@kraftp kraftp Sep 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really a huge deal either way--retries in the client are excessive, but not unsafe (and the Python client retries)

Copy link
Collaborator Author

@maxdml maxdml Sep 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of doing less. We can easily add retries to client methods (enqueue, but also RetrieveWorkflow, CancelWorkflow, ResumeWorkflow, ListWorkflows, GetWorkflowSteps and ForkWorkflow) later if need be. The most important is to avoid having unusable-but-running Transact processes.

queue := dbos.NewWorkflowQueue(dbosCtx, "test_queue")

// Define step functions
stepOne := func(ctx dbos.DBOSContext, x int) (int, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other thing we could check is if a scheduled workflow keeps ticking after reconnection

Copy link
Member

@kraftp kraftp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, important tests and fixes!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants