Skip to content

Commit 82c1991

Browse files
authored
DX-1811: workflow endpoint feature set (#120)
* feat: add feature set for checking if endpoint is workflow * fix: fmt * fix: add body field in errors * fix: tests * fix: return headers in all serve handlers * fix: add protocol header to error responses too * fix: add header to serveMany error responses * fix: set headers before sending response and fix express issues in #118, we bumped express to express 5 which is a breaking change. One thing we missed was how the catch all routes were defined. Because of this, we were getting 'TypeError: Missing parameter name at 1: https://git.new/pathToRegexpError' errors in express example. Fixed it by updating the catch all routes in express handler. See the related issue for more details: expressjs/express#5936 * fix: change expected call count for invoke route * fix: add non workflow trigger to ci
1 parent b78bb0d commit 82c1991

File tree

25 files changed

+221
-77
lines changed

25 files changed

+221
-77
lines changed

examples/ci/app/ci/constants.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ export const TEST_ROUTES: Pick<TestConfig, "route" | "waitForSeconds">[] = [
9999
{
100100
route: "invoke/workflows/workflowOne",
101101
waitForSeconds: 10
102+
},
103+
{
104+
route: "trigger-non-workflow/workflow",
105+
waitForSeconds: 10
102106
}
103107

104108
/**

examples/ci/app/test-routes/invoke/workflows/[...]/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ export const { POST, GET } = testServe(
210210
baseUrl: BASE_URL
211211
}),
212212
{
213-
expectedCallCount: 28,
213+
expectedCallCount: 29,
214214
expectedResult: "done invoke",
215215
payload,
216216
headers: {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export const NON_WORKFLOW_ROUTE_RESPONSE = "super-secret-response-foo";
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import { NON_WORKFLOW_ROUTE_RESPONSE } from "../constants";
2+
3+
export const POST = async (request: Request) => {
4+
return new Response(NON_WORKFLOW_ROUTE_RESPONSE, { status: 200 });
5+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import { serve } from "@upstash/workflow/nextjs";
2+
import { Client, StepError } from "@upstash/workflow";
3+
import { BASE_URL } from "app/ci/constants";
4+
import { saveResult } from "app/ci/upstash/redis";
5+
import { expect, testServe } from "app/ci/utils";
6+
import { NON_WORKFLOW_ROUTE_RESPONSE } from "../constants";
7+
8+
const header = `test-header-foo`
9+
const headerValue = `header-bar`
10+
11+
const workflowClient = new Client({ baseUrl: process.env.QSTASH_URL, token: process.env.QSTASH_TOKEN! })
12+
13+
export const { POST, GET } = testServe(
14+
serve(async (context) => {
15+
16+
const { workflowRunId } = await context.run("trigger non-workflow", async () =>
17+
workflowClient.trigger({
18+
url: `${BASE_URL}/test-routes/trigger-non-workflow/non-workflow`,
19+
})
20+
)
21+
22+
await context.sleep("wait before checking logs", 5)
23+
24+
const errorBody = await context.run("check run logs", async () => {
25+
for (let counter = 0; counter < 5; counter++) {
26+
const { runs } = await workflowClient.logs({ workflowRunId })
27+
if (runs.length === 1) {
28+
const run = runs[0];
29+
expect(run.workflowState, "RUN_FAILED")
30+
expect(run.steps.length, 2)
31+
32+
const secondStep = run.steps[1];
33+
expect(secondStep.type, "next")
34+
expect(secondStep.steps.length, 1)
35+
expect(secondStep.steps[0].state, "STEP_FAILED")
36+
37+
const errors = (secondStep.steps[0] as { errors: StepError[] }).errors;
38+
39+
expect(errors.length, 1)
40+
expect(errors[0].error, "detected a non-workflow destination for trigger/invoke. make sure you are sending the request to the correct endpoint")
41+
return errors[0].body
42+
} else {
43+
await new Promise(r => setTimeout(r, 2000));
44+
}
45+
}
46+
return false
47+
})
48+
49+
if (!errorBody) {
50+
throw new Error(`Workflow run with ID ${workflowRunId} did not complete successfully`)
51+
}
52+
53+
await saveResult(
54+
context,
55+
errorBody
56+
)
57+
58+
}, {
59+
baseUrl: BASE_URL,
60+
retries: 0
61+
}), {
62+
expectedCallCount: 5,
63+
expectedResult: NON_WORKFLOW_ROUTE_RESPONSE,
64+
payload: NON_WORKFLOW_ROUTE_RESPONSE,
65+
headers: {
66+
[header]: headerValue
67+
}
68+
})

examples/express/api/index.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ app.use('/workflow', serve<{ message: string }>(async (context) => {
2626

2727
const { body } = await context.call("get-data", {
2828
url: `${process.env.UPSTASH_WORKFLOW_URL ?? "http://localhost:3001"}/get-data`,
29-
method: "POST",
29+
method: "GET",
3030
body: { message: result1 }
3131
})
3232

@@ -38,6 +38,11 @@ app.use('/workflow', serve<{ message: string }>(async (context) => {
3838
})
3939
}));
4040

41+
app.get('/get-data', (req, res) => {
42+
const message = req.body.message as string;
43+
res.json({ message: `Received: ${message}` });
44+
});
45+
4146
/**
4247
* ServeMany
4348
*/
@@ -122,6 +127,6 @@ app.post("/ci", serve(
122127
}
123128
))
124129

125-
app.listen(3000, () => {
126-
console.log('Server running on port 3000');
130+
app.listen(3001, () => {
131+
console.log('Server running on port 3001');
127132
});

platforms/express.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ function createExpressHandler<TInitialPayload = unknown, TResult = unknown>(
6565

6666
const response = await serveHandler(webRequest);
6767

68+
// set headers
69+
for (const [key, value] of response.headers.entries()) {
70+
res.setHeader(key, value);
71+
}
72+
6873
res.status(response.status).json(await response.json());
6974
};
7075
}
@@ -77,7 +82,7 @@ export function serve<TInitialPayload = unknown, TResult = unknown>(
7782

7883
const handler: RequestHandler = createExpressHandler([routeFunction, options]);
7984

80-
router.all("*", handler);
85+
router.all(/(.*)/, handler);
8186

8287
return router;
8388
}
@@ -108,6 +113,6 @@ export const serveMany = (
108113
options,
109114
});
110115

111-
router.all("*", handler);
116+
router.all(/(.*)/, handler);
112117
return router;
113118
};

