A highly reliable distributed task execution framework written in Go, designed to ensure that triggered tasks will eventually execute, even in the face of system failures or network issues.
Go Resilient Task provides a robust infrastructure for scheduling, executing, and monitoring distributed tasks with built-in reliability guarantees. It's designed for systems where task execution reliability is critical, such as payment processing, order fulfillment, or any workflow where tasks must complete despite temporary failures.
- Guaranteed Task Execution: Once a task is triggered, the framework ensures it will eventually execute to completion
- Flexible Retry Policies: Configurable retry strategies including exponential backoff with customizable parameters
- Concurrency Control: Limit the number of concurrent task executions to prevent system overload
- Processing Policies: Define how long tasks can run before timing out
- Task Scheduling: Schedule tasks to run at specific times in the future
- Task Deduplication: Prevent duplicate task execution using unique keys
- Kafka Integration: Use Kafka for reliable task distribution and execution triggering
- PostgreSQL Persistence: Store task state in PostgreSQL for durability
- Uber-FX Integration: Easy integration with the Uber-FX dependency injection framework
- Stuck Task Detection: Identify and recover from tasks that have been processing for too long
The framework consists of several key components:
- Task Service: Manages task creation, scheduling, and status tracking
- Task Executor: Executes tasks using registered task handlers
- Task Handlers: Process specific task types with custom business logic
- Retry Policies: Define how and when to retry failed tasks
- Concurrency Policies: Control how many tasks can execute simultaneously
- Processing Policies: Define task execution timeouts
- Task DAO: Persistence layer for storing task state
- Task Execution Triggerer: Triggers task execution based on events (e.g., Kafka messages)
- Go 1.18 or higher
- PostgreSQL
- Kafka (optional, for distributed task execution)
go get github.com/KDKHD/go-resilient-task
- Define your task handlers:
func PaymentProcessedHandler(logger *zap.Logger) taskhandler.ITaskHandler {
return taskhandleradapter.
NewTaskHandlerAdapterBuilder(
func(task taskmodel.IBaseTask) bool {
return task.GetType() == "payment_processed"
},
taskprocessor.TaskProcessorFunc(
func(task taskmodel.ITask) (taskprocessor.ProcessResult, error) {
logger.Debug("processing payment", zap.String("task_data", task.GetData()))
// Your business logic here
return taskprocessor.ProcessResult{
ResultCode: taskprocessor.DONE,
}, nil
},
),
).
WithConcurrencyPolicy(
concurrencypolicy.NewSimpleTaskConcurrencyPolicy(20, logger),
).
WithProcessingPolicy(
processingpolicy.NewSimpleTaskProcessingPolicy(time.Minute * 1),
).
WithRetryPolicy(
retrypolicy.NewExponentialRetryPolicy(
retrypolicy.WithDelay(5*time.Second),
retrypolicy.WithMultiplier(4),
retrypolicy.WithMaxCount(3),
retrypolicy.WithMaxDelay(20*time.Minute),
),
).
Build()
}
- Configure and start the framework with Uber-FX:
fx.New(
autoconfiguration.Provider(), // Default configuration
fx.Provide(
autoconfiguration.AsHandler(PaymentProcessedHandler), // Register handlers
NewPostgresClient,
NewTaskProperties,
),
autoconfiguration.InitiateResilientTaskFx(), // Start processing
).Run()
- Add tasks to be executed:
taskService.AddTask(taskservice.AddTaskRequest{
Type: "payment_processed",
TaskId: uuid.New(),
Data: []byte(`{"orderId": "12345"}`),
RunAfterTime: time.Now().UTC().Add(time.Second * 10),
ExpectedQueueTime: time.Second * 120,
})
taskproperties.NewTaskProperties(
taskproperties.WithTaskStuckTimeout(time.Minute*20),
taskproperties.WithTaskResumer(
taskproperties.NewTaskResumerProperties(
taskproperties.WithBatchSize(1000),
taskproperties.WithPollingInterval(time.Second*2),
taskproperties.WithConcurrency(10),
)),
taskproperties.WithKafka(
taskproperties.NewKafkaProperties(
taskproperties.WithBootstrapServers("localhost:29092"),
taskproperties.WithKafkaTopicsNamespace("my-namespace"),
taskproperties.WithKafkaConsumerGroupId("tasks"),
)),
)
- Exponential Retry Policy: Increases delay between retries exponentially
- No Retry Policy: Doesn't retry failed tasks
- Simple Concurrency Policy: Limits the number of concurrent task executions
- Simple Processing Policy: Sets a timeout for task execution
Check out the example in the example/uber-fx
directory for a complete working example using Uber-FX, PostgreSQL, and Kafka.
To run the example:
-
Start the required infrastructure:
cd example/uber-fx docker compose up
-
Run the example application:
go run .
We welcome contributions to the Go Resilient Task project! Here's how you can help:
- Use the GitHub issue tracker to report bugs
- Describe the bug or feature request in detail
- Include code examples and reproduction steps
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature
) - Commit your changes (
git commit -m 'Add some amazing feature'
) - Push to the branch (
git push origin feature/amazing-feature
) - Open a Pull Request
- Follow Go best practices and style guidelines
- Write tests for new features
- Keep the code modular and maintainable
- Document public APIs
go test ./...