Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions .idea/dbos-transact-golang.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 27 additions & 11 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ type DBOSContext interface {
Shutdown(timeout time.Duration) // Gracefully shutdown all DBOS runtime components with ordered cleanup sequence

// Workflow operations
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)

// Workflow management
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
Expand Down Expand Up @@ -317,12 +317,28 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
initExecutor.executorID = config.ExecutorID

initExecutor.applicationID = os.Getenv("DBOS__APPID")
maxRetries := 5
retryDelay := time.Second * 2

var systemDB systemDatabase

for attempt := 1; attempt <= maxRetries; attempt++ {
systemDB, err = newSystemDatabase(initExecutor, config.DatabaseURL, initExecutor.logger)
if err == nil {
break
}
initExecutor.logger.Warn("Failed to connect to system DB", "attempt", attempt, "maxRetries", maxRetries, "error", err)

if attempt < maxRetries {
time.Sleep(retryDelay)
retryDelay *= 2 // Exponential backoff
}
}

// Create the system database
systemDB, err := newSystemDatabase(initExecutor, config.DatabaseURL, initExecutor.logger)
if err != nil {
return nil, newInitializationError(fmt.Sprintf("failed to create system database: %v", err))
return nil, newInitializationError(fmt.Sprintf("failed to create system database after %d attempts: %v", maxRetries, err))
}

initExecutor.systemDB = systemDB
initExecutor.logger.Info("System database initialized")

Expand Down