Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/ci/app/ci/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ export const TEST_ROUTES: Pick<TestConfig, "route" | "waitForSeconds">[] = [
{
route: "invoke/workflows/workflowOne",
waitForSeconds: 10
},
{
route: "trigger-non-workflow/workflow",
waitForSeconds: 10
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ export const { POST, GET } = testServe(
baseUrl: BASE_URL
}),
{
expectedCallCount: 28,
expectedCallCount: 29,
expectedResult: "done invoke",
payload,
headers: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const NON_WORKFLOW_ROUTE_RESPONSE = "super-secret-response-foo";
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { NON_WORKFLOW_ROUTE_RESPONSE } from "../constants";

export const POST = async (request: Request) => {
return new Response(NON_WORKFLOW_ROUTE_RESPONSE, { status: 200 });
}
68 changes: 68 additions & 0 deletions examples/ci/app/test-routes/trigger-non-workflow/workflow/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { serve } from "@upstash/workflow/nextjs";
import { Client, StepError } from "@upstash/workflow";
import { BASE_URL } from "app/ci/constants";
import { saveResult } from "app/ci/upstash/redis";
import { expect, testServe } from "app/ci/utils";
import { NON_WORKFLOW_ROUTE_RESPONSE } from "../constants";

const header = `test-header-foo`
const headerValue = `header-bar`

const workflowClient = new Client({ baseUrl: process.env.QSTASH_URL, token: process.env.QSTASH_TOKEN! })

export const { POST, GET } = testServe(
serve(async (context) => {

const { workflowRunId } = await context.run("trigger non-workflow", async () =>
workflowClient.trigger({
url: `${BASE_URL}/test-routes/trigger-non-workflow/non-workflow`,
})
)

await context.sleep("wait before checking logs", 5)

const errorBody = await context.run("check run logs", async () => {
for (let counter = 0; counter < 5; counter++) {
const { runs } = await workflowClient.logs({ workflowRunId })
if (runs.length === 1) {
const run = runs[0];
expect(run.workflowState, "RUN_FAILED")
expect(run.steps.length, 2)

const secondStep = run.steps[1];
expect(secondStep.type, "next")
expect(secondStep.steps.length, 1)
expect(secondStep.steps[0].state, "STEP_FAILED")

const errors = (secondStep.steps[0] as { errors: StepError[] }).errors;

expect(errors.length, 1)
expect(errors[0].error, "detected a non-workflow destination for trigger/invoke. make sure you are sending the request to the correct endpoint")
return errors[0].body
} else {
await new Promise(r => setTimeout(r, 2000));
}
}
return false
})

if (!errorBody) {
throw new Error(`Workflow run with ID ${workflowRunId} did not complete successfully`)
}

await saveResult(
context,
errorBody
)

}, {
baseUrl: BASE_URL,
retries: 0
}), {
expectedCallCount: 5,
expectedResult: NON_WORKFLOW_ROUTE_RESPONSE,
payload: NON_WORKFLOW_ROUTE_RESPONSE,
headers: {
[header]: headerValue
}
})
11 changes: 8 additions & 3 deletions examples/express/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ app.use('/workflow', serve<{ message: string }>(async (context) => {

const { body } = await context.call("get-data", {
url: `${process.env.UPSTASH_WORKFLOW_URL ?? "http://localhost:3001"}/get-data`,
method: "POST",
method: "GET",
body: { message: result1 }
})

Expand All @@ -38,6 +38,11 @@ app.use('/workflow', serve<{ message: string }>(async (context) => {
})
}));

app.get('/get-data', (req, res) => {
const message = req.body.message as string;
res.json({ message: `Received: ${message}` });
});

/**
* ServeMany
*/
Expand Down Expand Up @@ -122,6 +127,6 @@ app.post("/ci", serve(
}
))

app.listen(3000, () => {
console.log('Server running on port 3000');
app.listen(3001, () => {
console.log('Server running on port 3001');
});
9 changes: 7 additions & 2 deletions platforms/express.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ function createExpressHandler<TInitialPayload = unknown, TResult = unknown>(

const response = await serveHandler(webRequest);

// set headers
for (const [key, value] of response.headers.entries()) {
res.setHeader(key, value);
}

res.status(response.status).json(await response.json());
};
}
Expand All @@ -77,7 +82,7 @@ export function serve<TInitialPayload = unknown, TResult = unknown>(

const handler: RequestHandler = createExpressHandler([routeFunction, options]);

router.all("*", handler);
router.all(/(.*)/, handler);

return router;
}
Expand Down Expand Up @@ -108,6 +113,6 @@ export const serveMany = (
options,
});

router.all("*", handler);
router.all(/(.*)/, handler);
return router;
};
6 changes: 6 additions & 0 deletions platforms/nextjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ export const servePagesRouter = <TInitialPayload = unknown, TResult = unknown>(
method: "POST",
});
const response = await serveHandler(request);

// set headers
for (const [key, value] of response.headers.entries()) {
res.setHeader(key, value);
}

res.status(response.status).json(await response.json());
};

