Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Or have a look at the general catalog below:
| <a id="promise-as-a-service">Durable Promises as a Service</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#durable-promises-as-a-service) |
| <a id="priority-queue">Priority Queue</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#priority-queue) |
| <a id="rate-limiting">Rate Limiting</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#rate-limiting) [<img src="https://skillicons.dev/icons?i=go" width="24" height="24">](go/patterns-use-cases/README.md#rate-limiting) |
| <a id="fixed-window-counter">Fixed Window Counter</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#fixed-window-counter) |
| <a id="ai">AI: agents, LLM calls, MCP, A2A,...</a> | [AI examples repo](https://github.com/restatedev/ai-examples) |

#### Integrations
Expand Down
1 change: 1 addition & 0 deletions typescript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Common tasks and patterns implemented with Restate:
- **[Durable Promises as a Service](patterns-use-cases/README.md#durable-promises-as-a-service)**: Building Promises/Futures as a service, that can be exposed to external clients and are durable across processes and failures. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/promiseasaservice)
- **[Priority Queue](patterns-use-cases/README.md#priority-queue)**: Example of implementing a priority queue to manage task execution order. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/priorityqueue)
- **[Rate Limiting](patterns-use-cases/README.md#rate-limiting)**: Example of implementing a token bucket rate limiter. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/ratelimit)
- **[Fixed Window Counter](patterns-use-cases/README.md#fixed-window-counter)**: Example of implementing a fixed window counter for tracking events over time periods. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/fixedwindowcounter)

## Integrations

Expand Down
26 changes: 26 additions & 0 deletions typescript/patterns-use-cases/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Use Restate to build distributed coordination and synchronization constructs:
- **[Durable Promises as a Service](README.md#durable-promises-as-a-service)**: Building Promises/Futures as a service, that can be exposed to external clients and are durable across processes and failures. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/promiseasaservice)
- **[Priority Queue](README.md#priority-queue)**: Example of implementing a priority queue to manage task execution order. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/priorityqueue)
- **[Rate Limiting](README.md#rate-limiting)**: Example of implementing a token bucket rate limiter. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/ratelimit)
- **[Fixed Window Counter](README.md#fixed-window-counter)**: Example of implementing a fixed window counter for tracking events over time periods. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/fixedwindowcounter)

First, install the dependencies:

Expand Down Expand Up @@ -994,3 +995,28 @@ You should observe that only one request is processed per second. You can then t
and sending more requests.

</details>

## Fixed Window Counter
[<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/show-code.svg">](src/fixedwindowcounter)

An example of implementing a fixed window counter for tracking events over time periods using Restate state.

<details>
<summary><strong>Running the example</strong></summary>

Run the example with `npx tsx watch ./src/fixedwindowcounter/app.ts`.

You can track events like this:
```shell
# add a single event
curl localhost:8080/counter/myKey/add
# add lots
for i in $(seq 1 30); do curl localhost:8080/counter/myKey/add; done
```

You can then see the count over a particular period, eg 25 seconds:
```shell
curl http://localhost:8080/counter/myKey/count -H 'content-type:application/json' -d '{"periodMillis": 25000}'
```

</details>
125 changes: 125 additions & 0 deletions typescript/patterns-use-cases/src/fixedwindowcounter/counter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import {
endpoint,
handlers,
object,
ObjectContext,
ObjectSharedContext,
TerminalError,
} from "@restatedev/restate-sdk";

interface CounterState {
entries: Entry[];
}

type Entry = [/* bucket */ number, /* count */ number];

// here you determine how buckets are formed; in this case we have second-precision
function toBucket(unixMillis: number): number {
return Math.floor(unixMillis / 1000);
}

// here you limit the amount of history that we keep; we don't want state to become arbitrarily large so we remove
// the oldest buckets once the total number of buckets exceeds this number
// 300 buckets with 1 second precision -> 5 minutes
// in theory as long as your state for a given key is < 1mB you can push this number much higher. it must be high
// enough to cover the oldest start time you want to count from
const MAX_BUCKETS = 300;

interface AddRequest {
// the unix milli timestamp of the time of the event; optional, defaults to now
timeMillis?: number;
}

type CountRequest =
| {
// the unix milli timestamp of the start of the period in which to count
startMillis: number;

// the unix milli timestamp of the end of the period in which to count; optional, defaults to including all entries
// the end bucket is not included, so eg a 2000 milli period will mean two one-second buckets, not three
endMillis?: number;
}
| {
// how far in the past to count, in milliseconds
periodMillis: number;
};

const counter = object({
name: "counter",
handlers: {
add: async (ctx: ObjectContext<CounterState>, request?: AddRequest) => {
const bucket = toBucket(request?.timeMillis ?? (await ctx.date.now()));
const entries = (await ctx.get("entries")) ?? [];

// find the last entry that is lower or equal to the one we want
// we start at the end because generally we'd expect the insertion time to be very recent (but we don't rely on this)
const lastEntryIndex = entries.findLastIndex(
(entry) => entry[0] <= bucket,
);
if (lastEntryIndex == -1) {
// there are no lower or equal entries, this entry goes at the start
entries.splice(0, 0, [bucket, 1]);
} else if (entries[lastEntryIndex][0] == bucket) {
// this bucket already exists; increment it
entries[lastEntryIndex][1] += 1;
} else {
// this bucket does not exist; insert it
entries.splice(lastEntryIndex + 1, 0, [bucket, 1]);
}

// maintain history limit
if (entries.length > MAX_BUCKETS) {
entries.splice(0, entries.length - MAX_BUCKETS);
}

ctx.set("entries", entries);
},
// by making this a shared handler we can handle a lot more read throughput, however it means the count is based on snapshot of this object,
// so can be slightly out of date when there are concurrent calls to add. change it to a non-shared handler if thats a concern.
count: handlers.object.shared(
async (
ctx: ObjectSharedContext<CounterState>,
request: CountRequest,
): Promise<number> => {
let startBucket: number;
let endBucket: number | undefined;

if (request && "startMillis" in request) {
startBucket = toBucket(request.startMillis);
endBucket = request.endMillis
? toBucket(request.endMillis)
: undefined;
} else if (request && "periodMillis" in request) {
const now = await ctx.date.now();
startBucket = toBucket(now - request.periodMillis);
endBucket = undefined;
} else {
throw new TerminalError(
"count requires at least a parameter 'startMillis' or 'periodMillis'",
);
}

const entries = (await ctx.get("entries")) ?? [];

// find the first entry that is greater than or equal to the start
const startIndex =
entries.findLastIndex((entry) => entry[0] < startBucket) + 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment mentions greater than or equal but the code uses <

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, its accurate though, because of the + 1; we find the last entry that is less than the start bucket, and add one, so we have found the first entry that is greater than or equal to the start bucket


// find the first entry that is greater than or equal to the end
// the entry will not be included
const endIndex = endBucket
? entries.findLastIndex((entry) => entry[0] < endBucket) + 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment mentions greater than or equal but the code uses <

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above, the comment is accurate because of the + 1

: entries.length;

let count = 0;
for (let i = startIndex; i < endIndex; i++) {
count += entries[i][1];
}

return count;
},
),
},
});

endpoint().bind(counter).listen();
Loading