diff --git a/INSTALL.md b/INSTALL.md index 4140fd0..a15e641 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -7,7 +7,7 @@ 1. Make sure you have the AWS CLI installed 1. Make sure you have a modern boto3 1. Make sure you have jq installed. -1. Create an S3 Bucket to act as the Antiope bucket +1. Create an [S3 Bucket](docs/AntiopeBucket.md) to act as the Antiope bucket. A CloudFormation template exists to do this. * **It is important that the bucket be created in the region you intend to run Antiope.** * This bucket contains all the packaged code, discovered resources and Reports. 1. You'll need cftdeploy python package & scripts: diff --git a/cloudformation/SplunkHEC-Template.yaml b/cloudformation/SplunkHEC-Template.yaml new file mode 100644 index 0000000..309bab7 --- /dev/null +++ b/cloudformation/SplunkHEC-Template.yaml @@ -0,0 +1,315 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: Deploy Splunk HEC push support for Antiope +Transform: AWS::Serverless-2016-10-31 + + +Parameters: + + pBucketName: + Description: Name of the Antiope Bucket + Type: String + + pAWSInventoryLambdaLayer: + Description: ARN Antiope AWS Lambda Layer + Type: String + + pSplunkHECSecret: + Description: Name of the AWS Secrets Manager secret with the HEC Token & Endpoint + Type: String + + pS3EventNotificationTopicArn: + Description: SNS Topic for the Splunk Ingest SQS Queue to subscribe to. 
+ Type: String + Default: None + + pSQSMessageAlarmThreshold: + Description: If the Queue contains more than this number of message, fire an alarm + Type: Number + Default: 20000 + + +Resources: + + # + # Ingest Lambda + # + IngestLambdaRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - sts:AssumeRole + Path: / + Policies: + - PolicyName: S3Access + PolicyDocument: + Version: '2012-10-17' + Statement: + - Action: + - s3:* + Effect: Allow + Resource: + - !Sub "arn:aws:s3:::${pBucketName}/*" + - !Sub "arn:aws:s3:::${pBucketName}" + - Action: + - s3:ListAllMyBuckets + - s3:GetBucketLocation + Effect: Allow + Resource: '*' + - PolicyName: LambdaLogging + PolicyDocument: + Version: '2012-10-17' + Statement: + - Resource: '*' + Action: + - logs:* + Effect: Allow + - PolicyName: GetMessages + PolicyDocument: + Version: '2012-10-17' + Statement: + - Resource: !GetAtt SplunkHECEventQueue.Arn + Action: + - sqs:* + Effect: Allow + - PolicyName: GetSecret + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: "Allow" + Action: + - secretsmanager:GetSecret* + Resource: !Sub "arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${pSplunkHECSecret}-*" + + # + # Ingestion Function Functions + # + SplunkHECS3Function: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub "${AWS::StackName}-push-to-splunk" + Description: AWS Lamdba to pull data from S3 to index into Splunk + Handler: index.handler + Runtime: python3.6 + Timeout: 180 + MemorySize: 1024 + Role: !GetAtt IngestLambdaRole.Arn + Layers: + - !Ref pAWSInventoryLambdaLayer + ReservedConcurrentExecutions: 5 # Limit the concurrency on this function to avoid slamming Splunk too hard + Environment: + Variables: + INVENTORY_BUCKET: !Ref pBucketName + SQS_QUEUE_URL: !Ref SplunkHECEventQueue + LOG_LEVEL: INFO + HEC_DATA: !Ref pSplunkHECSecret + SECRET_REGION: 
import base64
import json
import os
import boto3
from botocore.exceptions import ClientError
import urllib3
from urllib.parse import unquote

import logging
logger = logging.getLogger()
logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL', default='INFO')))
logging.getLogger('botocore').setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)