platforms/nextjs.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ export const servePagesRouter = <TInitialPayload = unknown, TResult = unknown>(
105105
method: "POST",
106106
});
107107
const response = await serveHandler(request);
108+
109+
// set headers
110+
for (const [key, value] of response.headers.entries()) {
111+
res.setHeader(key, value);
112+
}
113+
108114
res.status(response.status).json(await response.json());
109115
};
110116

src/agents/adapters.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ describe("wrapTools", () => {
8181
headers: {
8282
"upstash-workflow-sdk-version": "1",
8383
"content-type": "application/json",
84-
"upstash-feature-set": "LazyFetch,InitialBody",
84+
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
8585
"upstash-forward-upstash-workflow-sdk-version": "1",
8686
"upstash-method": "POST",
8787
"upstash-workflow-init": "false",
@@ -129,7 +129,7 @@ describe("wrapTools", () => {
129129
headers: {
130130
"upstash-workflow-sdk-version": "1",
131131
"content-type": "application/json",
132-
"upstash-feature-set": "LazyFetch,InitialBody",
132+
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
133133
"upstash-forward-upstash-workflow-sdk-version": "1",
134134
"upstash-method": "POST",
135135
"upstash-workflow-init": "false",
@@ -198,7 +198,7 @@ describe("wrapTools", () => {
198198
headers: {
199199
"content-type": "application/json",
200200
"upstash-delay": "1000s",
201-
"upstash-feature-set": "LazyFetch,InitialBody",
201+
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
202202
"upstash-forward-upstash-workflow-sdk-version": "1",
203203
"upstash-method": "POST",
204204
"upstash-workflow-init": "false",
@@ -246,7 +246,7 @@ describe("wrapTools", () => {
246246
destination: WORKFLOW_ENDPOINT,
247247
headers: {
248248
"content-type": "application/json",
249-
"upstash-feature-set": "LazyFetch,InitialBody",
249+
"upstash-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
250250
"upstash-forward-upstash-workflow-sdk-version": "1",
251251
"upstash-method": "POST",
252252
"upstash-workflow-init": "false",

src/agents/agent.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ describe("agents", () => {
100100
"upstash-workflow-sdk-version": "1",
101101
"content-type": "application/json",
102102
"upstash-callback": "https://requestcatcher.com/api",
103-
"upstash-callback-feature-set": "LazyFetch,InitialBody",
103+
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
104104
"upstash-callback-forward-upstash-workflow-callback": "true",
105105
"upstash-callback-forward-upstash-workflow-concurrent": "1",
106106
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
@@ -169,7 +169,7 @@ describe("agents", () => {
169169
"upstash-workflow-sdk-version": "1",
170170
"content-type": "application/json",
171171
"upstash-callback": "https://requestcatcher.com/api",
172-
"upstash-callback-feature-set": "LazyFetch,InitialBody",
172+
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
173173
"upstash-callback-forward-upstash-workflow-callback": "true",
174174
"upstash-callback-forward-upstash-workflow-concurrent": "1",
175175
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
@@ -237,7 +237,7 @@ describe("agents", () => {
237237
"upstash-workflow-sdk-version": "1",
238238
"content-type": "application/json",
239239
"upstash-callback": "https://requestcatcher.com/api",
240-
"upstash-callback-feature-set": "LazyFetch,InitialBody",
240+
"upstash-callback-feature-set": "LazyFetch,InitialBody,WF_DetectTrigger",
241241
"upstash-callback-forward-upstash-workflow-callback": "true",
242242
"upstash-callback-forward-upstash-workflow-concurrent": "1",
243243
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",

0 commit comments

Comments
 (0)