Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 119 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aws-lambda-stream",
"version": "1.1.8",
"version": "1.1.9",
"description": "Create stream processors with AWS Lambda functions.",
"keywords": [
"aws",
Expand Down Expand Up @@ -71,6 +71,7 @@
"@aws-sdk/client-sts": "^3.450.0",
"@aws-sdk/client-timestream-write": "^3.450.0",
"@aws-sdk/lib-dynamodb": "^3.450.0",
"@aws-sdk/s3-request-presigner": "3.490.0",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it matters since this is listed here as a fixed version, the version of this package currently available in the lambda runtime is 3.850.0.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, should this be added as an optional peer dependency so that folks using lambda stream can still install it without having this dependency installed, but IF this dep is installed, it enforces the version/s specified here I believe

"@aws-sdk/signature-v4-crt": "^3.450.0",
"@aws-sdk/util-dynamodb": "^3.450.0",
"@babel/cli": "^7.10.0",
Expand Down
49 changes: 48 additions & 1 deletion src/connectors/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
import { Readable } from 'stream';
import {
CopyObjectCommand,
DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client,
DeleteObjectCommand,
GetObjectCommand,
HeadObjectCommand,
ListObjectVersionsCommand,
ListObjectsV2Command,
PutObjectCommand,
S3Client,
} from '@aws-sdk/client-s3';
import { NodeHttpHandler } from '@smithy/node-http-handler';
import Promise from 'bluebird';
Expand Down Expand Up @@ -104,6 +110,23 @@ class Connector {
.then((response) => Readable.from(response.Body));
}

getSignedUrl(operation, Key, other = {}) {
const params = {
Bucket: this.bucketName,
Key,
...other,
};

return import('@aws-sdk/s3-request-presigner')
.then(({ getSignedUrl }) => Promise.resolve(getSignedUrl(this.client,
operation === 'putObject'
? new PutObjectCommand(params)
: new GetObjectCommand(params),
other))
.tap(this.debug)
.tapCatch(this.debug));
}

listObjects(inputParams, ctx) {
const params = {
Bucket: this.bucketName,
Expand All @@ -113,6 +136,30 @@ class Connector {
return this._sendCommand(new ListObjectsV2Command(params), ctx);
}

listObjectVersions({
last, limit, Bucket, Prefix,
}, ctx) {
const params = {
Bucket: Bucket || this.bucketName,
Prefix,
MaxKeys: limit,
...(last
? /* istanbul ignore next */ JSON.parse(Buffer.from(last, 'base64').toString())
: {}),
};

return this._sendCommand(new ListObjectVersionsCommand(params), ctx)
.then((data) => ({
last: data.IsTruncated
? /* istanbul ignore next */ Buffer.from(JSON.stringify({
KeyMarker: data.NextKeyMarker,
VersionIdMarker: data.NextVersionIdMarker,
})).toString('base64')
: undefined,
data,
}));
}

copyObject(inputParams, ctx) {
const params = {
Bucket: this.bucketName,
Expand Down
36 changes: 36 additions & 0 deletions src/from/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,32 @@ export const fromS3 = (event) =>
record,
}));

export const fromSqsS3 = (event) =>
_(event.Records)
// sqs
.map((record) =>
// create a unit-of-work for each message
// so we can correlate related work for error handling
({
record: {
sqs: record,
},
}))
// s3
.map((uow) => ({
record: {
...uow.record,
s3: JSON.parse(uow.record.sqs.body),
},
}))
.flatMap((uow) => fromS3(uow.record.s3)
.map((uow2) => ({
record: {
...uow.record,
s3: uow2.record,
},
})));

export const fromSqsSnsS3 = (event) =>
_(event.Records)
// sqs
Expand Down Expand Up @@ -109,6 +135,16 @@ export const toS3Records = (notifications) => ({
})),
});

export const toSqsS3Records = (notifications) => ({
Records: ([{
body: JSON.stringify({
Records: notifications.map((s3) => ({
s3,
})),
}),
}]),
});

