Graylog2 / graylog-plugin-aws

Several bundled Graylog plugins to integrate with different AWS services like CloudTrail and FlowLogs.

Application didn't checkpoint at end of shard #102

Closed ion9 closed 4 years ago

ion9 commented 5 years ago

After increasing the shard count of a stream, I find errors like:

2019-01-04 15:45:54,480 ERROR: com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask - Application exception.
java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000000
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:113) [graylog-plugin-aws-2.4.6.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) [graylog-plugin-aws-2.4.6.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) [graylog-plugin-aws-2.4.6.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]

Info on the shard from a different log:

2019-01-04 15:45:08,430 INFO : com.amazonaws.services.kinesis.leases.impl.LeaseRenewer - Worker graylog-node-4389c7e0b22d3e2939192e5bdf92067b15bfd07ecf940989f67c3fb2d4b2f2e5 found lease
{
    "leaseKey": "shardId-000000000002",
    "leaseOwner": "graylog-node-4389c7e0b22d3e2939192e5bdf92067b15bfd07ecf940989f67c3fb2d4b2f2e5",
    "leaseCounter": 1262,
    "concurrencyToken": null,
    "lastCounterIncrementNanos": null,
    "checkpoint": {
        "sequenceNumber": "TRIM_HORIZON",
        "subSequenceNumber": 0
    },
    "pendingCheckpoint": null,
    "ownerSwitchesSinceCheckpoint": 3,
    "parentShardIds": [
        "shardId-000000000000"
    ]
}

Looks like: https://github.com/awslabs/amazon-kinesis-client/pull/345
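
For context, that PR concerns checkpointing when a closed shard shuts down. With KCL 1.x (the version bundled here), the worker throws exactly this IllegalArgumentException when the record processor does not checkpoint during a TERMINATE shutdown, i.e. at the end of a shard closed by a reshard. A minimal sketch of that contract, using an illustrative processor class rather than the plugin's actual code:

```java
import java.util.List;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

// Illustrative processor, not the plugin's actual implementation.
public class ExampleRecordProcessor implements IRecordProcessor {

    @Override
    public void initialize(String shardId) {
        // no per-shard state needed for this sketch
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        // ... handle records, checkpointing periodically ...
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        // TERMINATE means the shard was closed (e.g. by a reshard). The KCL
        // requires a checkpoint here; skipping it produces the
        // "Application didn't checkpoint at end of shard" error above.
        if (reason == ShutdownReason.TERMINATE) {
            try {
                checkpointer.checkpoint();
            } catch (KinesisClientLibException e) {
                // log and bail; another worker will retry the lease
            }
        }
    }
}
```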

Other information: https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing

> After the reshard has occurred and the stream is again in an ACTIVE state, you could immediately begin to read data from the child shards. However, the parent shards that remain after the reshard could still contain data that you haven't read yet that was added to the stream before the reshard. If you read data from the child shards before having read all data from the parent shards, you could read data for a particular hash key out of the order given by the data records' sequence numbers. Therefore, assuming that the order of the data is important, you should, after a reshard, always continue to read data from the parent shards until it is exhausted. Only then should you begin reading data from the child shards. When getRecordsResult.getNextShardIterator returns null, it indicates that you have read all the data in the parent shard. If you are reading data using the Kinesis Client Library, the library ensures that you receive the data in order even if a reshard occurs.
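
The same parent-before-children rule in raw SDK terms, in case it clarifies the doc passage above. This is a hedged sketch that assumes you manage shard iterators yourself instead of using the KCL, with the stream and shard ids taken from the describe-stream output below:

```java
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;

public class DrainParentShard {
    public static void main(String[] args) throws InterruptedException {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // Start on the (closed) parent shard.
        String iterator = kinesis
                .getShardIterator("CloudWatch", "shardId-000000000000", "TRIM_HORIZON")
                .getShardIterator();

        // A closed shard eventually returns a null next iterator; only then
        // is it safe to start reading the child shards.
        while (iterator != null) {
            GetRecordsResult result = kinesis.getRecords(
                    new GetRecordsRequest().withShardIterator(iterator).withLimit(1000));
            // ... process result.getRecords() in sequence-number order ...
            iterator = result.getNextShardIterator();
            Thread.sleep(1000); // back off to stay under per-shard read limits
        }
        // Parent exhausted: now read shardId-000000000001 and shardId-000000000002.
    }
}
```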

ion9 commented 5 years ago

describe-stream on the problem stream:

aws kinesis describe-stream --stream-name CloudWatch  --profile shared --region us-east-1
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49590726756760425157998356762744788985497854313259597826",
                    "EndingSequenceNumber": "49590734542050927318432283276999739773632144383749914626"
                }
            },
            {
                "ShardId": "shardId-000000000001",
                "ParentShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "170141183460469231731687303715884105727"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591682063617123246853875533725686181781598824321515538",
                    "EndingSequenceNumber": "49591682063628273619453140845295245115103214534221889554"
                }
            },
            {
                "ShardId": "shardId-000000000002",
                "ParentShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "170141183460469231731687303715884105728",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591682063639423992052406156867221900054247185827495970",
                    "EndingSequenceNumber": "49591682063650574364651671468436780833375862895727869986"
                }
            },
            {
                "ShardId": "shardId-000000000003",
                "ParentShardId": "shardId-000000000001",
                "AdjacentParentShardId": "shardId-000000000002",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591707538248979922752363819090454830008117285923848242"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:????????????:stream/CloudWatch",
        "StreamName": "CloudWatch",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": [
                    "IncomingBytes",
                    "OutgoingRecords",
                    "IncomingRecords",
                    "ReadProvisionedThroughputExceeded",
                    "WriteProvisionedThroughputExceeded",
                    "OutgoingBytes"
                ]
            }
        ],
        "EncryptionType": "KMS",
        "KeyId": "alias/aws/kinesis",
        "StreamCreationTimestamp": 1543865029.0
    }
}

ion9 commented 5 years ago

Deleting and remaking the stream did not help:

2019-01-04 16:38:07,454 ERROR: com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask - Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException: Parent shard shardId-000000000000 exists but not the child shard shardId-000000000002
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.isCandidateForCleanup(ShardSyncer.java:663) ~[graylog-plugin-aws-2.4.6.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.cleanupGarbageLeases(ShardSyncer.java:609) ~[graylog-plugin-aws-2.4.6.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.syncShardLeases(ShardSyncer.java:152) ~[graylog-plugin-aws-2.4.6.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.checkAndCreateLeasesForNewShards(ShardSyncer.java:90) ~[graylog-plugin-aws-2.4.6.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask.call(ShardSyncTask.java:71) [graylog-plugin-aws-2.4.6.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) [graylog-plugin-aws-2.4.6.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:504) [graylog-plugin-aws-2.4.6.jar:?]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:436) [graylog-plugin-aws-2.4.6.jar:?]
at org.graylog.aws.kinesis.KinesisConsumer.run(KinesisConsumer.java:168) [graylog-plugin-aws-2.4.6.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]

ion9 commented 5 years ago

Found a workaround: remove the items from the DynamoDB table for the input.
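
For anyone else hitting this: the KCL keeps its leases and checkpoints in a DynamoDB table named after the KCL application (one per input). A sketch of the workaround in SDK terms; the table name here is a placeholder, and note that clearing the lease state discards checkpoints, so the input will re-read whatever the stream still retains:

```java
import java.util.Collections;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ScanResult;

public class ClearKclLeases {
    public static void main(String[] args) {
        AmazonDynamoDB dynamo = AmazonDynamoDBClientBuilder.defaultClient();
        String table = "graylog-input-lease-table"; // placeholder: the affected input's table

        // KCL lease tables are keyed on "leaseKey" (the shard id, as in the
        // lease dump above). A single scan is fine for a table this small.
        ScanResult scan = dynamo.scan(table, Collections.singletonList("leaseKey"));
        for (Map<String, AttributeValue> item : scan.getItems()) {
            dynamo.deleteItem(table, Collections.singletonMap("leaseKey", item.get("leaseKey")));
        }
    }
}
```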

danotorrey commented 5 years ago

Hi @ion9, thanks for all of the details and investigation. Which type of logs are you reading from AWS (CloudWatch Logs or Flow Logs)? We would just like to know for reference.

When an AWS Logs/Flow Logs input in Graylog is created and starts processing logs, it tracks the progress in DynamoDB. It looks like something went wrong with progress tracking after the re-shard operation. It makes sense that deleting the DynamoDB table would resolve the issue, but I believe that would also cause Graylog to go back and re-read previously read log messages.

One option that we are considering for future AWS development is removing the need for a Kinesis stream between the flow log messages in CloudWatch and Graylog. It should be technically possible for us to read the messages directly from CloudWatch (no Kinesis), the main goal being to reduce complexity and cost. Do you see any issue with this potential approach?
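
To make the suggestion concrete, a direct read would presumably go through the CloudWatch Logs API instead of a stream. A rough sketch with a placeholder log group name, not a description of how the plugin would actually implement it:

```java
import com.amazonaws.services.logs.AWSLogs;
import com.amazonaws.services.logs.AWSLogsClientBuilder;
import com.amazonaws.services.logs.model.FilterLogEventsRequest;
import com.amazonaws.services.logs.model.FilterLogEventsResult;
import com.amazonaws.services.logs.model.FilteredLogEvent;

public class DirectCloudWatchRead {
    public static void main(String[] args) {
        AWSLogs logs = AWSLogsClientBuilder.defaultClient();
        String token = null;
        do {
            // Placeholder log group; page through results with nextToken.
            FilterLogEventsResult result = logs.filterLogEvents(new FilterLogEventsRequest()
                    .withLogGroupName("/aws/vpc/flowlogs")
                    .withNextToken(token));
            for (FilteredLogEvent event : result.getEvents()) {
                System.out.println(event.getMessage());
            }
            token = result.getNextToken();
        } while (token != null);
    }
}
```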

ion9 commented 5 years ago

I'm using it for both; the problem was on a CloudWatch Logs stream that I added a shard to while the input was running.

Re-reading would be a problem, but I had remade the Kinesis stream before I found the table.

I would stick with the Kinesis setup, as it enables people who would like to do more with the data pipeline to do so. (I like to think of Kinesis as the AWS Kafka, and it lets me run Graylog in ECS with the disk journal disabled and the inputs set to throttling_allowed: true.)

This is the layout I have been working with.

ALB -> s3 -> lambda ↘
        awslogsd  -> CWL -> cross-account subscription ->  kinesis -> graylog -> ES
                                                                 ↘ firehose -> s3

Source for ALB to CWL: https://github.com/amazon-archives/cloudwatch-logs-centralize-logs

danotorrey commented 5 years ago

@ion9 Understood! We appreciate the info on the use case. We will definitely keep this in mind with future AWS development.

We're planning to use the latest AWS SDK client version 2.0 in our upcoming AWS development, which we hope will resolve the checkpoint issues when increasing the shard count.
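
For reference, KCL 2.x (built on the 2.0 SDK) makes the shard-end contract explicit with a dedicated callback, which is what should prevent this class of error. A minimal sketch, not the plugin's actual code:

```java
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

// Illustrative processor, not the plugin's actual implementation.
public class ExampleShardProcessor implements ShardRecordProcessor {
    @Override public void initialize(InitializationInput input) { }
    @Override public void processRecords(ProcessRecordsInput input) { /* ... */ }
    @Override public void leaseLost(LeaseLostInput input) { }
    @Override public void shutdownRequested(ShutdownRequestedInput input) { }

    @Override
    public void shardEnded(ShardEndedInput input) {
        // KCL 2.x gives shard end its own callback, and checkpointing here is
        // mandatory before the child shards can be consumed.
        try {
            input.checkpointer().checkpoint();
        } catch (InvalidStateException | ShutdownException e) {
            // log; the lease will be retried
        }
    }
}
```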

Is increasing the shard count a frequent operation? Does it have to be performed live (or can it be done in a maintenance period)? I just want to make sure we understand the use case.

I am leaving this issue open so we can test whether the shard count can be adjusted successfully after completing our latest round of AWS improvements.

ion9 commented 5 years ago

I would push to keep it as Kinesis; after you grow out of one account, you will be looking at how to do cross-account logging and be right back to Kinesis.

I have a bit of everything in the streams: CloudTrail, Flow Logs, awslogd from some old-school EC2s, our ECS cluster hosts, all the app logs from the ECS tasks, Lambda logs, ALB logs, Redshift, Elasticsearch, Directory Service, CodePipeline, and CodeBuild.

It is not something that happens often; I have not made a Lambda for it.

danotorrey commented 4 years ago

@ion9

The new AWS Kinesis/CloudWatch integration should solve this error. It also supports Kinesis streams that contain data that originated from sources other than CloudWatch (as long as the Kinesis message payload is text encoded with newline delimiters).
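
If it helps anyone reading along, a hedged sketch of what "text with newline delimiters" implies on the consumer side: each Kinesis record payload decodes to one or more messages split on newlines (illustrative only; PayloadSplitter is not a real plugin class):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: decode a Kinesis record payload into newline-delimited messages.
public class PayloadSplitter {
    public static String[] split(ByteBuffer payload) {
        String text = StandardCharsets.UTF_8.decode(payload).toString();
        return text.split("\n");
    }
}
```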

Please let us know if you still have any trouble using it.

Please see these references for more info:
https://docs.graylog.org/en/3.3/pages/integrations/inputs/aws_kinesis_cloudwatch_input.html
https://www.youtube.com/watch?v=2OHiMWWcp7I