def handler(event, _context):
    """Push every S3 object referenced by the SQS batch to Splunk HEC.

    Each SQS record body is an SNS envelope whose 'Message' is the S3 event
    notification. Every S3 record in it is fetched with get_object() and
    forwarded with push_event(). Objects that cannot be fetched are skipped.

    Raises:
        Exception: if the HEC secret is unavailable, or (from push_event)
            if Splunk rejects an event. Raising fails the whole batch so
            SQS redelivers it.
    """
    logger.debug("Received event: " + json.dumps(event, sort_keys=True))

    if hec_data is None:
        logger.critical(f"Unable to fetch secret {os.environ['HEC_DATA']}")
        raise Exception(f"Unable to fetch secret {os.environ['HEC_DATA']}")
    logger.debug(f"HEC Endpoint: {hec_data['HECEndpoint']}")
    count = 0
    s3 = boto3.client('s3')

    # Multiple layers of nesting to unpack with S3 Events, to SNS to SQS
    for sns_record in event['Records']:
        sns_message = json.loads(sns_record['body'])
        sns_message2 = json.loads(sns_message['Message'])
        logger.debug(f"sns_message2: {sns_message2}")

        # S3 publishes an "s3:TestEvent" without a 'Records' key when the
        # notification is configured; skip it instead of failing the batch.
        for s3_record in sns_message2.get('Records', []):
            resource_to_index = get_object(
                s3_record['s3']['bucket']['name'],
                s3_record['s3']['object']['key'],
                s3
            )
            if resource_to_index is None:
                continue

            push_event(resource_to_index)
            count += 1

    logger.info(f"Wrote {count} resources to Splunk")
    return ()


def push_event(message):
    """POST one resource document to the Splunk HTTP Event Collector.

    Args:
        message: the parsed resource (JSON-serialisable; anything json cannot
            encode natively is stringified via default=str).

    Raises:
        Exception: on any transport error or non-200 response from HEC.
    """
    headers = {'Authorization': 'Splunk ' + hec_data['HECToken']}
    payload = {"host": hec_data['HECEndpoint'], "event": message}
    data = json.dumps(payload, default=str)

    try:
        logger.debug(f"Sending data {data} to {hec_data['HECEndpoint']}")
        r = http.request('POST', hec_data['HECEndpoint'], headers=headers, body=data)
        if r.status != 200:
            logger.critical(f"Error: {r.data}")
            raise Exception(f"HEC Error: {r.data}")
        logger.debug(f"Success: {r.data}")
    except Exception as e:
        logger.critical(f"Error: {e}")
        raise


def get_secret(secret_name, region):
    """Fetch and parse the JSON secret holding the HEC endpoint and token.

    Args:
        secret_name: name (or ARN) of the Secrets Manager secret.
        region: region the secret lives in.

    Returns:
        The secret parsed from JSON (expected keys: HECEndpoint, HECToken).

    Raises:
        ClientError: if Secrets Manager rejects the request.
        ValueError: if the secret value is not valid JSON.
    """
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region)

    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        logger.critical(f"Client error {e} getting secret")
        raise e

    # Depending on whether the secret is a string or binary, one of these
    # fields will be populated.
    if 'SecretString' in get_secret_value_response:
        return json.loads(get_secret_value_response['SecretString'])

    # Callers index the result by key, so a binary secret must be parsed too.
    # NOTE(review): boto3 is expected to hand back SecretBinary as raw bytes;
    # the base64 fallback keeps the original decode path working if the
    # stored payload is itself base64 text - confirm against the boto3 docs.
    raw = get_secret_value_response['SecretBinary']
    try:
        return json.loads(raw)
    except ValueError:
        return json.loads(base64.b64decode(raw))


# Get the secret once per lambda container rather than on each invocation.
hec_data = get_secret(os.environ['HEC_DATA'], os.environ['SECRET_REGION'])
if hec_data is None:
    logger.critical(f"Unable to fetch secret {os.environ['HEC_DATA']}")
    raise Exception(f"Unable to fetch secret {os.environ['HEC_DATA']}")
# Reuse the PoolManager across invocations
http = urllib3.PoolManager()


def get_object(bucket, obj_key, s3):
    '''Get the object to index from S3 and return the parsed json.

    obj_key arrives URL-encoded in the S3 event, so it is unquoted before the
    lookup. Returns None (after logging) when S3 reports a client error.
    '''
    try:
        response = s3.get_object(
            Bucket=bucket,
            Key=unquote(obj_key)
        )
        return json.loads(response['Body'].read())
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            logger.error("Unable to find resource s3://{}/{}".format(bucket, obj_key))
        else:
            logger.error("Error getting resource s3://{}/{}: {}".format(bucket, obj_key, e))
        return None

