|
| 1 | +import { |
| 2 | + endpoint, |
| 3 | + handlers, |
| 4 | + object, |
| 5 | + ObjectContext, |
| 6 | + ObjectSharedContext, |
| 7 | + TerminalError, |
| 8 | +} from "@restatedev/restate-sdk"; |
| 9 | + |
| 10 | +interface CounterState { |
| 11 | + entries: Entry[]; |
| 12 | +} |
| 13 | + |
| 14 | +type Entry = [/* bucket */ number, /* count */ number]; |
| 15 | + |
| 16 | +// here you determine how buckets are formed; in this case we have second-precision |
| 17 | +function toBucket(unixMillis: number): number { |
| 18 | + return Math.floor(unixMillis / 1000); |
| 19 | +} |
| 20 | + |
| 21 | +// here you limit the amount of history that we keep; we don't want state to become arbitrarily large so we remove |
| 22 | +// the oldest buckets once the total number of buckets exceeds this number |
| 23 | +// 300 buckets with 1 second precision -> 5 minutes |
| 24 | +// in theory as long as your state for a given key is < 1mB you can push this number much higher. it must be high |
| 25 | +// enough to cover the oldest start time you want to count from |
| 26 | +const MAX_BUCKETS = 300; |
| 27 | + |
| 28 | +interface AddRequest { |
| 29 | + // the unix milli timestamp of the time of the event; optional, defaults to now |
| 30 | + timeMillis?: number; |
| 31 | +} |
| 32 | + |
| 33 | +type CountRequest = |
| 34 | + | { |
| 35 | + // the unix milli timestamp of the start of the period in which to count |
| 36 | + startMillis: number; |
| 37 | + |
| 38 | + // the unix milli timestamp of the end of the period in which to count; optional, defaults to including all entries |
| 39 | + // the end bucket is not included, so eg a 2000 milli period will mean two one-second buckets, not three |
| 40 | + endMillis?: number; |
| 41 | + } |
| 42 | + | { |
| 43 | + // how far in the past to count, in milliseconds |
| 44 | + periodMillis: number; |
| 45 | + }; |
| 46 | + |
| 47 | +const counter = object({ |
| 48 | + name: "counter", |
| 49 | + handlers: { |
| 50 | + add: async (ctx: ObjectContext<CounterState>, request?: AddRequest) => { |
| 51 | + const bucket = toBucket(request?.timeMillis ?? (await ctx.date.now())); |
| 52 | + const entries = await getEntries(ctx); |
| 53 | + |
| 54 | + // find the last entry that is lower or equal to the one we want |
| 55 | + // we start at the end because generally we'd expect the insertion time to be very recent (but we don't rely on this) |
| 56 | + const lastEntryIndex = entries.findLastIndex( |
| 57 | + (entry) => entry[0] <= bucket, |
| 58 | + ); |
| 59 | + if (lastEntryIndex == -1) { |
| 60 | + // there are no lower or equal entries, this entry goes at the start |
| 61 | + entries.splice(0, 0, [bucket, 1]); |
| 62 | + } else if (entries[lastEntryIndex][0] == bucket) { |
| 63 | + // this bucket already exists; increment it |
| 64 | + entries[lastEntryIndex][1] += 1; |
| 65 | + } else { |
| 66 | + // this bucket does not exist; insert it |
| 67 | + entries.splice(lastEntryIndex + 1, 0, [bucket, 1]); |
| 68 | + } |
| 69 | + |
| 70 | + // maintain history limit |
| 71 | + if (entries.length > MAX_BUCKETS) { |
| 72 | + entries.splice(0, entries.length - MAX_BUCKETS); |
| 73 | + } |
| 74 | + |
| 75 | + ctx.set("entries", entries); |
| 76 | + }, |
| 77 | + // by making this a shared handler we can handle a lot more read throughput, however it means the count is based on snapshot of this object, |
| 78 | + // so can be slightly out of date when there are concurrent calls to add. change it to a non-shared handler if thats a concern. |
| 79 | + count: handlers.object.shared( |
| 80 | + async ( |
| 81 | + ctx: ObjectSharedContext<CounterState>, |
| 82 | + request: CountRequest, |
| 83 | + ): Promise<number> => { |
| 84 | + let startBucket: number; |
| 85 | + let endBucket: number | undefined; |
| 86 | + |
| 87 | + if (request && "startMillis" in request) { |
| 88 | + startBucket = toBucket(request.startMillis); |
| 89 | + endBucket = request.endMillis |
| 90 | + ? toBucket(request.endMillis) |
| 91 | + : undefined; |
| 92 | + } else if (request && "periodMillis" in request) { |
| 93 | + const now = await ctx.date.now(); |
| 94 | + startBucket = toBucket(now - request.periodMillis); |
| 95 | + endBucket = undefined; |
| 96 | + } else { |
| 97 | + throw new TerminalError( |
| 98 | + "count requires at least a parameter 'startMillis' or 'periodMillis'", |
| 99 | + ); |
| 100 | + } |
| 101 | + |
| 102 | + const entries = await getEntries(ctx); |
| 103 | + |
| 104 | + // find the first entry that is greater than or equal to the start |
| 105 | + const startIndex = |
| 106 | + entries.findLastIndex((entry) => entry[0] < startBucket) + 1; |
| 107 | + |
| 108 | + // find the first entry that is greater than or equal to the end |
| 109 | + // the entry will not be included |
| 110 | + const endIndex = endBucket |
| 111 | + ? entries.findLastIndex((entry) => entry[0] < endBucket) + 1 |
| 112 | + : entries.length; |
| 113 | + |
| 114 | + let count = 0; |
| 115 | + for (let i = startIndex; i < endIndex; i++) { |
| 116 | + count += entries[i][1]; |
| 117 | + } |
| 118 | + |
| 119 | + return count; |
| 120 | + }, |
| 121 | + ), |
| 122 | + }, |
| 123 | +}); |
| 124 | + |
| 125 | +async function getEntries( |
| 126 | + ctx: ObjectSharedContext<CounterState>, |
| 127 | +): Promise<Entry[]> { |
| 128 | + const events = await ctx.get("entries"); |
| 129 | + return events ?? []; |
| 130 | +} |
| 131 | + |
| 132 | +endpoint().bind(counter).listen(); |
0 commit comments