Skip to content

Commit 18d8d1e

Browse files
committed
fix: add get waiters method and try to parse notify/wait response by defualt
1 parent ca0f0e9 commit 18d8d1e

File tree

7 files changed

+139
-56
lines changed

7 files changed

+139
-56
lines changed

src/client/index.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { NotifyResponse } from "../types";
1+
import { NotifyResponse, Waiter } from "../types";
22
import { Client as QStashClient } from "@upstash/qstash";
3-
import { makeNotifyRequest } from "./utils";
3+
import { makeGetWaitersRequest, makeNotifyRequest } from "./utils";
44

55
type ClientConfig = ConstructorParameters<typeof QStashClient>[0];
66

@@ -42,6 +42,15 @@ export class Client {
4242
eventId: string;
4343
eventData?: unknown;
4444
}): Promise<NotifyResponse[]> {
45-
return await makeNotifyRequest(this.client.http, eventId, eventData)
45+
return await makeNotifyRequest(this.client.http, eventId, eventData);
46+
}
47+
48+
/**
49+
* Check waiters of an event
50+
*
51+
* @param eventId event id to check
52+
*/
53+
public async getWaiters({ eventId }: { eventId: string }): Promise<Waiter[]> {
54+
return await makeGetWaitersRequest(this.client.http, eventId);
4655
}
4756
}

src/client/utils.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Client } from "@upstash/qstash";
2-
import { NotifyResponse } from "../types";
2+
import { NotifyResponse, Waiter } from "../types";
33

44
export const makeNotifyRequest = async (
55
requester: Client["http"],
@@ -13,4 +13,15 @@ export const makeNotifyRequest = async (
1313
})) as NotifyResponse[];
1414

1515
return result;
16-
}
16+
};
17+
18+
export const makeGetWaitersRequest = async (
19+
requester: Client["http"],
20+
eventId: string
21+
): Promise<Waiter[]> => {
22+
const result = (await requester.request({
23+
path: ["v2", "waiters", eventId],
24+
method: "GET",
25+
})) as Waiter[];
26+
return result;
27+
};

src/context/context.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -341,24 +341,33 @@ export class WorkflowContext<TInitialPayload = unknown> {
341341
)
342342
);
343343

344-
return result;
344+
try {
345+
return {
346+
...result,
347+
eventData: JSON.parse(result.eventData as string),
348+
};
349+
} catch {
350+
return result;
351+
}
345352
}
346353

347354
public async notify(
348355
stepName: string,
349356
eventId: string,
350-
eventData: string
357+
eventData: unknown
351358
): Promise<NotifyStepResponse> {
352359
const result = await this.addStep(
353-
new LazyNotifyStep(
354-
stepName,
355-
eventId,
356-
eventData,
357-
this.qstashClient.http
358-
)
360+
new LazyNotifyStep(stepName, eventId, eventData, this.qstashClient.http)
359361
);
360362

361-
return result;
363+
try {
364+
return {
365+
...result,
366+
eventData: JSON.parse(result.eventData as string),
367+
};
368+
} catch {
369+
return result;
370+
}
362371
}
363372