### EOF ###
YMMV + MessageRetentionPeriod: 36000 # Any messages older than ten hours are probably out-of-date + ReceiveMessageWaitTimeSeconds: 10 + VisibilityTimeout: 300 + + SplunkHECEventQueuePolicy: + Type: AWS::SQS::QueuePolicy + Properties: + Queues: + - !Ref SplunkHECEventQueue + PolicyDocument: + Version: '2012-10-17' + Id: AllowS3 + Statement: + - Sid: AllowS3EventNotification + Effect: Allow + Principal: + AWS: '*' + Action: + - SQS:SendMessage + Resource: !GetAtt SplunkHECEventQueue.Arn + Condition: + ArnLike: + aws:SourceArn: !Sub "arn:aws:s3:*:*:${pBucketName}" + - Sid: Allow-SNS-SendMessage + Effect: Allow + Principal: + AWS: '*' + Action: + - SQS:SendMessage + Resource: !GetAtt SplunkHECEventQueue.Arn + Condition: + ArnEquals: + aws:SourceArn: !Ref pS3EventNotificationTopicArn + + SplunkHECQueueSubscription: + Type: AWS::SNS::Subscription + Properties: + Endpoint: !GetAtt SplunkHECEventQueue.Arn + Protocol: sqs + TopicArn: !Ref pS3EventNotificationTopicArn + + # SplunkHECS3FunctionMapping: + # Type: AWS::Lambda::EventSourceMapping + # Properties: + # BatchSize: 10 # 10 is Max + # Enabled: True + # EventSourceArn: !GetAtt SplunkHECEventQueue.Arn + # FunctionName: !GetAtt SplunkHECS3Function.Arn + + SplunkHECEventQueueAlarm: + Type: AWS::CloudWatch::Alarm + Properties: + ActionsEnabled: True + # AlarmActions: + # - TODO + AlarmDescription: "Alert when Queue doesn't properly drain" + AlarmName: !Sub "${AWS::StackName}-SearchQueueFull" + ComparisonOperator: GreaterThanOrEqualToThreshold + Dimensions: + - Name: QueueName + Value: !GetAtt SplunkHECEventQueue.QueueName + EvaluationPeriods: 1 + MetricName: ApproximateNumberOfMessagesVisible + Namespace: AWS/SQS + Period: 300 + Statistic: Average + Threshold: !Ref pSQSMessageAlarmThreshold + TreatMissingData: missing + +Outputs: + StackName: + Description: Name of this Stack + Value: !Ref AWS::StackName + + SplunkHECEventQueueArn: + Description: Arn of the SQS Queue S3 should send new events notifications to + Value:
!GetAtt SplunkHECEventQueue.Arn + + SplunkHECEventQueueUrl: + Description: Arn of the SQS Queue S3 should send new events notifications to + Value: !Ref SplunkHECEventQueue diff --git a/cloudformation/antiope-Template.yaml b/cloudformation/antiope-Template.yaml index 69c7fad..85e70b0 100644 --- a/cloudformation/antiope-Template.yaml +++ b/cloudformation/antiope-Template.yaml @@ -181,6 +181,11 @@ Parameters: Description: Name of the Antiope ES Domain Type: String + pS3EventNotificationTopicArn: + Description: SNS Topic from the Antiope Bucket Stack. + Type: String + Default: None + Conditions: cDeployElasticSearch: !Equals [ !Ref pDeployElasticSearch, True] cDeployCustomStack: !Not [ !Equals [ !Ref pDeployCustomStackStateMachineArn, "NONE"] ] @@ -272,6 +277,7 @@ Resources: pDomainName: !Ref AWS::StackName pResourcePrefix: !Sub "${AWS::StackName}-search-cluster" pElasticSearchVersion: !Ref pElasticSearchVersion + pS3EventNotificationTopicArn: !Ref pS3EventNotificationTopicArn TemplateURL: ../search-cluster/cloudformation/SearchCluster-Template.yaml TimeoutInMinutes: 30 diff --git a/cloudformation/antiope-bucket-ImportTemplate.yaml b/cloudformation/antiope-bucket-ImportTemplate.yaml new file mode 100644 index 0000000..5ef6595 --- /dev/null +++ b/cloudformation/antiope-bucket-ImportTemplate.yaml @@ -0,0 +1,23 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: Create and Manage the Antiope S3 Bucket (and event notifications) + +Parameters: + + pBucketName: + Description: Name of the Antiope Bucket to hold all the data + Type: String + +Resources: + AntiopeBucket: + Type: AWS::S3::Bucket + DeletionPolicy: Retain + # DependsOn: AntiopeBucketNotificationTopicPolicy + Properties: + AccessControl: Private + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + BucketName: !Ref pBucketName + + diff --git a/cloudformation/antiope-bucket-Template.yaml b/cloudformation/antiope-bucket-Template.yaml new file mode 
100644 index 0000000..ef4d69e --- /dev/null +++ b/cloudformation/antiope-bucket-Template.yaml @@ -0,0 +1,101 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: Create and Manage the Antiope S3 Bucket (and event notifications) + +Parameters: + + pBucketName: + Description: Name of the Antiope Bucket to hold all the data + Type: String + +Resources: + + AntiopeBucket: + Type: AWS::S3::Bucket + DeletionPolicy: Retain + DependsOn: AntiopeBucketNotificationTopicPolicy + Properties: + AccessControl: Private + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + BucketName: !Ref pBucketName + # Additional Configuration options to come back and revisit. + # LifecycleConfiguration: <- I don't think we'd ever want to expire resources, but maybe over time? + # LoggingConfiguration: <- Probably unnecessary, but if someone needs it for compliance + # MetricsConfigurations: <- Might be useful to see metrics on the primary keys of the bucket + # InventoryConfiguration: <- Might be useful to pull out the Resources/ objects into a specific report + NotificationConfiguration: + TopicConfigurations: + - Event: 's3:ObjectCreated:*' + Topic: !Ref ResourceNotificationTopic + Filter: + S3Key: + Rules: + - Name: prefix + Value: "Resources/" + - Name: suffix + Value: ".json" + OwnershipControls: + Rules: + - ObjectOwnership: BucketOwnerPreferred + PublicAccessBlockConfiguration: + BlockPublicAcls: True + BlockPublicPolicy: True + IgnorePublicAcls: True + RestrictPublicBuckets: False # This rule also prohibits Cross-Account bucket access + + # TODO + # What Bucket Policy is needed? 
+ + ResourceNotificationTopic: + Type: AWS::SNS::Topic + Properties: + DisplayName: !Sub "Destination of PutObject calls from ${pBucketName}" + TopicName: !Sub "${pBucketName}-Resources-PutObject" + + # This Policy can be reused for any future Topics + AntiopeBucketNotificationTopicPolicy: + Type: AWS::SNS::TopicPolicy + Properties: + Topics: + - !Ref ResourceNotificationTopic + PolicyDocument: + Version: '2012-10-17' + Id: AllowAntiopeBucket + Statement: + - Sid: AllowAntiopeBucketPublish + Effect: Allow + Principal: + AWS: "*" + Action: + - SNS:Publish + Resource: + - !Ref ResourceNotificationTopic + Condition: + ArnLike: + aws:SourceArn: !Sub "arn:aws:s3:*:*:${pBucketName}" + StringEquals: + aws:SourceAccount: !Ref AWS::AccountId + +Outputs: + + Bucket: + Value: !Ref pBucketName + Description: Antiope Bucket Name + + BucketArn: + Value: !GetAtt AntiopeBucket.Arn + Description: Antiope Bucket ARN + + BucketDomainName: + Value: !GetAtt AntiopeBucket.DomainName + Description: The IPv4 DNS name of the Antiope Bucket + + ResourceNotificationTopicArn: + Value: !Ref ResourceNotificationTopic + Description: ARN of the Topic where Resources PutObject events are Sent + + ResourceNotificationTopicName: + Value: !GetAtt ResourceNotificationTopic.TopicName + Description: Name of the Topic where Resources PutObject events are Sent \ No newline at end of file diff --git a/docs/AntiopeBucket.md b/docs/AntiopeBucket.md new file mode 100644 index 0000000..04408ef --- /dev/null +++ b/docs/AntiopeBucket.md @@ -0,0 +1,54 @@ +# Managing the Antiope Bucket + + +## Creating a New Antiope Bucket + +To create a fresh antiope bucket, leverage the CFT in [cloudformation/antiope-bucket-Template.yaml](../cloudformation/antiope-bucket-Template.yaml). + +Steps to deploy: +1. Generate a manifest: +```bash +cft-generate-manifest -m config-files/antiope-bucket-Manifest.yaml -t cloudformation/antiope-bucket-Template.yaml +``` +2. 
Edit the `config-files/antiope-bucket-Manifest.yaml` and set the stackname and pBucketName +3. Deploy the CloudFormation Stack with: +```bash +cft-deploy -m config-files/antiope-bucket-Manifest.yaml +``` + +Now you can proceed to deploy the rest of Antiope + +## Importing an existing Antiope Bucket into Cloudformation + +If for whatever reason, you have an existing Antiope bucket you wish to use, you can use CloudFormation import to import the existing Antiope Bucket into CloudFormation, then update the bucket stack to include the other resources. + +CloudFormation import has some significant limitations. Not all resources _can_ be imported, and all resources in a template _must_ be imported. To work around these limitations, there is a barebones CFT that can be used to import the existing bucket into CloudFormation. Once imported, the stack can be updated to use the main template. The steps to import an existing bucket are as follows: + +1. Create an import change set: +```bash +aws cloudformation create-change-set --output text \ + --stack-name antiope-bucket \ + --change-set-name bucket-import \ + --parameters ParameterKey=pBucketName,ParameterValue=REPLACE_WITH_YOUR_BUCKET_NAME \ + --template-body file://cloudformation/antiope-bucket-ImportTemplate.yaml \ + --change-set-type IMPORT \ + --resources-to-import ResourceType=AWS::S3::Bucket,LogicalResourceId=AntiopeBucket,ResourceIdentifier={BucketName=REPLACE_WITH_YOUR_BUCKET_NAME} +``` +2. Review the change set +```bash +aws cloudformation describe-change-set --change-set-name bucket-import --stack-name antiope-bucket +``` +3. Execute the change set +```bash +aws cloudformation execute-change-set --change-set-name bucket-import --stack-name antiope-bucket +``` +4. Validate the new stack is in `IMPORT_COMPLETE` state +5. Now update the new stack with the full-featured template. 
First Generate a manifest: +```bash +cft-generate-manifest -m config-files/antiope-bucket-Manifest.yaml -t cloudformation/antiope-bucket-Template.yaml +``` +6. Edit the `config-files/antiope-bucket-Manifest.yaml` and set the stackname and pBucketName to the values used for the import +7. Deploy the CloudFormation Stack with: +```bash +cft-deploy -m config-files/antiope-bucket-Manifest.yaml --force +``` \ No newline at end of file diff --git a/docs/SplunkHEC.md b/docs/SplunkHEC.md new file mode 100644 index 0000000..8bfc300 --- /dev/null +++ b/docs/SplunkHEC.md @@ -0,0 +1,74 @@ +# Adding support to send Antiope events to Splunk via HEC + +Antiope supports the ability to push new resources into Splunk by way of the HTTP Event Collector. In order to do this, an SNS topic is created on the S3 bucket and an SQS Queue is subscribed to that Topic. Events on the SQS Queue consist of Object Put notifications for events written to the Antiope Bucket under Resources/. + +A Lambda is invoked from these messages to read the S3 object and push to HEC. The Lambda requires an AWS Secrets Manager secret to exist with the HEC endpoint & token. + +Since the S3 bucket writes to an SNS Topic, this design also supports the Antiope ElasticSearch cluster. + + +## Installation + +1. Make sure the S3 bucket is deployed via the [Antiope Bucket Template](AntiopeBucket.md). Instructions for importing an existing bucket are also in that ReadMe file. +2. Update the main Antiope Manifest to source the SNS Topic created by the Antiope Bucket stack +```yaml +########### +# These stacks are needed by the SourcedParameters section +########### +DependentStacks: + AntiopeBucket: antiope-bucket + +########### +# Parameters that come from other deployed stacks. +# Valid Sections are Resources, Outputs Parameters +# +# Hint. 
Get your list of resources this way: +# aws cloudformation describe-stack-resources --stack-name stack_name_for_other_stack --output text +SourcedParameters: + + # SNS Topic from the Antiope Bucket Stack. + pS3EventNotificationTopicArn: AntiopeBucket.Outputs.ResourceNotificationTopicArn +``` +3. Install or update Antiope with `make deploy env=PROD`. This will reconfigure the search cluster to use SNS if it's enabled. +4. Create an AWS Secrets Manager Secret with the format of: +```json +{ + "HECEndpoint": "https://YOUR-SPLUNK-HEC-HOSTNAME/services/collector/event", + "HECToken": "THIS-IS-SECRET" +} +``` +5. Generate Manifest for the Splunk HEC Cluster +```bash +cft-generate-manifest -m config-files/antiope-splunk-Manifest.yaml -t cloudformation/SplunkHEC-Template.yaml +``` +6. Edit the Manifest file and set the pSplunkHECSecret name and adjust the alarm threshold +7. Move the `pBucketName`, `pS3EventNotificationTopicArn`, and `pAWSInventoryLambdaLayer` entries from Parameters to SourcedParameters, and set them to the values below. Also set the DependentStacks for `Bucket` and `Antiope` to the CloudFormation stack names you're using. The `DependentStacks` and `SourcedParameters` should look like: +```yaml +########### +# These stacks are needed by the SourcedParameters section +########### +DependentStacks: + Bucket: antiope-bucket + Antiope: antiope + +########### +# Parameters that come from other deployed stacks. +# Valid Sections are Resources, Outputs, Parameters +# +# Hint. Get your list of resources this way: +# aws cloudformation describe-stack-resources --stack-name stack_name_for_other_stack --output text +SourcedParameters: + + # Name of the Antiope Bucket + pBucketName: Bucket.Outputs.Bucket + + # SNS Topic for the Splunk Ingest SQS Queue to subscribe to. + pS3EventNotificationTopicArn: Bucket.Outputs.ResourceNotificationTopicArn + + # ARN Antiope AWS Lambda Layer + pAWSInventoryLambdaLayer: Antiope.Resources.AWSInventoryLambdaLayer +``` +8.
Deploy stack to send all Resources written to the S3 bucket to Splunk +```bash +cft-deploy -m config-files/antiope-splunk-Manifest.yaml +``` \ No newline at end of file diff --git a/search-cluster/cloudformation/SearchCluster-Template.yaml b/search-cluster/cloudformation/SearchCluster-Template.yaml index c300156..3d102bd 100644 --- a/search-cluster/cloudformation/SearchCluster-Template.yaml +++ b/search-cluster/cloudformation/SearchCluster-Template.yaml @@ -69,11 +69,17 @@ Parameters: - "6.4" - "6.3" + pS3EventNotificationTopicArn: + Description: SNS Topic for the Elasticsearch SQS Queue to subscribe to. + Type: String + Default: None + Conditions: EncryptionEnabled: !Equals [ !Ref pClusterEncryption, True ] ReIndex: !Equals [ !Ref pReIndexSettings, True ] - + SNS2SQS: !Not [ !Equals [ !Ref pS3EventNotificationTopicArn, "None"] ] + SQSEncryptionEnabled: !And [ !Equals [ !Ref pClusterEncryption, True ], !Equals [ !Ref pS3EventNotificationTopicArn, "None"] ] Resources: @@ -230,7 +236,7 @@ Resources: CodeUri: ../lambda FunctionName: !Sub "${pResourcePrefix}-ingest-s3" Description: AWS Lamdba to pull data from S3 to index into Elasticsearch - Handler: ingest_s3.lambda_handler + Handler: !If [ SNS2SQS, ingest_sns_sqs.lambda_handler, ingest_sqs_direct.lambda_handler ] Runtime: python3.6 Timeout: !If [ ReIndex, 900, 180] MemorySize: 768 @@ -251,14 +257,17 @@ Resources: SearchIngestEventQueue: Type: AWS::SQS::Queue Properties: + # Note: SNS to SQS doesn't work if KMS is enabled. Since this SQS Queue only contains + # Account ID, Bucket and Object names, it's reasonably safe to not encrypt. YYMV + # We only support SQS Encryption if Encryption is enabled and SNS support is not. 
KmsMasterKeyId: Fn::If: - - EncryptionEnabled + - SQSEncryptionEnabled - Ref: SearchClusterKMSKey - Ref: AWS::NoValue KmsDataKeyReusePeriodSeconds: Fn::If: - - EncryptionEnabled + - SQSEncryptionEnabled - 86400 - Ref: AWS::NoValue MessageRetentionPeriod: !If [ ReIndex, 10800, 3600] # Any messages older than an hour are probably out-of-date @@ -276,7 +285,8 @@ Resources: Version: '2012-10-17' Id: AllowS3 Statement: - - Effect: Allow + - Sid: AllowS3EventNotification + Effect: Allow Principal: AWS: '*' Action: @@ -285,6 +295,26 @@ Resources: Condition: ArnLike: aws:SourceArn: !Sub "arn:aws:s3:*:*:${pBucketName}" + - !If + - SNS2SQS + - Sid: Allow-SNS-SendMessage + Effect: Allow + Principal: "*" + Action: + - sqs:SendMessage + Resource: !GetAtt SearchIngestEventQueue.Arn + Condition: + ArnEquals: + aws:SourceArn: !Ref pS3EventNotificationTopicArn + - !Ref AWS::NoValue + + SearchIngestQueueSubscription: + Type: AWS::SNS::Subscription + Condition: SNS2SQS + Properties: + Endpoint: !GetAtt SearchIngestEventQueue.Arn + Protocol: sqs + TopicArn: !Ref pS3EventNotificationTopicArn SearchIngestS3FunctionMapping: Type: AWS::Lambda::EventSourceMapping diff --git a/search-cluster/lambda/Makefile b/search-cluster/lambda/Makefile index e353685..3d51320 100644 --- a/search-cluster/lambda/Makefile +++ b/search-cluster/lambda/Makefile @@ -4,7 +4,7 @@ PYTHON=python3 PIP=pip3 -FILES=ingest_s3.py +FILES=ingest_sns_sqs.py ingest_sqs_direct.py DEPENDENCIES= diff --git a/search-cluster/lambda/ingest_sns_sqs.py b/search-cluster/lambda/ingest_sns_sqs.py new file mode 100644 index 0000000..fc0a691 --- /dev/null +++ b/search-cluster/lambda/ingest_sns_sqs.py @@ -0,0 +1,192 @@ +import boto3 +from botocore.exceptions import ClientError +import re +import requests +from requests_aws4auth import AWS4Auth +import json +import os +import time +import datetime +from dateutil import tz +from urllib.parse import unquote + +import logging +logger = logging.getLogger() +logger.setLevel(getattr(logging, 
# Logging is configured once per Lambda container. The AWS SDK and urllib3
# loggers are capped at WARNING so DEBUG output stays readable.
logger = logging.getLogger()
logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL', default='INFO')))
logging.getLogger('botocore').setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)


