diff --git a/package-lock.json b/package-lock.json index b5796d6..e7a6cf2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.8", + "version": "1.1.9", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.8", + "version": "1.1.9", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" @@ -26,6 +26,7 @@ "@aws-sdk/client-sts": "^3.450.0", "@aws-sdk/client-timestream-write": "^3.450.0", "@aws-sdk/lib-dynamodb": "^3.450.0", + "@aws-sdk/s3-request-presigner": "3.490.0", "@aws-sdk/signature-v4-crt": "^3.450.0", "@aws-sdk/util-dynamodb": "^3.450.0", "@babel/cli": "^7.10.0", @@ -5018,6 +5019,92 @@ "node": ">=14.0.0" } }, + "node_modules/@aws-sdk/s3-request-presigner": { + "version": "3.490.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/s3-request-presigner/-/s3-request-presigner-3.490.0.tgz", + "integrity": "sha512-ZHs+FlcTv9MKMM0b9svxxQio4FiRxDNstKYG8sbm9YEoahYV25h3K3butUiThaiOeYePOD7jHdbdXz4/XasxXg==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/signature-v4-multi-region": "3.489.0", + "@aws-sdk/types": "3.489.0", + "@aws-sdk/util-format-url": "3.489.0", + "@smithy/middleware-endpoint": "^2.3.0", + "@smithy/protocol-http": "^3.0.12", + "@smithy/smithy-client": "^2.2.1", + "@smithy/types": "^2.8.0", + "tslib": "^2.5.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@aws-sdk/s3-request-presigner/node_modules/@aws-sdk/middleware-sdk-s3": { + "version": "3.489.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-sdk-s3/-/middleware-sdk-s3-3.489.0.tgz", + "integrity": "sha512-/GGASx7mK9qEgy1znvleYMZKVqm3sOdGghqKdy2zgoGcH2jH+fZrLM0lDMT9bvdITmOCbJJs2rVHP3xm/ZWcXg==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/types": "3.489.0", + "@aws-sdk/util-arn-parser": "3.465.0", + "@smithy/node-config-provider": "^2.1.9", + "@smithy/protocol-http": "^3.0.12", + "@smithy/signature-v4": "^2.0.0", + "@smithy/smithy-client": "^2.2.1", + "@smithy/types": "^2.8.0", + "@smithy/util-config-provider": "^2.1.0", + "tslib": "^2.5.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@aws-sdk/s3-request-presigner/node_modules/@aws-sdk/signature-v4-multi-region": { + "version": "3.489.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/signature-v4-multi-region/-/signature-v4-multi-region-3.489.0.tgz", + "integrity": "sha512-kYFM7Opu36EkFlzXdVNOBFpQApgnuaTu/U/qYhGyuzeD+HNnYgZEsd/tDro1DQ074jVy3GN9ttJSYxq5I4oTkA==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/middleware-sdk-s3": "3.489.0", + "@aws-sdk/types": "3.489.0", + "@smithy/protocol-http": "^3.0.12", + "@smithy/signature-v4": "^2.0.0", + "@smithy/types": "^2.8.0", + "tslib": "^2.5.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@aws-sdk/s3-request-presigner/node_modules/@aws-sdk/types": { + "version": "3.489.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/types/-/types-3.489.0.tgz", + "integrity": "sha512-kcDtLfKog/p0tC4gAeqJqWxAiEzfe2LRPnKamvSG2Mjbthx4R/alE2dxyIq/wW+nvRv0fqR3OD5kD1+eVfdr/w==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@smithy/types": "^2.8.0", + "tslib": "^2.5.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@aws-sdk/s3-request-presigner/node_modules/@aws-sdk/util-arn-parser": { + "version": "3.465.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/util-arn-parser/-/util-arn-parser-3.465.0.tgz", + "integrity": "sha512-zOJ82vzDJFqBX9yZBlNeHHrul/kpx/DCoxzW5UBbZeb26kfV53QhMSoEmY8/lEbBqlqargJ/sgRC845GFhHNQw==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "tslib": "^2.5.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/@aws-sdk/signature-v4-crt": { "version": "3.816.0", "resolved": "https://registry.npmjs.org/@aws-sdk/signature-v4-crt/-/signature-v4-crt-3.816.0.tgz", @@ -5657,6 +5744,36 @@ "node": ">=14.0.0" } }, + "node_modules/@aws-sdk/util-format-url": { + "version": "3.489.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/util-format-url/-/util-format-url-3.489.0.tgz", + "integrity": "sha512-yqIf9RMdOSxMUrv1BVDmrYp5kjLh4RxA17BTqzcQK8cXkRBqBP8ydbCQXENSv8LZSMH7AnrXNHBD1eiVuKRzZw==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/types": "3.489.0", + "@smithy/querystring-builder": "^2.0.16", + "@smithy/types": "^2.8.0", + "tslib": "^2.5.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@aws-sdk/util-format-url/node_modules/@aws-sdk/types": { + "version": "3.489.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/types/-/types-3.489.0.tgz", + "integrity": "sha512-kcDtLfKog/p0tC4gAeqJqWxAiEzfe2LRPnKamvSG2Mjbthx4R/alE2dxyIq/wW+nvRv0fqR3OD5kD1+eVfdr/w==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@smithy/types": "^2.8.0", + "tslib": "^2.5.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/@aws-sdk/util-locate-window": { "version": "3.495.0", "resolved": "https://registry.npmjs.org/@aws-sdk/util-locate-window/-/util-locate-window-3.495.0.tgz", diff --git a/package.json b/package.json index ac1b263..645da34 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.8", + "version": "1.1.9", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", @@ -71,6 +71,7 @@ "@aws-sdk/client-sts": "^3.450.0", "@aws-sdk/client-timestream-write": "^3.450.0", "@aws-sdk/lib-dynamodb": "^3.450.0", + "@aws-sdk/s3-request-presigner": "3.490.0", "@aws-sdk/signature-v4-crt": "^3.450.0", "@aws-sdk/util-dynamodb": "^3.450.0", "@babel/cli": "^7.10.0", diff --git a/src/connectors/s3.js b/src/connectors/s3.js index 2993a2d..e897a17 100644 --- a/src/connectors/s3.js +++ b/src/connectors/s3.js @@ -3,7 +3,13 @@ import { Readable } from 'stream'; import { CopyObjectCommand, - DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client, + DeleteObjectCommand, + GetObjectCommand, + HeadObjectCommand, + ListObjectVersionsCommand, + ListObjectsV2Command, + PutObjectCommand, + S3Client, } from '@aws-sdk/client-s3'; import { NodeHttpHandler } from '@smithy/node-http-handler'; import Promise from 'bluebird'; @@ -104,6 +110,23 @@ class Connector { .then((response) => Readable.from(response.Body)); } + getSignedUrl(operation, Key, other = {}) { + const params = { + Bucket: this.bucketName, + Key, + ...other, + }; + + return import('@aws-sdk/s3-request-presigner') + .then(({ getSignedUrl }) => Promise.resolve(getSignedUrl(this.client, + operation === 'putObject' + ? new PutObjectCommand(params) + : new GetObjectCommand(params), + other)) + .tap(this.debug) + .tapCatch(this.debug)); + } + listObjects(inputParams, ctx) { const params = { Bucket: this.bucketName, @@ -113,6 +136,30 @@ class Connector { return this._sendCommand(new ListObjectsV2Command(params), ctx); } + listObjectVersions({ + last, limit, Bucket, Prefix, + }, ctx) { + const params = { + Bucket: Bucket || this.bucketName, + Prefix, + MaxKeys: limit, + ...(last + ? /* istanbul ignore next */ JSON.parse(Buffer.from(last, 'base64').toString()) + : {}), + }; + + return this._sendCommand(new ListObjectVersionsCommand(params), ctx) + .then((data) => ({ + last: data.IsTruncated + ? /* istanbul ignore next */ Buffer.from(JSON.stringify({ + KeyMarker: data.NextKeyMarker, + VersionIdMarker: data.NextVersionIdMarker, + })).toString('base64') + : undefined, + data, + })); + } + copyObject(inputParams, ctx) { const params = { Bucket: this.bucketName, diff --git a/src/from/s3.js b/src/from/s3.js index dea552b..9d563f2 100644 --- a/src/from/s3.js +++ b/src/from/s3.js @@ -15,6 +15,32 @@ export const fromS3 = (event) => record, })); +export const fromSqsS3 = (event) => + _(event.Records) + // sqs + .map((record) => + // create a unit-of-work for each message + // so we can correlate related work for error handling + ({ + record: { + sqs: record, + }, + })) + // s3 + .map((uow) => ({ + record: { + ...uow.record, + s3: JSON.parse(uow.record.sqs.body), + }, + })) + .flatMap((uow) => fromS3(uow.record.s3) + .map((uow2) => ({ + record: { + ...uow.record, + s3: uow2.record, + }, + }))); + export const fromSqsSnsS3 = (event) => _(event.Records) // sqs @@ -109,6 +135,16 @@ export const toS3Records = (notifications) => ({ })), }); +export const toSqsS3Records = (notifications) => ({ + Records: ([{ + body: JSON.stringify({ + Records: notifications.map((s3) => ({ + s3, + })), + }), + }]), +}); + export const toSqsSnsS3Records = (notifications) => ({ Records: ([{ body: JSON.stringify({ diff --git a/test/unit/connectors/s3.test.js b/test/unit/connectors/s3.test.js index baf11be..461fe04 100644 --- a/test/unit/connectors/s3.test.js +++ b/test/unit/connectors/s3.test.js @@ -6,9 +6,16 @@ import { Readable } from 'stream'; import { mockClient } from 'aws-sdk-client-mock'; import { CopyObjectCommand, - DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client, + DeleteObjectCommand, + GetObjectCommand, + HeadObjectCommand, + ListObjectsV2Command, + ListObjectVersionsCommand, + PutObjectCommand, + S3Client, } from '@aws-sdk/client-s3'; import { sdkStreamMixin } from '@smithy/util-stream'; +import * as s3RequestPresigner from '@aws-sdk/s3-request-presigner/dist-cjs/getSignedUrl'; import { v4 } from 'uuid'; import Connector from '../../../src/connectors/s3'; @@ -27,6 +34,55 @@ describe('connectors/s3.js', () => { sinon.restore(); }); + it('should get a signed url', async () => { + // const spy = sinon.spy(() => 'https://123/456'); + // mockS3.on(PutObjectCommand).callsFake(spy); + const spy = sinon.stub(s3RequestPresigner, 'getSignedUrl').resolves('https://123/456'); + + const data = await new Connector({ debug: debug('s3') }) + .getSignedUrl('putObject', '1/2'); + expect(spy).to.have.been.calledOnce; + // expect(spy).to.have.been.calledWith('putObject', { + // Bucket: 'b1', + // Key: '1/2', + // }); + expect(data).to.equal('https://123/456'); + }); + + it('should get a signed url for putObject', async () => { + // const spy = sinon.spy(() => 'https://123/456'); + // mockS3.on(PutObjectCommand).callsFake(spy); + const spy = sinon.stub(s3RequestPresigner, 'getSignedUrl').resolves('https://123/456'); + + const data = await new Connector({ debug: debug('s3'), bucketName: 'b1' }) + .getSignedUrl('putObject', '1/2'); + + expect(spy).to.have.been.calledOnce; + // expect(spy).to.have.been.calledWith('putObject', { + // Bucket: 'b1', + // Key: '1/2', + // // ContentType: 'application/pdf', + // // ACL: 'private', + // }); + expect(data).to.equal('https://123/456'); + }); + + it('should get a signed url for getObject', async () => { + // const spy = sinon.spy(() => 'https://123/456'); + // mockS3.on(GetObjectCommand).callsFake(spy); + const spy = sinon.stub(s3RequestPresigner, 'getSignedUrl').resolves('https://123/456'); + + const data = await new Connector({ debug: debug('s3'), bucketName: 'b1' }) + .getSignedUrl('getObject', '1/2'); + + expect(spy).to.have.been.calledOnce; + // expect(spy).to.have.been.calledWith('getObject', { + // Bucket: 'b1', + // Key: '1/2', + // }); + expect(data).to.equal('https://123/456'); + }); + it('should reuse client per pipeline', () => { const client1 = Connector.getClient('test1', debug('test')); const client2 = Connector.getClient('test1', debug('test')); @@ -161,6 +217,29 @@ describe('connectors/s3.js', () => { expect(data).to.deep.equal({ DeleteMarker: false }); }); + it('should list object versions', async () => { + const spy = sinon.spy(() => [{ VersionId: 'v1' }]); + mockS3.on(ListObjectVersionsCommand).callsFake(spy); + + const inputParams = { + Prefix: 'k1', + limit: 20, + }; + + const data = await new Connector({ debug: debug('s3'), bucketName: 'b1' }) + .listObjectVersions(inputParams); + + expect(spy).to.have.been.calledWith({ + Bucket: 'b1', + Prefix: 'k1', + MaxKeys: 20, + }); + expect(data).to.deep.equal({ + last: undefined, + data: [{ VersionId: 'v1' }], + }); + }); + it('should list objects', async () => { const spy = sinon.spy(() => ({ IsTruncated: false, diff --git a/test/unit/from/s3.test.js b/test/unit/from/s3.test.js index ca2f0a1..e5b94ff 100644 --- a/test/unit/from/s3.test.js +++ b/test/unit/from/s3.test.js @@ -4,7 +4,7 @@ import sinon from 'sinon'; import debug from 'debug'; import { - fromS3, fromSqsSnsS3, fromS3Event, toS3Records, toSqsSnsS3Records, + fromS3, fromSqsS3, fromSqsSnsS3, fromS3Event, toS3Records, toSqsS3Records, toSqsSnsS3Records, } from '../../../src/from/s3'; import Connector from '../../../src/connectors/s3'; @@ -54,6 +54,36 @@ describe('from/s3s.js', () => { .done(done); }); + it('should parse sqs, s3 records', (done) => { + const event = toSqsS3Records([{ + bucket: { + name: 'mfe-main-stg-websitebucket-wrivy8kb9743', + }, + object: { + key: 'mfe-main/stg/mfe.json', + }, + }]); + + fromSqsS3(event) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(collected.length).to.equal(1); + expect(collected[0].record.s3).to.deep.equal({ + s3: { + bucket: { + name: 'mfe-main-stg-websitebucket-wrivy8kb9743', + }, + object: { + key: 'mfe-main/stg/mfe.json', + }, + }, + }); + }) + .done(done); + }); + it('should parse sqs sns, s3 records', (done) => { // const event = { // Records: [