Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

Commit 8f64a5d

Browse files
authored
fix: change testing strategdy to compare against graphql-ws (#34)
- fix error handling for validation and execution execution context errors to send an Error message - change the onSubscribe Callback to send an Error message to allow denying a subscription without trashing the connection - change the testing execution helper to have a stream of websocket events instead of using the graphql-ws client, this lets us more easily compare outputs between the servers - port testing to a branch of the arc sandbox that fully supports the api gateway management api (architect/sandbox#640) BREAKING CHANGE: onSubscribe now sends error messages instead of disconnecting
1 parent e93e318 commit 8f64a5d

File tree

12 files changed

+503
-221
lines changed

12 files changed

+503
-221
lines changed

lib/messages/connection_init.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ import { deleteConnection } from '../utils/deleteConnection'
88
export const connection_init: MessageHandler<ConnectionInitMessage> =
99
async ({ server, event, message }) => {
1010
try {
11-
const res = server.onConnectionInit
12-
? await server.onConnectionInit({ event, message })
13-
: message.payload
11+
const payload = await server.onConnectionInit?.({ event, message }) ?? message.payload ?? {}
1412

1513
if (server.pingpong) {
1614
await new StepFunctions()
@@ -33,7 +31,7 @@ export const connection_init: MessageHandler<ConnectionInitMessage> =
3331
const connection = Object.assign(new server.model.Connection(), {
3432
id: event.requestContext.connectionId,
3533
requestContext: event.requestContext,
36-
payload: res,
34+
payload,
3735
})
3836
await server.mapper.put(connection)
3937
return sendMessage(server)({

lib/messages/subscribe-test.ts

Lines changed: 61 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ describe('messages/subscribe', () => {
6464
assert.include(subscriptions, { connectionId, subscriptionId: '1234' })
6565
})
6666

67-
it('disconnects on error', async () => {
67+
it('sends errors on error', async () => {
6868
const event: any = { requestContext: { connectedAt: 1628889982819, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'b6o5BPxb3', requestId: 'MaEe0DVon', requestTimeEpoch: 1628889983319, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"abcdefg","type":"subscribe","payload":{"query":"{ HIHOWEAREYOU }"}}' }
6969
const state: { delete: { ConnectionId: string }[], post: { ConnectionId: string, Data: string }[] } = { post: [], delete: [] }
7070
const server = await mockServerContext({
@@ -80,82 +80,87 @@ describe('messages/subscribe', () => {
8080
assert.deepEqual(state, {
8181
post: [
8282
{ ConnectionId, Data: JSON.stringify({ type: 'connection_ack' }) },
83+
{ ConnectionId, Data: JSON.stringify({ type: 'error', id: 'abcdefg', payload: [{
84+
message: 'Cannot query field "HIHOWEAREYOU" on type "Query".',
85+
locations: [{ line:1, column:3 }],
86+
},
87+
] }) },
8388
],
84-
delete: [
85-
{ ConnectionId },
86-
],
89+
delete: [],
8790
})
8891
})
89-
it('calls the global error callback on error', async () => {
90-
const event: any = { requestContext: { connectedAt: 1628889982819, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'b6o5BPxb3', requestId: 'MaEe0DVon', requestTimeEpoch: 1628889983319, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"abcdefg","type":"subscribe","payload":{"query":"{ HIHOWEAREYOU }"}}' }
92+
it('calls the global error callback server errors', async () => {
93+
const event: any = { requestContext: { connectedAt: 1628889982819, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'b6o5BPxb3', requestId: 'MaEe0DVon', requestTimeEpoch: 1628889983319, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"abcdefg","type":"subscribe","payload":{"query":"{ hello }"}}' }
9194
let error: any = null
95+
let sendErr = false
9296
const server = await mockServerContext({
9397
apiGatewayManagementApi: {
9498
// eslint-disable-next-line @typescript-eslint/no-empty-function
95-
postToConnection: () => ({ promise: async () => { } }),
99+
postToConnection: () => ({ promise: async () => { if(sendErr) { throw new Error('postToConnection Error') } } }),
96100
// eslint-disable-next-line @typescript-eslint/no-empty-function
97101
deleteConnection: () => ({ promise: async () => { } }),
98102
},
99103
// eslint-disable-next-line @typescript-eslint/no-empty-function
100104
onError: err => (error = err),
101105
})
102106
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
107+
sendErr = true
103108
await subscribe({ server, event, message: JSON.parse(event.body) })
104-
assert.match(error.message, /Cannot query field "HIHOWEAREYOU" on type "Query"/ )
109+
assert.match(error.message, /postToConnection Error/ )
105110
})
106111
describe('callbacks', () => {
107-
it('fires onSubscribe before subscribing', async () => {
108-
109-
const onSubscribe: string[] = []
112+
// this test doesn't make sense anymore because these error are now sent as error messages
113+
// it('fires onSubscribe before subscribing', async () => {
110114

111-
const typeDefs = `
112-
type Query {
113-
hello: String
114-
}
115-
type Subscription {
116-
greetings: String
117-
}
118-
`
119-
const resolvers = {
120-
Query: {
121-
hello: () => 'Hello World!',
122-
},
123-
Subscription: {
124-
greetings:{
125-
subscribe: pubsubSubscribe('greetings', {
126-
onSubscribe() {
127-
onSubscribe.push('We did it!')
128-
throw new Error('don\'t subscribe!')
129-
},
130-
}),
131-
resolve: ({ payload }) => {
132-
return payload
133-
},
134-
},
135-
},
136-
}
115+
// const onSubscribe: string[] = []
137116

138-
const schema = makeExecutableSchema({
139-
typeDefs,
140-
resolvers,
141-
})
142-
const server = await mockServerContext({
143-
schema,
144-
})
145-
const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }
117+
// const typeDefs = `
118+
// type Query {
119+
// hello: String
120+
// }
121+
// type Subscription {
122+
// greetings: String
123+
// }
124+
// `
125+
// const resolvers = {
126+
// Query: {
127+
// hello: () => 'Hello World!',
128+
// },
129+
// Subscription: {
130+
// greetings:{
131+
// subscribe: pubsubSubscribe('greetings', {
132+
// onSubscribe() {
133+
// onSubscribe.push('We did it!')
134+
// throw new Error('don\'t subscribe!')
135+
// },
136+
// }),
137+
// resolve: ({ payload }) => {
138+
// return payload
139+
// },
140+
// },
141+
// },
142+
// }
146143

147-
await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
148-
try {
149-
await subscribe({ server, event, message: JSON.parse(event.body) })
150-
throw new Error('should not have subscribed')
151-
} catch (error) {
152-
assert.equal(error.message, 'don\'t subscribe!')
153-
}
154-
assert.deepEqual(onSubscribe, ['We did it!'])
155-
const subscriptions = await collect(server.mapper.query(server.model.Subscription, { connectionId: equals(event.requestContext.connectionId) }, { indexName: 'ConnectionIndex' }))
156-
assert.isEmpty(subscriptions)
144+
// const schema = makeExecutableSchema({
145+
// typeDefs,
146+
// resolvers,
147+
// })
148+
// const server = await mockServerContext({
149+
// schema,
150+
// })
151+
// const event: any = { requestContext: { connectedAt: 1628889984369, connectionId, domainName: 'localhost:3339', eventType: 'MESSAGE', messageDirection: 'IN', messageId: 'el4MNdOJy', requestId: '0yd7bkvXz', requestTimeEpoch: 1628889984774, routeKey: '$default', stage: 'testing' }, isBase64Encoded: false, body: '{"id":"1234","type":"subscribe","payload":{"query":"subscription { greetings }"}}' }
157152

158-
})
153+
// await connection_init({ server, event: connectionInitEvent, message: JSON.parse(connectionInitEvent.body) })
154+
// try {
155+
// await subscribe({ server, event, message: JSON.parse(event.body) })
156+
// throw new Error('should not have subscribed')
157+
// } catch (error) {
158+
// assert.equal(error.message, 'don\'t subscribe!')
159+
// }
160+
// assert.deepEqual(onSubscribe, ['We did it!'])
161+
// const subscriptions = await collect(server.mapper.query(server.model.Subscription, { connectionId: equals(event.requestContext.connectionId) }, { indexName: 'ConnectionIndex' }))
162+
// assert.isEmpty(subscriptions)
163+
// })
159164
it('fires onAfterSubscribe after subscribing', async () => {
160165
const events: string[] = []
161166

lib/messages/subscribe.ts

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import AggregateError from 'aggregate-error'
21
import { SubscribeMessage, MessageType } from 'graphql-ws'
3-
import { validate, parse } from 'graphql'
2+
import { validate, parse, GraphQLError } from 'graphql'
43
import {
54
buildExecutionContext,
65
assertValidExecutionArguments,
@@ -26,22 +25,30 @@ export const subscribe: MessageHandler<SubscribeMessage> =
2625

2726
const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, event, message }) => {
2827
const connectionId = event.requestContext.connectionId
28+
server.log('subscribe %j', { connectionId, query: message.payload.query })
2929

3030
const connection = await server.mapper.get(
3131
Object.assign(new server.model.Connection(), {
3232
id: connectionId,
3333
}),
3434
)
35-
const connectionParams = connection.payload || {}
3635

3736
// Check for variable errors
3837
const errors = validateMessage(server)(message)
3938

4039
if (errors) {
41-
throw new AggregateError(errors)
40+
server.log('subscribe:validateError', errors)
41+
return sendMessage(server)({
42+
...event.requestContext,
43+
message: {
44+
type: MessageType.Error,
45+
id: message.id,
46+
payload: errors,
47+
},
48+
})
4249
}
4350

44-
const contextValue = await constructContext({ server, connectionParams, connectionId })
51+
const contextValue = await constructContext({ server, connectionParams: connection.payload, connectionId })
4552

4653
const execContext = buildExecutionContext(
4754
server.schema,
@@ -57,11 +64,9 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
5764
return sendMessage(server)({
5865
...event.requestContext,
5966
message: {
60-
type: MessageType.Next,
67+
type: MessageType.Error,
6168
id: message.id,
62-
payload: {
63-
errors: execContext,
64-
},
69+
payload: execContext,
6570
},
6671
})
6772
}
@@ -78,23 +83,35 @@ const setupSubscription: MessageHandler<SubscribeMessage> = async ({ server, eve
7883

7984
const { topicDefinitions, onSubscribe, onAfterSubscribe } = field.subscribe as SubscribePseudoIterable<PubSubEvent>
8085

81-
server.log('onSubscribe', { onSubscribe: !!onSubscribe })
82-
await onSubscribe?.(root, args, context, info)
86+
try {
87+
server.log('onSubscribe', { onSubscribe: !!onSubscribe })
88+
await onSubscribe?.(root, args, context, info)
89+
} catch (error) {
90+
server.log('onSubscribe', { error })
91+
return sendMessage(server)({
92+
...event.requestContext,
93+
message: {
94+
type: MessageType.Error,
95+
id: message.id,
96+
payload: [new GraphQLError(error.message)],
97+
},
98+
})
99+
}
83100

84101
await Promise.all(topicDefinitions.map(async ({ topic, filter }) => {
85102
const filterData = typeof filter === 'function' ? await filter(root, args, context, info) : filter
86103

87104
const subscription = Object.assign(new server.model.Subscription(), {
88-
id: `${connectionId}|${message.id}`,
105+
id: `${connection.id}|${message.id}`,
89106
topic,
90107
filter: filterData || {},
91108
subscriptionId: message.id,
92109
subscription: {
93110
variableValues: args,
94111
...message.payload,
95112
},
96-
connectionId,
97-
connectionParams,
113+
connectionId: connection.id,
114+
connectionParams: connection.payload,
98115
requestContext: event.requestContext,
99116
ttl: connection.ttl,
100117
})
@@ -127,7 +144,7 @@ const validateMessage = (server: ServerClosure) => (message: SubscribeMessage) =
127144

128145
// eslint-disable-next-line @typescript-eslint/no-explicit-any
129146
async function executeQuery(server: ServerClosure, message: SubscribeMessage, contextValue: any, event: APIGatewayWebSocketEvent) {
130-
server.log('executeQuery', { connectionId: event.requestContext.connectionId })
147+
server.log('executeQuery', { connectionId: event.requestContext.connectionId, query: message.payload.query })
131148

132149
const result = await execute(
133150
server.schema,

0 commit comments

Comments
 (0)