def _extract_s3_records(record):
    """Return the list of S3 event records carried by one SQS record.

    Two body shapes are accepted:
      * an SNS envelope whose 'Message' is the JSON-encoded S3 notification;
      * a bare {'Records': [...]} document, as written by requeue_objects().
    A body without 'Records' (such as the S3 "s3:TestEvent") yields [].
    """
    body = json.loads(record['body'])
    if 'Message' in body:
        body = json.loads(body['Message'])
    s3_records = body.get('Records')
    if s3_records is None:
        logger.warning(f"Message contains no S3 Records, skipping: {body}")
        return []
    return s3_records


# Lambda execution starts here
def lambda_handler(event, context):
    """Index every S3 object referenced by an SQS batch into Elasticsearch.

    Each referenced object is fetched, passed through fix_principal(), and
    added to one Bulk API request. Items that Elasticsearch rejects are put
    back on the SQS queue via requeue_objects().

    Args:
        event: SQS event; each record body is unpacked by
            _extract_s3_records().
        context: Lambda context (unused).

    Returns:
        The event when there was nothing to index or every item succeeded,
        otherwise None.

    Raises:
        Exception: if the bulk request fails outright (non-2xx response or
            transport error), so the batch is redelivered.
    """
    logger.debug("Received event: " + json.dumps(event, sort_keys=True))

    es_type = "_doc"  # This is what es is moving to after deprecating types in 6.0
    bulk_lines = []
    count = 0
    s3 = None  # created on first use and shared by every get_object() call
    bucket = obj_key = None

    for record in event['Records']:
        for s3_record in _extract_s3_records(record):
            bucket = s3_record['s3']['bucket']['name']
            obj_key = s3_record['s3']['object']['key']

            if s3 is None:
                s3 = boto3.client('s3')
            resource_to_index = get_object(bucket, obj_key, s3)
            if resource_to_index is None:
                continue

            # Work around the fact Principal can be "*" or {"AWS": "*"} in an IAM Statement
            modified_resource_to_index = fix_principal(resource_to_index)

            # Now we need to build the ES command. We need the index and document name from the object_key
            key_parts = obj_key.split("/")
            # The Elastic Search document id, is the object_name minus the file suffix
            es_id = key_parts.pop().replace(".json", "")

            # The Elastic Search Index is the remaining object prefix, all lowercase with the "/" replaced by "_"
            index = "_".join(key_parts).lower()

            # Bulk API wants an action line followed by the document line.
            # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
            command = {"index": {"_index": index, "_type": es_type, "_id": es_id}}
            bulk_lines.append(json.dumps(command, separators=(',', ':')))
            bulk_lines.append(json.dumps(modified_resource_to_index, separators=(',', ':')))
            count += 1

    # Don't call ES if there is nothing to do.
    if count == 0:
        logger.warning("No objects to index.")
        return(event)

    # Every line is newline-terminated, plus one trailing blank line.
    bulk_ingest_body = "\n".join(bulk_lines) + "\n\n"

    # all done processing the SQS messages. Send it to ES
    logger.debug(bulk_ingest_body)

    # Credentials and endpoint are only needed once there is something to send.
    region = os.environ['AWS_REGION']
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
    host = "https://{}".format(os.environ['ES_DOMAIN_ENDPOINT'])
    headers = {"Content-Type": "application/json"}

    requeue_keys = []

    try:
        # Now index the documents
        r = requests.post(f"{host}/_bulk", auth=awsauth, data=bulk_ingest_body, headers=headers)

        if not r.ok:
            logger.error(f"Bulk Error: {r.status_code} took {r.elapsed} sec - {r.text}")
            raise Exception(f"Bulk Error: {r.status_code}")

        # We need to make sure all the elements succeeded
        response = r.json()
        logger.info(f"Bulk ingest of {count} documents request took {r.elapsed} sec and processing took {response['took']} ms with errors: {response['errors']}")
        if response['errors'] is False:
            return(event)  # all done here

        for item in response['items']:
            if 'index' not in item:
                logger.error(f"Item {item} was not of type index. Huh?")
                continue
            if item['index']['status'] not in (200, 201):
                logger.error(f"Bulk Ingest Failure: Index {item['index']['_index']} ID {item['index']['_id']} Status {item['index']['status']} - {item}")
                requeue_keys.append(process_requeue(item))

    except Exception as e:
        # bucket/obj_key name the last object added to the batch.
        logger.critical("General Exception Indexing s3://{}/{}: {}".format(bucket, obj_key, e))
        raise

    if requeue_keys:
        requeue_objects(os.environ['INVENTORY_BUCKET'], requeue_keys)