export const toSqsSnsS3Records = (notifications) => ({
Records: ([{
body: JSON.stringify({
Expand Down
81 changes: 80 additions & 1 deletion test/unit/connectors/s3.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@ import { Readable } from 'stream';
import { mockClient } from 'aws-sdk-client-mock';
import {
CopyObjectCommand,
DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client,
DeleteObjectCommand,
GetObjectCommand,
HeadObjectCommand,
ListObjectsV2Command,
ListObjectVersionsCommand,
PutObjectCommand,
S3Client,
} from '@aws-sdk/client-s3';
import { sdkStreamMixin } from '@smithy/util-stream';
import * as s3RequestPresigner from '@aws-sdk/s3-request-presigner/dist-cjs/getSignedUrl';

import { v4 } from 'uuid';
import Connector from '../../../src/connectors/s3';
Expand All @@ -27,6 +34,55 @@ describe('connectors/s3.js', () => {
sinon.restore();
});

it('should get a signed url', async () => {
// const spy = sinon.spy(() => 'https://123/456');
// mockS3.on(PutObjectCommand).callsFake(spy);
const spy = sinon.stub(s3RequestPresigner, 'getSignedUrl').resolves('https://123/456');

const data = await new Connector({ debug: debug('s3') })
.getSignedUrl('putObject', '1/2');
expect(spy).to.have.been.calledOnce;
// expect(spy).to.have.been.calledWith('putObject', {
// Bucket: 'b1',
// Key: '1/2',
// });
expect(data).to.equal('https://123/456');
});

it('should get a signed url for putObject', async () => {
// const spy = sinon.spy(() => 'https://123/456');
// mockS3.on(PutObjectCommand).callsFake(spy);
const spy = sinon.stub(s3RequestPresigner, 'getSignedUrl').resolves('https://123/456');

const data = await new Connector({ debug: debug('s3'), bucketName: 'b1' })
.getSignedUrl('putObject', '1/2');

expect(spy).to.have.been.calledOnce;
// expect(spy).to.have.been.calledWith('putObject', {
// Bucket: 'b1',
// Key: '1/2',
// // ContentType: 'application/pdf',
// // ACL: 'private',
// });
expect(data).to.equal('https://123/456');
});

it('should get a signed url for getObject', async () => {
// const spy = sinon.spy(() => 'https://123/456');
// mockS3.on(GetObjectCommand).callsFake(spy);
const spy = sinon.stub(s3RequestPresigner, 'getSignedUrl').resolves('https://123/456');

const data = await new Connector({ debug: debug('s3'), bucketName: 'b1' })
.getSignedUrl('getObject', '1/2');

expect(spy).to.have.been.calledOnce;
// expect(spy).to.have.been.calledWith('getObject', {
// Bucket: 'b1',
// Key: '1/2',
// });
expect(data).to.equal('https://123/456');
});

it('should reuse client per pipeline', () => {
const client1 = Connector.getClient('test1', debug('test'));
const client2 = Connector.getClient('test1', debug('test'));
Expand Down Expand Up @@ -161,6 +217,29 @@ describe('connectors/s3.js', () => {
expect(data).to.deep.equal({ DeleteMarker: false });
});

it('should list object versions', async () => {
const spy = sinon.spy(() => [{ VersionId: 'v1' }]);
mockS3.on(ListObjectVersionsCommand).callsFake(spy);

const inputParams = {
Prefix: 'k1',
limit: 20,
};

const data = await new Connector({ debug: debug('s3'), bucketName: 'b1' })
.listObjectVersions(inputParams);

expect(spy).to.have.been.calledWith({
Bucket: 'b1',
Prefix: 'k1',
MaxKeys: 20,
});
expect(data).to.deep.equal({
last: undefined,
data: [{ VersionId: 'v1' }],
});
});

it('should list objects', async () => {
const spy = sinon.spy(() => ({
IsTruncated: false,
Expand Down
Loading