Skip to content

Commit 0ca7af7

Browse files
authored
Merge pull request #90 from teunw/feature/failure-destination-kafka
feat: add a failure destination to the Kafka configuration
2 parents 2d70134 + 14b2f26 commit 0ca7af7

File tree

4 files changed

+64
-0
lines changed

4 files changed

+64
-0
lines changed

docs/events/kafka.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,25 @@ functions:
153153
- eventName: INSERT
154154
```
155155

156+
## Failure destinations
157+
158+
A failure destination allows you to send messages that failed processing repeatedly to be sent to an SNS Topic, SQS Queue or another lambda function.
159+
By default, Lambda discards oversized records (> 6MB) and records that fail all retry attempts.
160+
Configuring an on-failure destination ensures that you don't lose data when your Lambda function encounters errors.
161+
For more information, see [capturing discarded batches for a self-managed Apache Kafka event source](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka-on-failure.html)
162+
163+
```yml
164+
functions:
165+
compute:
166+
handler: handler.compute
167+
events:
168+
- kafka:
169+
accessConfigurations:
170+
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
171+
topic: MySelfManagedKafkaTopic
172+
onFailureDestination: arn:aws:sqs:us-east-1:01234567890:my-queue
173+
```
174+
156175
## IAM Permissions
157176

158177
The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) for more information about IAM Permissions for Kafka events.

lib/plugins/aws/package/compile/events/kafka.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ class AwsCompileKafkaEvents {
9494
topic: {
9595
type: 'string',
9696
},
97+
onFailureDestination: {
98+
type: 'string',
99+
},
97100
consumerGroupId: {
98101
type: 'string',
99102
maxLength: 200,
@@ -286,6 +289,15 @@ class AwsCompileKafkaEvents {
286289
};
287290
}
288291

292+
const onFailureDestination = event.kafka.onFailureDestination;
293+
if (onFailureDestination) {
294+
kafkaResource.Properties.DestinationConfig = {
295+
OnFailure: {
296+
Destination: onFailureDestination,
297+
},
298+
};
299+
}
300+
289301
const provisionedPollerConfig = event.kafka.provisionedPollerConfig;
290302
if (provisionedPollerConfig) {
291303
kafkaResource.Properties.ProvisionedPollerConfig = {

test/unit/lib/plugins/aws/package/compile/events/kafka.test.js

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,38 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
550550
});
551551
});
552552

553+
describe('onFailureDestination', () => {
554+
it('should correctly compile EventSourceMapping resource properties for onFailureDestination', async () => {
555+
const { awsNaming, cfTemplate } = await runServerless({
556+
fixture: 'function',
557+
configExt: {
558+
functions: {
559+
basic: {
560+
role: { 'Fn::ImportValue': 'MyImportedRole' },
561+
events: [
562+
{
563+
kafka: {
564+
topic,
565+
bootstrapServers: ['abc.xyz:9092'],
566+
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
567+
onFailureDestination: 'arn:aws:sqs:eu-central-1:000000000000:some-queue',
568+
},
569+
},
570+
],
571+
},
572+
},
573+
},
574+
command: 'package',
575+
});
576+
577+
const eventSourceMappingResource =
578+
cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')];
579+
expect(
580+
eventSourceMappingResource.Properties.DestinationConfig.OnFailure.Destination
581+
).to.equal('arn:aws:sqs:eu-central-1:000000000000:some-queue');
582+
});
583+
});
584+
553585
describe('startingPositionTimestamp', () => {
554586
it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', async () => {
555587
await expect(

types/index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ export interface AWS {
384384
topic: string;
385385
consumerGroupId?: string;
386386
filterPatterns?: FilterPatterns;
387+
onFailureDestination?: string;
387388
};
388389
}
389390
| {

0 commit comments

Comments
 (0)