def process_requeue(item):
    """Rebuild the S3 object key for a failed bulk item.

    Reverses the key -> index munging done in lambda_handler: "_" becomes "/"
    and "resources" becomes "Resources". The index name was lowercased, so
    any other capitals or literal underscores in the original prefix cannot
    be recovered.
    """
    # We must reverse the munge of the object key
    prefix = item['index']['_index'].replace("_", "/").replace("resources", "Resources")
    key = f"{prefix}/{item['index']['_id']}.json"
    logger.warning(f"Requeueing {key} : {item}")
    return(key)


def fix_principal(json_doc):
    """
    Rewrite every literal "Principal": "*" as "Principal": {"ALL": "*"}.

    ElasticSearch can't handle an attribute being a literal in one document
    and another level of nesting in the next. This becomes an issue when the
    "Principal" in a statement can be one of:
        "Principal": "*" ; or
        "Principal": { "AWS": "*" } ; or
        "Principal": { "Service": "someservice.amazonaws.com" }

    You end up with an error for the first condition that states:
        'type': 'mapper_parsing_exception',
        'reason': 'object mapping for [supplementaryConfiguration.Policy.Statement.Principal] tried to parse field [Principal] as object, but found a concrete value'

    Turning the bare "*" into { "ALL": "*" } lets the document be indexed
    while still showing what the policy said. It is a hack, but a workable one.

    Note: Principal: * is believed to mean no AWS auth is occurring at all,
    whereas AWS: * means any AWS customer (authenticated to their own
    account). Both are bad.
    """

    string_to_match = '"Principal":"*"'
    string_to_sub = '"Principal": { "ALL": "*"}'

    # Serialise with no spaces between json elements so string_to_match can hit
    json_string = json.dumps(json_doc, separators=(',', ':'), indent=None)

    # Do the replace
    modified_json_string = json_string.replace(string_to_match, string_to_sub)

    # Convert back to dict
    return(json.loads(modified_json_string))


