Skip to content

Commit a9397dd

Browse files
committed
fix: add context.call return parameters
1 parent 79e240e commit a9397dd

11 files changed

+145
-60
lines changed

src/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export const WORKFLOW_ID_HEADER = "Upstash-Workflow-RunId";
22
export const WORKFLOW_INIT_HEADER = "Upstash-Workflow-Init";
33
export const WORKFLOW_URL_HEADER = "Upstash-Workflow-Url";
44
export const WORKFLOW_FAILURE_HEADER = "Upstash-Workflow-Is-Failure";
5+
export const WORKFLOW_FEATURE_HEADER = "Upstash-Feature-Set"
56

67
export const WORKFLOW_PROTOCOL_VERSION = "1";
78
export const WORKFLOW_PROTOCOL_VERSION_HEADER = "Upstash-Workflow-Sdk-Version";

src/context/auto-executor.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ describe("auto-executor", () => {
120120
{
121121
destination: WORKFLOW_ENDPOINT,
122122
headers: {
123+
"upstash-feature-set": "WF_NoDelete",
123124
"content-type": "application/json",
124125
"upstash-forward-upstash-workflow-sdk-version": "1",
125126
"upstash-method": "POST",
@@ -207,6 +208,7 @@ describe("auto-executor", () => {
207208
body: '{"stepId":0,"stepName":"sleep for some time","stepType":"SleepFor","sleepFor":123,"concurrent":2,"targetStep":1}',
208209
destination: WORKFLOW_ENDPOINT,
209210
headers: {
211+
"upstash-feature-set": "WF_NoDelete",
210212
"content-type": "application/json",
211213
"upstash-delay": "123s",
212214
"upstash-forward-upstash-workflow-sdk-version": "1",
@@ -221,6 +223,7 @@ describe("auto-executor", () => {
221223
body: '{"stepId":0,"stepName":"sleep until next day","stepType":"SleepUntil","sleepUntil":123123,"concurrent":2,"targetStep":2}',
222224
destination: WORKFLOW_ENDPOINT,
223225
headers: {
226+
"upstash-feature-set": "WF_NoDelete",
224227
"content-type": "application/json",
225228
"upstash-forward-upstash-workflow-sdk-version": "1",
226229
"upstash-method": "POST",
@@ -273,6 +276,7 @@ describe("auto-executor", () => {
273276
{
274277
destination: WORKFLOW_ENDPOINT,
275278
headers: {
279+
"upstash-feature-set": "WF_NoDelete",
276280
"content-type": "application/json",
277281
"upstash-forward-upstash-workflow-sdk-version": "1",
278282
"upstash-method": "POST",
@@ -325,6 +329,7 @@ describe("auto-executor", () => {
325329
{
326330
destination: WORKFLOW_ENDPOINT,
327331
headers: {
332+
"upstash-feature-set": "WF_NoDelete",
328333
"content-type": "application/json",
329334
"upstash-forward-upstash-workflow-sdk-version": "1",
330335
"upstash-method": "POST",

src/context/context.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ describe("context tests", () => {
9595

9696
const throws = async () => {
9797
await context.run("outer step", async () => {
98-
await context.call("inner call", { url: "https://some-url.com"});
98+
await context.call("inner call", { url: "https://some-url.com" });
9999
});
100100
};
101101
expect(throws).toThrow(
@@ -137,6 +137,7 @@ describe("context tests", () => {
137137
body: '{"stepId":1,"stepName":"my-step","stepType":"Run","out":"my-result","concurrent":1}',
138138
destination: WORKFLOW_ENDPOINT,
139139
headers: {
140+
"upstash-feature-set": "WF_NoDelete",
140141
"content-type": "application/json",
141142
"upstash-forward-upstash-workflow-sdk-version": "1",
142143
"upstash-method": "POST",
@@ -185,6 +186,7 @@ describe("context tests", () => {
185186
},
186187
timeout: "20s",
187188
timeoutHeaders: {
189+
"Upstash-Feature-Set": ["WF_NoDelete"],
188190
"Content-Type": ["application/json"],
189191
[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: ["1"],
190192
"Upstash-Retries": ["3"],
@@ -234,6 +236,7 @@ describe("context tests", () => {
234236
body: '{"stepId":0,"stepName":"my-wait-step","stepType":"Wait","waitEventId":"my-event-id","timeout":"20s","concurrent":2,"targetStep":1}',
235237
destination: WORKFLOW_ENDPOINT,
236238
headers: {
239+
"upstash-feature-set": "WF_NoDelete",
237240
"content-type": "application/json",
238241
"upstash-forward-upstash-workflow-sdk-version": "1",
239242
"upstash-method": "POST",
@@ -248,6 +251,7 @@ describe("context tests", () => {
248251
body: '{"stepId":0,"stepName":"my-run-step","stepType":"Run","concurrent":2,"targetStep":2}',
249252
destination: WORKFLOW_ENDPOINT,
250253
headers: {
254+
"upstash-feature-set": "WF_NoDelete",
251255
"content-type": "application/json",
252256
"upstash-forward-upstash-workflow-sdk-version": "1",
253257
"upstash-method": "POST",

src/context/context.ts

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { WaitStepResponse, WorkflowClient } from "../types";
1+
import type { CallResponse, WaitStepResponse, WorkflowClient } from "../types";
22
import { type StepFunction, type Step } from "../types";
33
import { AutoExecutor } from "./auto-executor";
44
import type { BaseLazyStep } from "./steps";
@@ -169,7 +169,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
169169
rawInitialPayload?: string; // optional for tests
170170
env?: Record<string, string | undefined>;
171171
retries?: number;
172-
}) {
172+
}) {
173173
this.qstashClient = qstashClient;
174174
this.workflowRunId = workflowRunId;
175175
this.steps = steps;
@@ -277,32 +277,21 @@ export class WorkflowContext<TInitialPayload = unknown> {
277277
* @returns call result (parsed as JSON if possible)
278278
*/
279279
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-parameters
280-
public async call<TResult = unknown, TBody = unknown>(
280+
public async call(
281281
stepName: string,
282282
callSettings: {
283-
url: string,
284-
method?: HTTPMethods,
285-
body?: TBody,
286-
headers?: Record<string, string>
283+
url: string;
284+
method?: HTTPMethods;
285+
body?: unknown;
286+
headers?: Record<string, string>;
287287
}
288288
) {
289-
290-
const {
291-
url,
292-
method = "GET",
293-
body,
294-
headers = {}
295-
} = callSettings;
289+
const { url, method = "GET", body, headers = {} } = callSettings;
296290

297291
const result = await this.addStep(
298-
new LazyCallStep<string>(stepName, url, method, body, headers )
292+
new LazyCallStep<CallResponse>(stepName, url, method, body, headers)
299293
);
300-
301-
try {
302-
return JSON.parse(result) as TResult;
303-
} catch {
304-
return result as TResult;
305-
}
294+
return result
306295
}
307296

308297
public async waitForEvent(

src/integration.test.ts

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -392,28 +392,22 @@ describe.skip("live serve tests", () => {
392392
return;
393393
}
394394

395-
const postResult = await context.call<string>(
396-
"post call",
397-
{
398-
url: LOCAL_THIRD_PARTY_URL,
399-
method: "POST",
400-
body: "post-payload",
401-
headers: postHeader
402-
}
403-
);
395+
const { body: postResult } = await context.call("post call", {
396+
url: LOCAL_THIRD_PARTY_URL,
397+
method: "POST",
398+
body: "post-payload",
399+
headers: postHeader,
400+
});
404401
expect(postResult).toBe(
405402
"called POST 'third-party-result' 'post-header-value-x' '\"post-payload\"'"
406403
);
407404

408405
await context.sleep("sleep 1", 2);
409406

410-
const getResult = await context.call<string>(
411-
"get call",
412-
{
413-
url: LOCAL_THIRD_PARTY_URL,
414-
headers: getHeader
415-
}
416-
);
407+
const { body: getResult } = await context.call("get call", {
408+
url: LOCAL_THIRD_PARTY_URL,
409+
headers: getHeader,
410+
});
417411

418412
expect(getResult).toBe("called GET 'third-party-result' 'get-header-value-x'");
419413
finishState.finish();
@@ -497,6 +491,46 @@ describe.skip("live serve tests", () => {
497491
}
498492
);
499493

494+
test(
495+
"call failure",
496+
async () => {
497+
const failingResponse = "failing-response"
498+
const payload = "my-payload"
499+
const thirdPartyServer = serve({
500+
async fetch(request) {
501+
const requestPayload = await request.json()
502+
return new Response(`${failingResponse} - ${requestPayload}`, { status: 400 });
503+
},
504+
port: THIRD_PARTY_PORT,
505+
});
506+
507+
const finishState = new FinishState();
508+
await testEndpoint({
509+
finalCount: 4,
510+
waitFor: 7000,
511+
initialPayload: payload,
512+
finishState,
513+
routeFunction: async (context) => {
514+
const input = context.requestPayload;
515+
const { status, body, header } = await context.call("failing call", {
516+
url: LOCAL_THIRD_PARTY_URL,
517+
body: input,
518+
method: "POST"
519+
});
520+
expect(status).toBe(400)
521+
expect(body).toBe(`${failingResponse} - ${payload}`)
522+
expect(header["Content-Length"]).toEqual([ "29" ])
523+
finishState.finish()
524+
}
525+
});
526+
527+
thirdPartyServer.stop();
528+
},
529+
{
530+
timeout: 8000,
531+
}
532+
);
533+
500534
test(
501535
"retry",
502536
async () => {

src/serve/authorization.test.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ describe("disabled workflow context", () => {
7474
let called = false;
7575
await mockQStashServer({
7676
execute: () => {
77-
const throws = disabledContext.call("call-step", { url: "some-url"});
77+
const throws = disabledContext.call("call-step", { url: "some-url" });
7878
expect(throws).rejects.toThrow(QStashWorkflowAbort);
7979
called = true;
8080
},
@@ -200,6 +200,7 @@ describe("disabled workflow context", () => {
200200
body: '{"stepId":1,"stepName":"step","stepType":"Run","out":"result","concurrent":1}',
201201
destination: WORKFLOW_ENDPOINT,
202202
headers: {
203+
"upstash-feature-set": "WF_NoDelete",
203204
"content-type": "application/json",
204205
"upstash-forward-upstash-workflow-sdk-version": "1",
205206
"upstash-method": "POST",
@@ -247,6 +248,7 @@ describe("disabled workflow context", () => {
247248
body: '{"stepId":1,"stepName":"step","stepType":"Run","out":"result","concurrent":1}',
248249
destination: WORKFLOW_ENDPOINT,
249250
headers: {
251+
"upstash-feature-set": "WF_NoDelete",
250252
"content-type": "application/json",
251253
"upstash-forward-upstash-workflow-sdk-version": "1",
252254
"upstash-method": "POST",
@@ -295,6 +297,7 @@ describe("disabled workflow context", () => {
295297
body: '{"stepId":1,"stepName":"step","stepType":"Run","out":"result","concurrent":1}',
296298
destination: WORKFLOW_ENDPOINT,
297299
headers: {
300+
"upstash-feature-set": "WF_NoDelete",
298301
"content-type": "application/json",
299302
"upstash-forward-upstash-workflow-sdk-version": "1",
300303
"upstash-method": "POST",

src/serve/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ export const serve = <
5656
* @returns A promise that resolves to a response.
5757
*/
5858
const handler = async (request: TRequest) => {
59+
await debug?.log("INFO", "ENDPOINT_START");
60+
5961
const { workflowUrl, workflowFailureUrl } = await determineUrls(
6062
request,
6163
url,
@@ -114,6 +116,7 @@ export const serve = <
114116
failureUrl: workflowFailureUrl,
115117
debug,
116118
env,
119+
retries
117120
});
118121

119122
// attempt running routeFunction until the first step

src/serve/serve.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ describe("serve", () => {
325325
destination: WORKFLOW_ENDPOINT,
326326
headers: {
327327
"content-type": "application/json",
328+
"upstash-feature-set": "WF_NoDelete",
328329
"upstash-forward-upstash-workflow-sdk-version": "1",
329330
"upstash-method": "POST",
330331
"upstash-retries": "3",

src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,3 +269,9 @@ export type WaitStepResponse = {
269269
timeout: boolean;
270270
notifyBody: unknown;
271271
};
272+
273+
export type CallResponse = {
274+
status: number,
275+
body: unknown,
276+
header: Record<string, string[]>
277+
}

0 commit comments

Comments
 (0)