Expand Down
8 changes: 4 additions & 4 deletions src/agents/adapters.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ describe("wrapTools", () => {
headers: {
"upstash-workflow-sdk-version": "1",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-workflow-init": "false",
Expand Down Expand Up @@ -129,7 +129,7 @@ describe("wrapTools", () => {
headers: {
"upstash-workflow-sdk-version": "1",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-workflow-init": "false",
Expand Down Expand Up @@ -198,7 +198,7 @@ describe("wrapTools", () => {
headers: {
"content-type": "application/json",
"upstash-delay": "1000s",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-workflow-init": "false",
Expand Down Expand Up @@ -246,7 +246,7 @@ describe("wrapTools", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-workflow-init": "false",
Expand Down
6 changes: 3 additions & 3 deletions src/agents/agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ describe("agents", () => {
"upstash-workflow-sdk-version": "1",
"content-type": "application/json",
"upstash-callback": "https://requestcatcher.com/api",
"upstash-callback-feature-set": "LazyFetch,InitialBody",
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-callback-forward-upstash-workflow-callback": "true",
"upstash-callback-forward-upstash-workflow-concurrent": "1",
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
Expand Down Expand Up @@ -165,7 +165,7 @@ describe("agents", () => {
"upstash-workflow-sdk-version": "1",
"content-type": "application/json",
"upstash-callback": "https://requestcatcher.com/api",
"upstash-callback-feature-set": "LazyFetch,InitialBody",
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-callback-forward-upstash-workflow-callback": "true",
"upstash-callback-forward-upstash-workflow-concurrent": "1",
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
Expand Down Expand Up @@ -231,7 +231,7 @@ describe("agents", () => {
"upstash-workflow-sdk-version": "1",
"content-type": "application/json",
"upstash-callback": "https://requestcatcher.com/api",
"upstash-callback-feature-set": "LazyFetch,InitialBody",
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-callback-forward-upstash-workflow-callback": "true",
"upstash-callback-forward-upstash-workflow-concurrent": "1",
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
Expand Down
6 changes: 3 additions & 3 deletions src/agents/task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ describe("tasks", () => {
"upstash-workflow-sdk-version": "1",
"content-type": "application/json",
"upstash-callback": "https://requestcatcher.com/api",
"upstash-callback-feature-set": "LazyFetch,InitialBody",
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-callback-forward-upstash-workflow-callback": "true",
"upstash-callback-forward-upstash-workflow-concurrent": "1",
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
Expand Down Expand Up @@ -198,7 +198,7 @@ describe("tasks", () => {
"upstash-workflow-sdk-version": "1",
"content-type": "application/json",
"upstash-callback": "https://requestcatcher.com/api",
"upstash-callback-feature-set": "LazyFetch,InitialBody",
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-callback-forward-upstash-workflow-callback": "true",
"upstash-callback-forward-upstash-workflow-concurrent": "1",
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
Expand Down Expand Up @@ -273,7 +273,7 @@ describe("tasks", () => {
"upstash-workflow-sdk-version": "1",
"content-type": "application/json",
"upstash-callback": "https://requestcatcher.com/api",
"upstash-callback-feature-set": "LazyFetch,InitialBody",
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-callback-forward-upstash-workflow-callback": "true",
"upstash-callback-forward-upstash-workflow-concurrent": "1",
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
Expand Down
13 changes: 7 additions & 6 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ describe("workflow client", () => {
"upstash-workflow-url": "https://requestcatcher.com/api",
"upstash-delay": "1s",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-telemetry-framework": "unknown",
"upstash-telemetry-runtime": expect.stringMatching(/bun@/),
"upstash-telemetry-sdk": expect.stringContaining("@upstash/workflow"),
Expand Down Expand Up @@ -299,7 +299,7 @@ describe("workflow client", () => {
"upstash-workflow-url": "https://requestcatcher.com/api",
"upstash-delay": "1s",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-telemetry-framework": "unknown",
"upstash-telemetry-runtime": expect.stringMatching(/bun@/),
"upstash-telemetry-sdk": expect.stringContaining("@upstash/workflow"),
Expand All @@ -319,13 +319,13 @@ describe("workflow client", () => {
"upstash-workflow-url": "https://requestcatcher.com/api",
"upstash-delay": "1s",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-telemetry-framework": "unknown",
"upstash-telemetry-runtime": expect.stringMatching(/bun@/),
"upstash-telemetry-sdk": expect.stringContaining("@upstash/workflow"),
"upstash-workflow-sdk-version": "1",
"upstash-failure-callback": "https://requestcatcher.com/api",
"upstash-failure-callback-feature-set": "LazyFetch,InitialBody",
"upstash-failure-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-failure-callback-forward-upstash-workflow-failure-callback": "true",
"upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
"upstash-failure-callback-forward-user-header": "user-header-value",
Expand Down Expand Up @@ -379,8 +379,8 @@ describe("workflow client", () => {
"upstash-delay": "1s",
"upstash-failure-callback": "https://requestcatcher.com/some-failure-callback",
"content-type": "application/json",
"upstash-feature-set": "LazyFetch,InitialBody",
"upstash-failure-callback-feature-set": "LazyFetch,InitialBody",
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-failure-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
"upstash-failure-callback-forward-upstash-workflow-failure-callback": "true",
"upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
"upstash-failure-callback-forward-user-header": "user-header-value",
Expand Down Expand Up @@ -668,6 +668,7 @@ describe("workflow client", () => {
errors: [
{
error: "400 Bad Request",
body: expect.any(String),
headers: expect.any(Object),
status: 400,
time: expect.any(Number),
Expand Down
15 changes: 14 additions & 1 deletion src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,21 @@ export type StepLog = BaseStepLog &
AsOptional<{ sleepUntil: number }> &
AsOptional<WaitEventGroup>;

type StepError = {
export type StepError = {
/**
* error message associated with the request
*
* example:
* ```
* detected a non-workflow destination for trigger/invoke.
* make sure you are sending the request to the correct endpoint
* ```
*/
error: string;
/**
* response body returned in the request which resulted in an error
*/
body: string;
headers: Record<string, string[]>;
status: number;
time: number;
Expand Down
Loading