def requeue_objects(bucket, objects):
    '''Drop any objects that were rejected because of thread issues back into the SQS queue to get ingested later.

    All keys are sent as one bare {'Records': [...]} message (no SNS
    envelope). Returns the number of objects requeued.
    '''

    sqs_client = boto3.client('sqs')
    queue_url = os.environ['SQS_QUEUE_URL']

    body = {
        'Records': [
            {'s3': {'bucket': {'name': bucket}, 'object': {'key': o}}}
            for o in objects
        ]
    }

    logger.warning(f"Re-queuing {len(objects)} Objects")
    sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(body))
    return(len(objects))


def get_object(bucket, obj_key, s3=None):
    '''Get the object to index from S3 and return the parsed json.

    obj_key arrives URL-encoded in the S3 event, so it is unquoted before the
    lookup. Pass an existing client as s3 to avoid creating one per object.
    Returns None (after logging) when S3 reports a client error.
    '''
    if s3 is None:
        s3 = boto3.client('s3')
    try:
        response = s3.get_object(
            Bucket=bucket,
            Key=unquote(obj_key)
        )
        return(json.loads(response['Body'].read()))
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            logger.error("Unable to find resource s3://{}/{}".format(bucket, obj_key))
        else:
            logger.error("Error getting resource s3://{}/{}: {}".format(bucket, obj_key, e))
        return(None)