diff --git a/README.md b/README.md index 72b952de..771b6552 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,7 @@ Or have a look at the general catalog below: | Durable Promises as a Service | [](typescript/patterns-use-cases/README.md#durable-promises-as-a-service) | | Priority Queue | [](typescript/patterns-use-cases/README.md#priority-queue) | | Rate Limiting | [](typescript/patterns-use-cases/README.md#rate-limiting) [](go/patterns-use-cases/README.md#rate-limiting) | +| Fixed Window Counter | [](typescript/patterns-use-cases/README.md#fixed-window-counter) | | AI: agents, LLM calls, MCP, A2A,... | [AI examples repo](https://github.com/restatedev/ai-examples) | #### Integrations diff --git a/typescript/README.md b/typescript/README.md index 5a25591d..5adcb518 100644 --- a/typescript/README.md +++ b/typescript/README.md @@ -44,6 +44,7 @@ Common tasks and patterns implemented with Restate: - **[Durable Promises as a Service](patterns-use-cases/README.md#durable-promises-as-a-service)**: Building Promises/Futures as a service, that can be exposed to external clients and are durable across processes and failures. [](patterns-use-cases/src/promiseasaservice) - **[Priority Queue](patterns-use-cases/README.md#priority-queue)**: Example of implementing a priority queue to manage task execution order. [](patterns-use-cases/src/priorityqueue) - **[Rate Limiting](patterns-use-cases/README.md#rate-limiting)**: Example of implementing a token bucket rate limiter. [](patterns-use-cases/src/ratelimit) +- **[Fixed Window Counter](patterns-use-cases/README.md#fixed-window-counter)**: Example of implementing a fixed window counter for tracking events over time periods. [](patterns-use-cases/src/fixedwindowcounter) ## Integrations diff --git a/typescript/patterns-use-cases/README.md b/typescript/patterns-use-cases/README.md index 3cb89b16..d9b4488b 100644 --- a/typescript/patterns-use-cases/README.md +++ b/typescript/patterns-use-cases/README.md @@ -30,6 +30,7 @@ Use Restate to build distributed coordination and synchronization constructs: - **[Durable Promises as a Service](README.md#durable-promises-as-a-service)**: Building Promises/Futures as a service, that can be exposed to external clients and are durable across processes and failures. [](src/promiseasaservice) - **[Priority Queue](README.md#priority-queue)**: Example of implementing a priority queue to manage task execution order. [](src/priorityqueue) - **[Rate Limiting](README.md#rate-limiting)**: Example of implementing a token bucket rate limiter. [](src/ratelimit) +- **[Fixed Window Counter](README.md#fixed-window-counter)**: Example of implementing a fixed window counter for tracking events over time periods. [](src/fixedwindowcounter) First, install the dependencies: @@ -994,3 +995,28 @@ You should observe that only one request is processed per second. You can then t and sending more requests. + +## Fixed Window Counter +[](src/fixedwindowcounter) + +An example of implementing a fixed window counter for tracking events over time periods using Restate state. + +
+Running the example + +Run the example with `npx tsx watch ./src/fixedwindowcounter/app.ts`. + +You can track events like this: +```shell +# add a single event +curl localhost:8080/counter/myKey/add +# add lots +for i in $(seq 1 30); do curl localhost:8080/counter/myKey/add; done +``` + +You can then see the count over a particular period, eg 25 seconds: +```shell +curl http://localhost:8080/counter/myKey/count -H 'content-type:application/json' -d '{"periodMillis": 25000}' +``` + +
diff --git a/typescript/patterns-use-cases/src/fixedwindowcounter/counter.ts b/typescript/patterns-use-cases/src/fixedwindowcounter/counter.ts new file mode 100644 index 00000000..ba7a50be --- /dev/null +++ b/typescript/patterns-use-cases/src/fixedwindowcounter/counter.ts @@ -0,0 +1,125 @@ +import { + endpoint, + handlers, + object, + ObjectContext, + ObjectSharedContext, + TerminalError, +} from "@restatedev/restate-sdk"; + +interface CounterState { + entries: Entry[]; +} + +type Entry = [/* bucket */ number, /* count */ number]; + +// here you determine how buckets are formed; in this case we have second-precision +function toBucket(unixMillis: number): number { + return Math.floor(unixMillis / 1000); +} + +// here you limit the amount of history that we keep; we don't want state to become arbitrarily large so we remove +// the oldest buckets once the total number of buckets exceeds this number +// 300 buckets with 1 second precision -> 5 minutes +// in theory as long as your state for a given key is < 1mB you can push this number much higher. it must be high +// enough to cover the oldest start time you want to count from +const MAX_BUCKETS = 300; + +interface AddRequest { + // the unix milli timestamp of the time of the event; optional, defaults to now + timeMillis?: number; +} + +type CountRequest = + | { + // the unix milli timestamp of the start of the period in which to count + startMillis: number; + + // the unix milli timestamp of the end of the period in which to count; optional, defaults to including all entries + // the end bucket is not included, so eg a 2000 milli period will mean two one-second buckets, not three + endMillis?: number; + } + | { + // how far in the past to count, in milliseconds + periodMillis: number; + }; + +const counter = object({ + name: "counter", + handlers: { + add: async (ctx: ObjectContext, request?: AddRequest) => { + const bucket = toBucket(request?.timeMillis ?? (await ctx.date.now())); + const entries = (await ctx.get("entries")) ?? []; + + // find the last entry that is lower or equal to the one we want + // we start at the end because generally we'd expect the insertion time to be very recent (but we don't rely on this) + const lastEntryIndex = entries.findLastIndex( + (entry) => entry[0] <= bucket, + ); + if (lastEntryIndex == -1) { + // there are no lower or equal entries, this entry goes at the start + entries.splice(0, 0, [bucket, 1]); + } else if (entries[lastEntryIndex][0] == bucket) { + // this bucket already exists; increment it + entries[lastEntryIndex][1] += 1; + } else { + // this bucket does not exist; insert it + entries.splice(lastEntryIndex + 1, 0, [bucket, 1]); + } + + // maintain history limit + if (entries.length > MAX_BUCKETS) { + entries.splice(0, entries.length - MAX_BUCKETS); + } + + ctx.set("entries", entries); + }, + // by making this a shared handler we can handle a lot more read throughput, however it means the count is based on snapshot of this object, + // so can be slightly out of date when there are concurrent calls to add. change it to a non-shared handler if thats a concern. + count: handlers.object.shared( + async ( + ctx: ObjectSharedContext, + request: CountRequest, + ): Promise => { + let startBucket: number; + let endBucket: number | undefined; + + if (request && "startMillis" in request) { + startBucket = toBucket(request.startMillis); + endBucket = request.endMillis + ? toBucket(request.endMillis) + : undefined; + } else if (request && "periodMillis" in request) { + const now = await ctx.date.now(); + startBucket = toBucket(now - request.periodMillis); + endBucket = undefined; + } else { + throw new TerminalError( + "count requires at least a parameter 'startMillis' or 'periodMillis'", + ); + } + + const entries = (await ctx.get("entries")) ?? []; + + // find the first entry that is greater than or equal to the start + const startIndex = + entries.findLastIndex((entry) => entry[0] < startBucket) + 1; + + // find the first entry that is greater than or equal to the end + // the entry will not be included + const endIndex = endBucket + ? entries.findLastIndex((entry) => entry[0] < endBucket) + 1 + : entries.length; + + let count = 0; + for (let i = startIndex; i < endIndex; i++) { + count += entries[i][1]; + } + + return count; + }, + ), + }, +}); + +endpoint().bind(counter).listen();