@@ -22,10 +22,13 @@ import {
22
22
ProtocolMode ,
23
23
ServiceDiscoveryResponse ,
24
24
} from "../generated/proto/discovery" ;
25
+ import { Event } from "../types/types" ;
26
+ import { StringKeyedEvent } from "../generated/dev/restate/events" ;
25
27
import {
26
28
FileDescriptorProto ,
27
29
UninterpretedOption ,
28
30
} from "../generated/google/protobuf/descriptor" ;
31
+ import { Empty } from "../generated/google/protobuf/empty" ;
29
32
import {
30
33
FileDescriptorProto as FileDescriptorProto1 ,
31
34
ServiceDescriptorProto as ServiceDescriptorProto1 ,
@@ -45,6 +48,7 @@ import { RestateContext, useContext } from "../restate_context";
45
48
import { RpcContextImpl } from "../restate_context_impl" ;
46
49
import { verifyAssumptions } from "../utils/assumpsions" ;
47
50
import { TerminalError } from "../public_api" ;
51
+ import { isEventHandler } from "../types/router" ;
48
52
49
53
export interface ServiceOpts {
50
54
descriptor : ProtoMetadata ;
@@ -148,6 +152,78 @@ export abstract class BaseRestateServer {
148
152
}
149
153
}
150
154
155
+ rpcHandler (
156
+ keyed : boolean ,
157
+ route : string ,
158
+ handler : Function
159
+ ) : {
160
+ descriptor : MethodDescriptorProto1 ;
161
+ method : GrpcServiceMethod < unknown , unknown > ;
162
+ } {
163
+ const descriptor = createRpcMethodDescriptor ( route ) ;
164
+
165
+ const localMethod = ( instance : unknown , input : RpcRequest ) => {
166
+ const ctx = useContext ( instance ) ;
167
+ if ( keyed ) {
168
+ return dispatchKeyedRpcHandler ( ctx , input , handler ) ;
169
+ } else {
170
+ return dispatchUnkeyedRpcHandler ( ctx , input , handler ) ;
171
+ }
172
+ } ;
173
+
174
+ const decoder = RpcRequest . decode ;
175
+ const encoder = ( message : RpcResponse ) =>
176
+ RpcResponse . encode ( message ) . finish ( ) ;
177
+
178
+ const method = new GrpcServiceMethod < RpcRequest , RpcResponse > (
179
+ route ,
180
+ route ,
181
+ localMethod ,
182
+ decoder ,
183
+ encoder
184
+ ) ;
185
+
186
+ return {
187
+ descriptor : descriptor ,
188
+ method : method as GrpcServiceMethod < unknown , unknown > ,
189
+ } ;
190
+ }
191
+
192
+ stringKeyedEventHandler (
193
+ keyed : boolean ,
194
+ route : string ,
195
+ handler : Function
196
+ ) : {
197
+ descriptor : MethodDescriptorProto1 ;
198
+ method : GrpcServiceMethod < unknown , unknown > ;
199
+ } {
200
+ if ( ! keyed ) {
201
+ // TODO: support unkeyed rpc event handler
202
+ throw new TerminalError ( "Unkeyed Event handlers are not yet supported." ) ;
203
+ }
204
+ const descriptor = createStringKeyedMethodDescriptor ( route ) ;
205
+ const localMethod = ( instance : unknown , input : StringKeyedEvent ) => {
206
+ const ctx = useContext ( instance ) ;
207
+ return dispatchKeyedEventHandler ( ctx , input , handler ) ;
208
+ } ;
209
+
210
+ const decoder = StringKeyedEvent . decode ;
211
+ const encoder = ( message : Empty ) => Empty . encode ( message ) . finish ( ) ;
212
+
213
+ const method = new GrpcServiceMethod < StringKeyedEvent , Empty > (
214
+ route ,
215
+ route ,
216
+ localMethod ,
217
+ decoder ,
218
+ encoder
219
+ ) ;
220
+
221
+ return {
222
+ descriptor,
223
+ method : method as GrpcServiceMethod < unknown , unknown > ,
224
+ } ;
225
+ }
226
+
151
227
protected bindRpcService ( name : string , router : RpcRouter , keyed : boolean ) {
152
228
const lastDot = name . indexOf ( "." ) ;
153
229
const serviceName = lastDot === - 1 ? name : name . substring ( lastDot + 1 ) ;
@@ -161,40 +237,33 @@ export abstract class BaseRestateServer {
161
237
? pushKeyedService ( desc , name )
162
238
: pushUnKeyedService ( desc , name ) ;
163
239
164
- const decoder = RpcRequest . decode ;
165
- const encoder = ( message : RpcResponse ) =>
166
- RpcResponse . encode ( message ) . finish ( ) ;
167
-
168
240
for ( const [ route , handler ] of Object . entries ( router ) ) {
169
- serviceGrpcSpec . method . push ( createRpcMethodDescriptor ( route ) ) ;
170
-
171
- const localFn = ( instance : unknown , input : RpcRequest ) => {
172
- const ctx = useContext ( instance ) ;
173
- if ( keyed ) {
174
- return dispatchKeyedRpcHandler ( ctx , input , handler ) ;
175
- } else {
176
- return dispatchUnkeyedRpcHandler ( ctx , input , handler ) ;
177
- }
241
+ let registration : {
242
+ descriptor : MethodDescriptorProto1 ;
243
+ method : GrpcServiceMethod < unknown , unknown > ;
178
244
} ;
179
245
180
- const method = new GrpcServiceMethod < RpcRequest , RpcResponse > (
181
- route ,
182
- route ,
183
- localFn ,
184
- decoder ,
185
- encoder
186
- ) ;
187
-
246
+ if ( isEventHandler ( handler ) ) {
247
+ const theHandler = handler . handler ;
248
+ registration = this . stringKeyedEventHandler ( keyed , route , theHandler ) ;
249
+ } else {
250
+ registration = this . rpcHandler ( keyed , route , handler ) ;
251
+ }
252
+ serviceGrpcSpec . method . push ( registration . descriptor ) ;
188
253
const url = `/invoke/${ name } /${ route } ` ;
189
254
this . methods [ url ] = new HostedGrpcServiceMethod (
190
255
{ } , // we don't actually execute on any class instance
191
256
servicePackage ,
192
257
serviceName ,
193
- method
258
+ registration . method
194
259
) as HostedGrpcServiceMethod < unknown , unknown > ;
195
260
196
261
rlog . info (
197
- `Registering: ${ url } -> ${ JSON . stringify ( method , null , "\t" ) } `
262
+ `Registering: ${ url } -> ${ JSON . stringify (
263
+ registration . method ,
264
+ null ,
265
+ "\t"
266
+ ) } `
198
267
) ;
199
268
}
200
269
@@ -376,6 +445,25 @@ async function dispatchUnkeyedRpcHandler(
376
445
return RpcResponse . create ( { response : result } ) ;
377
446
}
378
447
448
+ async function dispatchKeyedEventHandler (
449
+ origCtx : RestateContext ,
450
+ req : StringKeyedEvent ,
451
+ handler : Function
452
+ ) : Promise < Empty > {
453
+ const ctx = new RpcContextImpl ( origCtx ) ;
454
+ const key = req . key ;
455
+ if ( typeof key !== "string" || key . length === 0 ) {
456
+ // we throw a terminal error here, because this cannot be patched by updating code:
457
+ // if the request is wrong (missing a key), the request can never make it
458
+ throw new TerminalError (
459
+ "Keyed handlers must recieve a non null or empty string key"
460
+ ) ;
461
+ }
462
+ const jsEvent = new Event ( key , req . payload , req . source , req . attributes ) ;
463
+ await handler ( ctx , jsEvent ) ;
464
+ return Empty . create ( { } ) ;
465
+ }
466
+
379
467
function copyProtoMetadata (
380
468
original : RpcServiceProtoMetadata
381
469
) : RpcServiceProtoMetadata {
@@ -458,4 +546,14 @@ function createRpcMethodDescriptor(methodName: string): MethodDescriptorProto1 {
458
546
return desc ;
459
547
}
460
548
549
+ function createStringKeyedMethodDescriptor (
550
+ methodName : string
551
+ ) : MethodDescriptorProto1 {
552
+ const desc = {
553
+ ...rpcServiceProtoMetadata . fileDescriptor . service [ 0 ] . method [ 1 ] ,
554
+ } as MethodDescriptorProto1 ;
555
+ desc . name = methodName ;
556
+ return desc ;
557
+ }
558
+
461
559
const dynrpcDescriptor = copyProtoMetadata ( rpcServiceProtoMetadata ) ;
0 commit comments