Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bun.lockb
Binary file not shown.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
},
"dependencies": {
"@ai-sdk/openai": "^1.2.1",
"@upstash/qstash": "^2.8.1",
"@upstash/qstash": "^2.8.2",
"ai": "^4.1.54",
"zod": "^3.24.1"
},
Expand Down
1 change: 1 addition & 0 deletions src/agents/adapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const fetchWithContextCall = async (
body,
timeout: agentCallParams?.timeout,
retries: agentCallParams?.retries,
retryDelay: agentCallParams?.retryDelay,
flowControl: agentCallParams?.flowControl,
});

Expand Down
8 changes: 8 additions & 0 deletions src/agents/agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ describe("agents", () => {
url: WORKFLOW_ENDPOINT,
workflowRunId,
retries: 5,
retryDelay: "1000",
});

const agentsApi = new WorkflowAgents({ context });
Expand All @@ -43,6 +44,7 @@ describe("agents", () => {
},
retries: 5,
timeout: 10,
retryDelay: "1000",
},
});

Expand Down Expand Up @@ -119,9 +121,11 @@ describe("agents", () => {
"upstash-workflow-runid": workflowRunId,
"upstash-workflow-url": "https://requestcatcher.com/api",
"upstash-callback-retries": "5",
"upstash-callback-retry-delay": "1000",
"upstash-flow-control-key": "flowControlKey",
"upstash-flow-control-value": "parallelism=2",
"upstash-retries": "5",
"upstash-retry-delay": "1000",
"upstash-timeout": "10",
},
},
Expand Down Expand Up @@ -173,6 +177,7 @@ describe("agents", () => {
"upstash-callback-forward-upstash-workflow-stepname": "Call Agent my agent",
"upstash-callback-forward-upstash-workflow-steptype": "Call",
"upstash-callback-retries": "5",
"upstash-callback-retry-delay": "1000",
"upstash-callback-workflow-calltype": "fromCallback",
"upstash-callback-workflow-init": "false",
"upstash-callback-workflow-runid": workflowRunId,
Expand All @@ -189,6 +194,7 @@ describe("agents", () => {
"upstash-flow-control-key": "flowControlKey",
"upstash-flow-control-value": "parallelism=2",
"upstash-retries": "5",
"upstash-retry-delay": "1000",
"upstash-timeout": "10",
},
},
Expand Down Expand Up @@ -239,6 +245,7 @@ describe("agents", () => {
"upstash-callback-forward-upstash-workflow-stepname": "Call Agent manager llm",
"upstash-callback-forward-upstash-workflow-steptype": "Call",
"upstash-callback-retries": "5",
"upstash-callback-retry-delay": "1000",
"upstash-callback-workflow-calltype": "fromCallback",
"upstash-callback-workflow-init": "false",
"upstash-callback-workflow-runid": workflowRunId,
Expand All @@ -255,6 +262,7 @@ describe("agents", () => {
"upstash-flow-control-key": "flowControlKey",
"upstash-flow-control-value": "parallelism=2",
"upstash-retries": "5",
"upstash-retry-delay": "1000",
"upstash-timeout": "10",
},
},
Expand Down
5 changes: 4 additions & 1 deletion src/agents/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ export type ManagerAgentParameters = {
Pick<AgentParameters, "maxSteps">;

type ModelParams = Parameters<ReturnType<typeof createOpenAI>>;
export type AgentCallParams = Pick<CallSettings, "flowControl" | "retries" | "timeout">;
export type AgentCallParams = Pick<
CallSettings,
"flowControl" | "retries" | "timeout" | "retryDelay"
>;
type CustomModelSettings = ModelParams["1"] & { baseURL?: string; apiKey?: string } & {
callSettings: AgentCallParams;
};
Expand Down
10 changes: 10 additions & 0 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ describe("workflow client", () => {
headers: { "user-header": "user-header-value" },
workflowRunId: myWorkflowRunId,
retries: 15,
retryDelay: "1000",
delay: 1,
});
},
Expand All @@ -229,6 +230,7 @@ describe("workflow client", () => {
"upstash-forward-user-header": "user-header-value",
"upstash-method": "POST",
"upstash-retries": "15",
"upstash-retry-delay": "1000",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-workflow-url": "https://requestcatcher.com/api",
Expand Down Expand Up @@ -261,6 +263,7 @@ describe("workflow client", () => {
headers: { "user-header": "user-header-value" },
workflowRunId: myWorkflowRunId,
retries: 15,
retryDelay: "1000",
delay: 1,
},
{
Expand All @@ -269,6 +272,7 @@ describe("workflow client", () => {
headers: { "user-header": "user-header-value" },
workflowRunId: myWorkflowRunId2,
retries: 15,
retryDelay: "2000",
delay: 1,
useFailureFunction: true,
},
Expand All @@ -294,6 +298,7 @@ describe("workflow client", () => {
"upstash-forward-user-header": "user-header-value",
"upstash-method": "POST",
"upstash-retries": "15",
"upstash-retry-delay": "1000",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-workflow-url": "https://requestcatcher.com/api",
Expand All @@ -314,6 +319,7 @@ describe("workflow client", () => {
"upstash-forward-user-header": "user-header-value",
"upstash-method": "POST",
"upstash-retries": "15",
"upstash-retry-delay": "2000",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId2}`,
"upstash-workflow-url": "https://requestcatcher.com/api",
Expand All @@ -330,6 +336,7 @@ describe("workflow client", () => {
"upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
"upstash-failure-callback-forward-user-header": "user-header-value",
"upstash-failure-callback-retries": "15",
"upstash-failure-callback-retry-delay": "2000",
"upstash-failure-callback-workflow-calltype": "failureCall",
"upstash-failure-callback-workflow-init": "false",
"upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId2}`,
Expand All @@ -353,6 +360,7 @@ describe("workflow client", () => {
headers: { "user-header": "user-header-value" },
workflowRunId: myWorkflowRunId,
retries: 15,
retryDelay: "1000",
delay: 1,
failureUrl: "https://requestcatcher.com/some-failure-callback",
});
Expand All @@ -373,6 +381,7 @@ describe("workflow client", () => {
"upstash-forward-user-header": "user-header-value",
"upstash-method": "POST",
"upstash-retries": "15",
"upstash-retry-delay": "1000",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-workflow-url": "https://requestcatcher.com/api",
Expand All @@ -385,6 +394,7 @@ describe("workflow client", () => {
"upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
"upstash-failure-callback-forward-user-header": "user-header-value",
"upstash-failure-callback-retries": "15",
"upstash-failure-callback-retry-delay": "1000",
"upstash-failure-callback-workflow-calltype": "failureCall",
"upstash-failure-callback-workflow-init": "false",
"upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId}`,
Expand Down
9 changes: 7 additions & 2 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ export class Client {
* headers: { ... }, // Optional headers
* workflowRunId: "my-workflow", // Optional workflow run ID
* retries: 3 // Optional retries for the initial request
* retryDelay: "1000" // Optional retry delay for the delay between retries
* });
*
* console.log(workflowRunId)
Expand All @@ -191,13 +192,15 @@ export class Client {
* headers: { ... }, // Optional headers
* workflowRunId: "my-workflow", // Optional workflow run ID
* retries: 3 // Optional retries for the initial request
* retryDelay: "1000" // Optional retry delay for the delay between retries
* },
* {
* url: "https://workflow-endpoint-2.com",
* body: "hello world!", // Optional body
* headers: { ... }, // Optional headers
* workflowRunId: "my-workflow-2", // Optional workflow run ID
* retries: 5 // Optional retries for the initial request
* retryDelay: "1000" // Optional retry delay for the delay between retries
* },
* ]);
*
Expand All @@ -218,6 +221,7 @@ export class Client {
* with `wfr_`.
* @param retries retry to use in the initial request. in the rest of
* the workflow, `retries` option of the `serve` will be used.
* @param retryDelay delay between retries.
* @param flowControl Settings for controlling the number of active requests
* and number of requests per second with the same key.
* @param delay Delay for the workflow run. This is used to delay the
Expand All @@ -241,13 +245,14 @@ export class Client {

const context = new WorkflowContext({
qstashClient: this.client,
// @ts-expect-error headers type mismatch
headers: new Headers(option.headers ?? {}),
// @ts-expect-error header type mismatch because of bun
headers: new Headers((option.headers ?? {})),
initialPayload: option.body,
steps: [],
url: option.url,
workflowRunId: finalWorkflowRunId,
retries: option.retries,
retryDelay: option.retryDelay,
telemetry: { sdk: SDK_TELEMETRY },
flowControl: option.flowControl,
failureUrl,
Expand Down
48 changes: 40 additions & 8 deletions src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type BaseStepLog = {
* number of retries for the step
*/
retries: number;
/**
* retry delay parameter for the step if it was set
*/
retryDelay?: string;
/**
* number of parallel steps
*
Expand All @@ -53,10 +57,6 @@ type BaseStepLog = {
* headers
*/
headers: Record<string, string[]>;
/**
* retry delay parameter for the step if it was set
*/
retryDelay?: string;
};

type CallUrlGroup = {
Expand Down Expand Up @@ -184,14 +184,14 @@ type StepLogGroup =
* retries
*/
retries: number;
/**
* errors which occured in the step
*/
errors?: StepError[];
/**
* retry delay parameter for the step if it was set
*/
retryDelay?: string;
/**
* errors which occured in the step
*/
errors?: StepError[];
}[];
/**
* Log which belongs to the next step
Expand Down Expand Up @@ -345,6 +345,38 @@ export type TriggerOptions = {
* @default 3
*/
retries?: number;
/**
* Delay between retries.
*
* By default, the `retryDelay` is exponential backoff.
* More details can be found in: https://upstash.com/docs/qstash/features/retry.
*
* The `retryDelay` option allows you to customize the delay (in milliseconds) between retry attempts when message delivery fails.
*
* You can use mathematical expressions and the following built-in functions to calculate the delay dynamically.
* The special variable `retried` represents the current retry attempt count (starting from 0).
*
* Supported functions:
* - `pow`
* - `sqrt`
* - `abs`
* - `exp`
* - `floor`
* - `ceil`
* - `round`
* - `min`
* - `max`
*
* Examples of valid `retryDelay` values:
* ```ts
* 1000 // 1 second
* 1000 * (1 + retried) // 1 second multiplied by the current retry attempt
* pow(2, retried) // 2 to the power of the current retry attempt
* max(10, pow(2, retried)) // The greater of 10 or 2^retried
* ```
*/
retryDelay?: string;

/**
* Flow control to use for the workflow run.
* If not provided, no flow control will be used.
Expand Down
3 changes: 2 additions & 1 deletion src/context/api/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export abstract class BaseWorkflowApi {
>
): Promise<CallResponse<TResult>> {
const { url, appendHeaders, method } = getProviderInfo(settings.api);
const { method: userMethod, body, headers = {}, retries = 0, timeout } = settings;
const { method: userMethod, body, headers = {}, retries = 0, retryDelay, timeout } = settings;

return await this.context.call<TResult, TBody>(stepName, {
url,
Expand All @@ -43,6 +43,7 @@ export abstract class BaseWorkflowApi {
...headers,
},
retries,
retryDelay,
timeout,
});
}
Expand Down
8 changes: 8 additions & 0 deletions src/context/auto-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ describe("auto-executor", () => {
steps,
url: WORKFLOW_ENDPOINT,
retries: 6,
retryDelay: "1000",
flowControl: {
key: "key",
parallelism: 10,
Expand Down Expand Up @@ -135,6 +136,7 @@ describe("auto-executor", () => {
"upstash-workflow-init": "false",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
"upstash-retries": "6",
"upstash-retry-delay": "1000",
"upstash-flow-control-key": "key",
"upstash-flow-control-value": "parallelism=10",
"upstash-forward-upstash-workflow-invoke-count": "7",
Expand Down Expand Up @@ -232,6 +234,7 @@ describe("auto-executor", () => {
"upstash-workflow-init": "false",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
"upstash-retries": "6",
"upstash-retry-delay": "1000",
"upstash-flow-control-key": "key",
"upstash-flow-control-value": "parallelism=10",
"upstash-forward-upstash-workflow-invoke-count": "7",
Expand All @@ -251,6 +254,7 @@ describe("auto-executor", () => {
"upstash-workflow-init": "false",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
"upstash-retries": "6",
"upstash-retry-delay": "1000",
"upstash-flow-control-key": "key",
"upstash-flow-control-value": "parallelism=10",
"upstash-forward-upstash-workflow-invoke-count": "7",
Expand All @@ -270,6 +274,7 @@ describe("auto-executor", () => {
"upstash-workflow-init": "false",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
"upstash-retries": "6",
"upstash-retry-delay": "1000",
"upstash-flow-control-key": "key",
"upstash-flow-control-value": "parallelism=10",
"upstash-forward-upstash-workflow-invoke-count": "7",
Expand All @@ -288,6 +293,7 @@ describe("auto-executor", () => {
"upstash-workflow-init": "false",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
"upstash-retries": "6",
"upstash-retry-delay": "1000",
"upstash-flow-control-key": "key",
"upstash-flow-control-value": "parallelism=10",
"upstash-forward-upstash-workflow-invoke-count": "7",
Expand Down Expand Up @@ -349,6 +355,7 @@ describe("auto-executor", () => {
"upstash-workflow-init": "false",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
"upstash-retries": "6",
"upstash-retry-delay": "1000",
"upstash-flow-control-key": "key",
"upstash-flow-control-value": "parallelism=10",
"upstash-forward-upstash-workflow-invoke-count": "7",
Expand Down Expand Up @@ -406,6 +413,7 @@ describe("auto-executor", () => {
"upstash-workflow-init": "false",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
"upstash-retries": "6",
"upstash-retry-delay": "1000",
"upstash-flow-control-key": "key",
"upstash-flow-control-value": "parallelism=10",
"upstash-forward-upstash-workflow-invoke-count": "7",
Expand Down
Loading
Loading