cherweg / logstash-input-s3-sns-sqs

logstash input downloading files from s3 Bucket by ObjectKey from SNS/SQS

Cannot read Cloudtrail logs. Uncompleted message at the end of poller loop #51

Closed. wpitt closed this issue 4 years ago.

wpitt commented 4 years ago

When I try to ingest logs using the s3snssqs input, I see no events output, but no errors either. If I enable DEBUG logs, I see an "Inside Preprocess" message followed by a "Payload in Preprocess" message, and then the following message for each message read from the SQS queue:

[2020-10-07T18:45:56,328][DEBUG][logstash.inputs.s3snssqs ][main][xxxx] [Worker xxxx/0] uncompleted message at the end of poller loop. We´ll throw skip_delete. {:message_count=>2729}

These messages are being accessed using a cross-account role that has been verified, via the AWS CLI, to have access to both the S3 buckets and the SQS queue.

Here is the current LS config:

    input {
      s3snssqs {
        s3_role_session_name       => "MY_ROLE_NAME"
        region                     => "us-west-2"
        s3_default_options         => { "endpoint_discovery" => true }
        role_arn                   => "arn:aws:iam::ACCOUNT_NUMBER:role/MY_ROLE_NAME"
        queue                      => "QUEUE_NAME"
        queue_owner_aws_account_id => "ACCOUNT_NUMBER"
        type                       => "sqs-logs"
        sqs_skip_delete            => true
        sqs_delete_on_failure      => false
        from_sns                   => true
      }
    }

Any ideas what may be causing this?

christianherweg0807 commented 4 years ago

You should set a codec, because the default is plain text. Here is an example:


    s3snssqs {
        region                     => "myawsregion"
        queue                      => "myqueuname"
        queue_owner_aws_account_id => "theownersid"
        s3_default_options         => { "endpoint_discovery" => true }
        type                       => "mydefaulttype"
        tags                       => []
        sqs_skip_delete            => false
        codec                      => "json"
        s3_options_by_bucket   => [
            { 
                bucket_name => "my-bucket-name"
                folders => [
                    { 
                        key => ".*AWSLogs.*"
                        codec => "cloudtrail"
                        type => "cloudtrail"
                    }
                ]
            }
        ]
    }

wpitt commented 4 years ago

Thanks for taking the time to get back to me Christian. I tried that and I'm still getting the same issue.

  s3snssqs {
    region                     => "us-west-2"
    queue                      => "MY_QUEUE"
    queue_owner_aws_account_id => "MY_ACCOUNT"
    type                       => "sqs-logs"
    from_sns                   => true
    codec                      => "json"
    sqs_skip_delete            => false
    sqs_delete_on_failure      => false
    s3_default_options         => { "endpoint_discovery" => true }
    role_arn                   => "arn:aws:iam::MY_ACCOUNT:role/MY_ROLE"
    delete_on_success          => false
    tags                       => "cloudtrail"
    s3_options_by_bucket   => [
      {
        bucket_name => "BUCKET_NAME"
        folders => [
          {
            key => ".*AWSLogs.*"
            codec => "cloudtrail"
            type => "cloudtrail"
          }
        ]
      }
    ]
  }

Below is a snippet from the errors I'm receiving. Any help is much appreciated.

[2020-10-12T15:10:56,813][DEBUG][logstash.inputs.s3snssqs ]
[main][xxxx9f87] 
Inside Preprocess: Start {
  :event=>#<struct Aws::SQS::Types::Message 
  message_id="xxxx001c", 
  receipt_handle="xxxxAo2A==", 
  md5_of_body="xxxxad74", 
  body="{\n  
    \"Type\" : \"Notification\",\n  
    \"MessageId\" : \"xxxx7e50\",\n  
    \"TopicArn\" : \"arn:aws:sns:us-west-2:MY_ACCOUNT:SNS_TOPIC_NAME\",\n  
    \"Message\" : \"{
      \\\"s3Bucket\\\":\\\"S3_BUCKET_NAME\\\",
      \\\"s3ObjectKey\\\":[
        \\\"AWSLogs/MY_ACCOUNT/CloudTrail/us-east-1/2020/10/08/OBJECT_NAME.json.gz\\\"
      ]
    }\",\n  
    \"Timestamp\" : \"2020-10-08T15:35:36.961Z\",\n  
    \"SignatureVersion\" : \"1\",\n  
    \"Signature\" : \"XXXXX\",\n  
    \"SigningCertURL\" : \"XXXXX",\n  
    \"UnsubscribeURL\" : \"XXXXX\"\n}", 
    attributes={
      "SenderId"=>"XXXXX", 
      "ApproximateFirstReceiveTimestamp"=>"1602514087731", 
      "ApproximateReceiveCount"=>"2", 
      "SentTimestamp"=>"1602171336996"
    }, 
    md5_of_message_attributes=nil, 
    message_attributes={}>
  }

[2020-10-12T15:10:56,814][DEBUG][logstash.inputs.s3snssqs ]
[main][xxxx9f87] 
Payload in Preprocess:  {
  :payload=>{
    "s3Bucket"=>"S3_BUCKET_NAME", 
    "s3ObjectKey"=>[
      "AWSLogs/MY_ACCOUNT/CloudTrail/us-east-1/2020/10/08/OBJECT_NAME.json.gz"
    ]
  }
}

[2020-10-12T15:10:56,814][DEBUG][logstash.inputs.s3snssqs ][main][a72336ad12a35ae97e7821f48f520a3e7d33b2f90c5a74cdac31ca078b099f87] 
[Worker xxxx9f87/0] 
uncompleted message at the end of poller loop. We´ll throw skip_delete. {:message_count=>50}
christianherweg0807 commented 4 years ago

Ohh, I see. I know what's wrong, but why...: { "Type": "Notification", "MessageId": "xxx", "TopicArn": "arn:aws:sns:xxx", "Subject": "Amazon S3 Notification", "Message": "{\"Records\":[...

Your message is not compatible with the Amazon S3 Notification format... the Records part is missing...

    payload = JSON.parse(message.body)
    payload = JSON.parse(payload['Message']) if @from_sns
    @logger.debug("Payload in Preprocess: ", :payload => payload)
    return nil unless payload['Records']

Please see: https://docs.aws.amazon.com/de_de/AmazonS3/latest/dev/notification-content-structure.html
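
For comparison, a genuine Amazon S3 event notification carries a Records array inside Message once the SNS envelope is unwrapped; roughly like this (all values are placeholders, not taken from your setup):

    {
      "Records": [
        {
          "eventSource": "aws:s3",
          "eventName": "ObjectCreated:Put",
          "s3": {
            "bucket": { "name": "S3_BUCKET_NAME" },
            "object": { "key": "AWSLogs/MY_ACCOUNT/CloudTrail/us-east-1/2020/10/08/OBJECT_NAME.json.gz" }
          }
        }
      ]
    }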

wpitt commented 4 years ago

Is this maybe due to the CloudTrail logs being encrypted? Is this supported by the plugin if the user/role has the appropriate decrypt permissions?
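
For reference, reading CloudTrail objects encrypted with SSE-KMS needs kms:Decrypt on the trail's KMS key in addition to the S3 read permissions; a minimal sketch of the extra policy statement (key ARN is hypothetical):

    {
      "Effect": "Allow",
      "Action": "kms:Decrypt",
      "Resource": "arn:aws:kms:us-west-2:MY_ACCOUNT:key/KEY_ID"
    }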

christianherweg0807 commented 4 years ago

Yes, decryption is no problem... As you can see, you have an SQS message after extracting body.Message. Your payload:

"s3Bucket"=>"S3_BUCKET_NAME", 
    "s3ObjectKey"=>[
      "AWSLogs/MY_ACCOUNT/CloudTrail/us-east-1/2020/10/08/OBJECT_NAME.json.gz"
    ]

Should be:

   "Records":[  
      {  
         "s3":{  
            "bucket":{  
               "name":"bucket-name",
               "arn":"bucket-ARN"
            },
            "object":{  
               "key":"object-key",
              "size":"size"
            }
         }
      }
   ]

This is still the preprocess phase, trying to identify the download target.

The plugin looks for a ['Records'] entry after parsing body.Message; otherwise it returns nil.

    return nil unless payload['Records']
    payload['Records'].each do |record|
      @logger.debug("We found a record", :record => record)

Please watch out for the "We found a record" debug line...

wpitt commented 4 years ago

If I manually pull the object referenced in the SQS message, it contains a Records entry, but it is just a list of events, not nested s3 entries with buckets and objects.

{
    "Records": [
        {
            "awsRegion": "us-east-1",
            "eventID": "XXXX",
            "eventName": "ListRoles",
            "eventSource": "iam.amazonaws.com",
            "eventTime": "2020-10-10T04:22:48Z",
            "eventType": "AwsApiCall",
            "eventVersion": "1.05",
            "recipientAccountId": "XXXX",
            "requestID": "XXXX",
            "requestParameters": {
                "marker": "XXXX"
            },
            "responseElements": null,
            "sourceIPAddress": "XXXX",
            "userAgent": "XXXX",
            "userIdentity": {
                "accessKeyId": "XXXX",
                "accountId": "XXXX",
                "arn": "arn:aws:sts::XXXX:assumed-role/XXXX/AssumeRoleSession",
                "principalId": "XXXX:AssumeRoleSession",
                "sessionContext": {
                    "attributes": {
                        "creationDate": "2020-10-10T04:02:31Z",
                        "mfaAuthenticated": "false"
                    },
                    "sessionIssuer": {
                        "accountId": "XXXX",
                        "arn": "arn:aws:iam::XXXX:role/XXXX",
                        "principalId": "XXXX",
                        "type": "Role",
                        "userName": "XXXX"
                    },
                    "webIdFederationData": {}
                },
                "type": "AssumedRole"
            }
        },
        {
            "awsRegion": "us-east-1",
            "eventID": "XXXX",
            "eventName": "GetAccountPasswordPolicy",
            "eventSource": "iam.amazonaws.com",
            "eventTime": "2020-10-10T04:22:48Z",
            "eventType": "AwsApiCall",
            "eventVersion": "1.05",
            "recipientAccountId": "XXXX",
            "requestID": "XXXX",
            "requestParameters": null,
            "responseElements": null,
            "sourceIPAddress": "XXXX",
            "userAgent": "XXXX",
            "userIdentity": {
                "accessKeyId": "XXXX",
                "accountId": "XXXX",
                "arn": "arn:aws:sts::XXXX:assumed-role/XXXX/AssumeRoleSession",
                "principalId": "XXXX:AssumeRoleSession",
                "sessionContext": {
                    "attributes": {
                        "creationDate": "2020-10-10T04:02:31Z",
                        "mfaAuthenticated": "false"
                    },
                    "sessionIssuer": {
                        "accountId": "XXXX",
                        "arn": "arn:aws:iam::XXXX:role/XXXX",
                        "principalId": "XXXX",
                        "type": "Role",
                        "userName": "XXXX"
                    },
                    "webIdFederationData": {}
                },
                "type": "AssumedRole"
            }
        },
.......
christianherweg0807 commented 4 years ago

That's ok... It is not about the content of the file. The SQS message is not in the "Amazon S3 Notification" format. How do you generate the messages in your SQS queue? https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html

wpitt commented 4 years ago

I think we finally solved this issue. We changed the notifications from CloudTrail -> SNS -> SQS to S3 -> SQS.

It looks like the format issues were due to the notifications being generated directly by CloudTrail rather than by S3.

We switched to S3 event notifications on the CloudTrail object paths, sent directly to the SQS queue. Once that was in place, the plugin started working.
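
In case it helps anyone else, a rough sketch of an S3 bucket notification configuration along these lines (the shape accepted by aws s3api put-bucket-notification-configuration; the queue ARN and prefix are placeholders):

    {
      "QueueConfigurations": [
        {
          "QueueArn": "arn:aws:sqs:us-west-2:MY_ACCOUNT:QUEUE_NAME",
          "Events": ["s3:ObjectCreated:*"],
          "Filter": {
            "Key": {
              "FilterRules": [
                { "Name": "prefix", "Value": "AWSLogs/" }
              ]
            }
          }
        }
      ]
    }

With S3 delivering straight to SQS there is no SNS envelope to unwrap, so from_sns presumably has to be false in the input config.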

Thanks for all the help in figuring out the format discrepancy. Would there be any benefit to modifying the plugin to support CloudTrail-generated notifications?