364373
/**

src/context/steps.test.ts

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,11 @@ describe("test steps", () => {
186186

187187
describe("notify step", () => {
188188
const eventId = "my-event-id";
189-
const eventData = {data: "my-event-data"};
189+
const eventData = { data: "my-event-data" };
190190

191191
// get client
192192
const token = nanoid();
193-
const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token })
193+
const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token });
194194

195195
const step = new LazyNotifyStep(stepName, eventId, eventData, client.http);
196196

@@ -209,7 +209,7 @@ describe("test steps", () => {
209209
});
210210

211211
test("should create result step", async () => {
212-
let called = false
212+
let called = false;
213213
const notifyResponse: NotifyResponse[] = [
214214
{
215215
error: "no-error",
@@ -224,27 +224,27 @@ describe("test steps", () => {
224224
"my-header": ["value"],
225225
},
226226
timeoutUrl: "url",
227-
url: "url"
228-
}
229-
}
230-
]
227+
url: "url",
228+
},
229+
},
230+
];
231231
const stepResponse: NotifyStepResponse = {
232232
eventId,
233233
eventData,
234-
notifyResponse
235-
}
234+
notifyResponse,
235+
};
236236

237237
await mockQStashServer({
238238
execute: async () => {
239-
const result = await step.getResultStep(4, stepId)
239+
const result = await step.getResultStep(4, stepId);
240240
expect(result).toEqual({
241241
concurrent: 4,
242242
stepId,
243243
out: stepResponse,
244244
stepName,
245245
stepType: "Notify",
246246
});
247-
called = true
247+
called = true;
248248
},
249249
responseFields: {
250250
status: 200,
@@ -256,10 +256,9 @@ describe("test steps", () => {
256256
token,
257257
body: eventData,
258258
},
259-
})
259+
});
260260

261-
expect(called).toBeTrue()
261+
expect(called).toBeTrue();
262262
});
263-
264-
})
263+
});
265264
});

src/context/steps.ts

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -227,26 +227,17 @@ export class LazyWaitForEventStep extends BaseLazyStep<WaitStepResponse> {
227227
}
228228

229229
export class LazyNotifyStep extends LazyFunctionStep<NotifyStepResponse> {
230-
stepType: StepType = "Notify"
230+
stepType: StepType = "Notify";
231231

232-
constructor(
233-
stepName: string,
234-
eventId: string,
235-
eventData: unknown,
236-
requester: Client["http"]
237-
) {
232+
constructor(stepName: string, eventId: string, eventData: unknown, requester: Client["http"]) {
238233
super(stepName, async () => {
239-
const notifyResponse = await makeNotifyRequest(
240-
requester,
241-
eventId,
242-
eventData
243-
)
234+
const notifyResponse = await makeNotifyRequest(requester, eventId, eventData);
244235

245236
return {
246237
eventId,
247238
eventData,
248-
notifyResponse
249-
}
250-
})
239+
notifyResponse,
240+
};
241+
});
251242
}
252243
}

src/integration.test.ts

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ import type { RouteFunction, WaitStepResponse, WorkflowServeOptions } from "./ty
5555
import type { NextRequest } from "next/server";
5656
import { Client } from "./client";
5757
import { nanoid } from "./utils";
58+
import { makeGetWaitersRequest } from "./client/utils";
5859

5960
const WORKFLOW_PORT = "3000";
6061
const THIRD_PARTY_PORT = "3001";
61-
const LOCAL_WORKFLOW_URL = `http://localhost:${WORKFLOW_PORT}`;
6262
const LOCAL_THIRD_PARTY_URL = `http://localhost:${THIRD_PARTY_PORT}`;
6363

6464
const someWork = (input: string) => {
@@ -109,21 +109,23 @@ const testEndpoint = async <TInitialPayload = unknown>({
109109
finishState,
110110
failureFunction,
111111
retries,
112+
port = WORKFLOW_PORT,
112113
}: {
113-
finalCount: number;
114+
finalCount?: number;
114115
waitFor: number;
115116
initialPayload: TInitialPayload;
116117
routeFunction: RouteFunction<TInitialPayload>;
117118
finishState: FinishState;
118119
failureFunction?: WorkflowServeOptions["failureFunction"];
119120
retries?: number;
121+
port?: string;
120122
}) => {
121123
let counter = 0;
122124

123125
const endpoint = workflowServe<TInitialPayload>(routeFunction, {
124126
qstashClient,
125-
url: LOCAL_WORKFLOW_URL,
126-
verbose: true,
127+
url: `http://localhost:${port}`,
128+
// verbose: true,
127129
failureFunction,
128130
retries,
129131
});
@@ -133,7 +135,7 @@ const testEndpoint = async <TInitialPayload = unknown>({
133135
counter += 1;
134136
return await endpoint(request as NextRequest);
135137
},
136-
port: WORKFLOW_PORT,
138+
port: port,
137139
});
138140

139141
await qstashClient.publishJSON({
@@ -142,15 +144,17 @@ const testEndpoint = async <TInitialPayload = unknown>({
142144
headers: {
143145
Authentication: "Bearer secretPassword",
144146
},
145-
url: `http://localhost:${WORKFLOW_PORT}`,
147+
url: `http://localhost:${port}`,
146148
});
147149

148150
await new Promise((resolve) => setTimeout(resolve, waitFor));
149151

150152
server.stop();
151153

152154
finishState.check();
153-
expect(counter).toBe(finalCount);
155+
if (finalCount) {
156+
expect(counter).toBe(finalCount);
157+
}
154158
};
155159

156160
describe.skip("live serve tests", () => {
@@ -590,8 +594,9 @@ describe.skip("live serve tests", () => {
590594
context.waitForEvent("wait-event-step", eventId, 3),
591595
]);
592596
expect(runResponse).toBe(runResult);
593-
expect(waitResponse.eventData).toBe(expectedWaitResponse.eventData);
594597
expect(waitResponse.timeout).toBe(expectedWaitResponse.timeout);
598+
expect(waitResponse.eventData).toEqual(expectedWaitResponse.eventData);
599+
expect(typeof waitResponse.eventData).toBe(typeof expectedWaitResponse.eventData);
595600
finishState.finish();
596601
},
597602
});
@@ -651,5 +656,64 @@ describe.skip("live serve tests", () => {
651656
},
652657
{ timeout: 17_000 }
653658
);
659+
660+
describe("should notify from inside a function", () => {
661+
const testNotifyWithContext = async (payload: unknown) => {
662+
const eventId = `my-event-id-${nanoid()}`;
663+
664+
const waitingEndpoint = testWaitEndpoint(
665+
{
666+
eventData: payload,
667+
timeout: false,
668+
},
669+
eventId
670+
);
671+
672+
const finishState = new FinishState();
673+
const notifyingEndpoint = testEndpoint({
674+
finishState,
675+
initialPayload: undefined,
676+
waitFor: 15000,
677+
port: "3002",
678+
routeFunction: async (context) => {
679+
// wait to avoid notifying the first waitForEvent
680+
await context.sleep("sleep for first timeout", 3);
681+
682+
while (true) {
683+
const waiterExists = await context.run("check waiters", async () => {
684+
const waiters = await makeGetWaitersRequest(context.qstashClient.http, eventId);
685+
686+
return Boolean(waiters);
687+
});
688+
689+
if (waiterExists) {
690+
break;
691+
}
692+
}
693+
const { notifyResponse } = await context.notify("notify-step", eventId, payload);
694+
expect(notifyResponse.length).toBeTruthy();
695+
finishState.finish();
696+
},
697+
});
698+
699+
await Promise.all([waitingEndpoint, notifyingEndpoint]);
700+
};
701+
702+
test(
703+
"should handle string event data",
704+
async () => {
705+
await testNotifyWithContext("event-data");
706+
},
707+
{ timeout: 170000 }
708+
);
709+
710+
test(
711+
"should handle object event data",
712+
async () => {
713+
await testNotifyWithContext({ event: "data" });
714+
},
715+
{ timeout: 170000 }
716+
);
717+
});
654718
});
655719
});

src/types.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,13 +275,13 @@ export type NotifyStepResponse = {
275275
/**
276276
* notified event id
277277
*/
278-
eventId: string,
278+
eventId: string;
279279
/**
280280
* event data sent with notify
281281
*/
282-
eventData: unknown,
282+
eventData: unknown;
283283
/**
284284
* response from notify
285285
*/
286-
notifyResponse: NotifyResponse[]
287-
}
286+
notifyResponse: NotifyResponse[];
287+
};

0 commit comments

Comments
 (0)