awslabs / amazon-kinesis-scaling-utils

The Kinesis Scaling Utility is designed to give you the ability to scale Amazon Kinesis Streams in the same way that you scale EC2 Auto Scaling groups – up or down by a count or as a percentage of the total fleet. You can also simply scale to an exact number of Shards. There is no requirement for you to manage the allocation of the keyspace to Shards when using this API, as it is done automatically.
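Kinesis routes each record by taking the MD5 hash of its partition key as a 128-bit integer and sending it to the shard whose hash key range contains that value. The full keyspace therefore runs from 0 to 2^128 - 1 (340282366920938463463374607431768211455, the `EndingHashKey` of the last shard in the output further down). Allocating the keyspace automatically comes down to cutting that range into equal slices. The following Java sketch shows one way to do that. It is not the utility's own code: the class name `EvenKeyspaceSplit` is hypothetical, and giving any remainder to the last shard is an assumption.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: divides the Kinesis hash key space evenly across N shards.
class EvenKeyspaceSplit {
    // Kinesis hash keys range over [0, 2^128 - 1].
    static final BigInteger KEYSPACE_SIZE = BigInteger.ONE.shiftLeft(128);
    static final BigInteger MAX_HASH_KEY = KEYSPACE_SIZE.subtract(BigInteger.ONE);

    // Returns inclusive [start, end] pairs, one per shard, in keyspace order.
    static List<BigInteger[]> ranges(int shardCount) {
        if (shardCount < 1) {
            throw new IllegalArgumentException("shardCount must be >= 1");
        }
        BigInteger width = KEYSPACE_SIZE.divide(BigInteger.valueOf(shardCount));
        List<BigInteger[]> result = new ArrayList<>();
        BigInteger start = BigInteger.ZERO;
        for (int i = 0; i < shardCount; i++) {
            // Assumption: the last shard absorbs any remainder so the ranges end at MAX_HASH_KEY.
            BigInteger end = (i == shardCount - 1)
                    ? MAX_HASH_KEY
                    : start.add(width).subtract(BigInteger.ONE);
            result.add(new BigInteger[] {start, end});
            start = end.add(BigInteger.ONE);
        }
        return result;
    }

    public static void main(String[] args) {
        for (BigInteger[] r : ranges(16)) {
            System.out.println("Start: " + r[0] + ", End: " + r[1]);
        }
    }
}
```

For 16 shards each slice is exactly 2^124 keys, which is 6.25% of the keyspace.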
Apache License 2.0

Uneven shard key distribution #22

Closed: jr314159 closed this issue 9 years ago

jr314159 commented 9 years ago

We've been using this application to manage our stream, and I discovered that the hash key distributions for each shard are uneven:

Joel:s3logger jrosen$ aws kinesis describe-stream --stream-name profile_vertica_logs_production_localytics
{
    "StreamDescription": {
        "StreamStatus": "ACTIVE", 
        "StreamName": "profile_vertica_logs_production_localytics", 
        "StreamARN": "arn:aws:kinesis:us-east-1:294150496849:stream/profile_vertica_logs_production_localytics", 
        "Shards": [
            {
                "ShardId": "shardId-000000000874", 
                "HashKeyRange": {
                    "EndingHashKey": "300249147351239883522035629954940250949", 
                    "StartingHashKey": "280232537532583562635223583154768590786"
                }, 
                "ParentShardId": "shardId-000000000870", 
                "AdjacentParentShardId": "shardId-000000000872", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468058145507846763513171201233418429267577761969826"
                }
            }, 
            {
                "ShardId": "shardId-000000000877", 
                "HashKeyRange": {
                    "EndingHashKey": "320265757169356079890549598711728836438", 
                    "StartingHashKey": "300249147351239883522035629954940250950"
                }, 
                "ParentShardId": "shardId-000000000873", 
                "AdjacentParentShardId": "shardId-000000000875", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554732340597232675420810692645089059174495303185217234"
                }
            }, 
            {
                "ShardId": "shardId-000000000878", 
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455", 
                    "StartingHashKey": "320265757169356079890549598711728836439"
                }, 
                "ParentShardId": "shardId-000000000876", 
                "AdjacentParentShardId": "shardId-000000000826", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468092131843529324182838901668065945370512876123874"
                }
            }, 
            {
                "ShardId": "shardId-000000000881", 
                "HashKeyRange": {
                    "EndingHashKey": "24305883360235455442838331915739612791", 
                    "StartingHashKey": "0"
                }, 
                "ParentShardId": "shardId-000000000829", 
                "AdjacentParentShardId": "shardId-000000000879", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468438306311246115045864960622812265886170210121490"
                }
            }, 
            {
                "ShardId": "shardId-000000000884", 
                "HashKeyRange": {
                    "EndingHashKey": "48611766697147529041415588084174029605", 
                    "StartingHashKey": "24305883360235455442838331915739612792"
                }, 
                "ParentShardId": "shardId-000000000880", 
                "AdjacentParentShardId": "shardId-000000000882", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468462993236180888445682640662940087622357330442050"
                }
            }, 
            {
                "ShardId": "shardId-000000000887", 
                "HashKeyRange": {
                    "EndingHashKey": "72917650050263527726213560516824835796", 
                    "StartingHashKey": "48611766697147529041415588084174029606"
                }, 
                "ParentShardId": "shardId-000000000883", 
                "AdjacentParentShardId": "shardId-000000000885", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468485896101499779395648997845606097489623972329330"
                }
            }, 
            {
                "ShardId": "shardId-000000000890", 
                "HashKeyRange": {
                    "EndingHashKey": "97223533390416380489178248897960958917", 
                    "StartingHashKey": "72917650050263527726213560516824835797"
                }, 
                "ParentShardId": "shardId-000000000886", 
                "AdjacentParentShardId": "shardId-000000000888", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468508798966818670345615355028272107356890614216610"
                }
            }, 
            {
                "ShardId": "shardId-000000000894", 
                "HashKeyRange": {
                    "EndingHashKey": "121529416757035643727222635693274104944", 
                    "StartingHashKey": "97223533390416380489178248897960958918"
                }, 
                "ParentShardId": "shardId-000000000891", 
                "AdjacentParentShardId": "shardId-000000000892", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468544569362117113465134378320381435328746206803938"
                }
            }, 
            {
                "ShardId": "shardId-000000000897", 
                "HashKeyRange": {
                    "EndingHashKey": "145835300102049674424392379258801301030", 
                    "StartingHashKey": "121529416757035643727222635693274104945"
                }, 
                "ParentShardId": "shardId-000000000893", 
                "AdjacentParentShardId": "shardId-000000000895", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468566044979743298455219677217077995700876465944594"
                }
            }, 
            {
                "ShardId": "shardId-000000000900", 
                "HashKeyRange": {
                    "EndingHashKey": "170141183450304490751314526534861009228", 
                    "StartingHashKey": "145835300102049674424392379258801301031"
                }, 
                "ParentShardId": "shardId-000000000896", 
                "AdjacentParentShardId": "shardId-000000000898", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468589304656985365895156298971236367941927203518530"
                }
            }, 
            {
                "ShardId": "shardId-000000000903", 
                "HashKeyRange": {
                    "EndingHashKey": "194447066811106967209212391235731229719", 
                    "StartingHashKey": "170141183450304490751314526534861009229"
                }, 
                "ParentShardId": "shardId-000000000899", 
                "AdjacentParentShardId": "shardId-000000000901", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468611850710381080355152391582410015435409749719154"
                }
            }, 
            {
                "ShardId": "shardId-000000000906", 
                "HashKeyRange": {
                    "EndingHashKey": "218752950167879235613208499400926401491", 
                    "StartingHashKey": "194447066811106967209212391235731229720"
                }, 
                "ParentShardId": "shardId-000000000902", 
                "AdjacentParentShardId": "shardId-000000000904", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468634039951853618325178219622091300555108200233122"
                }
            }, 
            {
                "ShardId": "shardId-000000000909", 
                "HashKeyRange": {
                    "EndingHashKey": "260215927711226584039780879896832127821", 
                    "StartingHashKey": "243058833536118888773728969223528864133"
                }, 
                "ParentShardId": "shardId-000000000868", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468656586005249332785174312233264948048590746433746"
                }
            }, 
            {
                "ShardId": "shardId-000000000910", 
                "HashKeyRange": {
                    "EndingHashKey": "243058833536118888773728969223528864132", 
                    "StartingHashKey": "218752950167879235613208499400926401492"
                }, 
                "ParentShardId": "shardId-000000000907", 
                "AdjacentParentShardId": "shardId-000000000908", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554732662107076202636804524165539395945923134904875234"
                }
            }, 
            {
                "ShardId": "shardId-000000000911", 
                "HashKeyRange": {
                    "EndingHashKey": "267364716887074354948759097083674869129", 
                    "StartingHashKey": "260215927711226584039780879896832127822"
                }, 
                "ParentShardId": "shardId-000000000871", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468679109757899848714547263308720322893711786653938"
                }
            }, 
            {
                "ShardId": "shardId-000000000912", 
                "HashKeyRange": {
                    "EndingHashKey": "280232537532583562635223583154768590785", 
                    "StartingHashKey": "267364716887074354948759097083674869130"
                }, 
                "ParentShardId": "shardId-000000000871", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554468678775246721870755200140272946233168289196947714"
                }
            }
        ]
    }
}

I did a little math and found that some shards have a range of 24305883360235455442838331915739612791 values, while the smallest shard only takes a range of 7148789175847770908978217186842741307 values.
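The arithmetic behind that comparison is `EndingHashKey - StartingHashKey` for each shard. These values exceed the range of a 64-bit `long`, so this minimal Java sketch uses `BigInteger`. The two ranges are copied by hand from the output above rather than parsed from the CLI JSON, and `ShardWidth` is a hypothetical name.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;

class ShardWidth {
    static final BigInteger KEYSPACE_SIZE = BigInteger.ONE.shiftLeft(128);

    // Width as EndingHashKey - StartingHashKey (the inclusive count of keys is one more).
    static BigInteger width(String startingHashKey, String endingHashKey) {
        return new BigInteger(endingHashKey).subtract(new BigInteger(startingHashKey));
    }

    // Share of the whole 128-bit keyspace, as a percentage.
    static double percentOfKeyspace(BigInteger width) {
        return new BigDecimal(width)
                .multiply(BigDecimal.valueOf(100))
                .divide(new BigDecimal(KEYSPACE_SIZE), MathContext.DECIMAL64)
                .doubleValue();
    }

    public static void main(String[] args) {
        // shardId-000000000881 and shardId-000000000911 from the describe-stream output
        BigInteger largest = width("0", "24305883360235455442838331915739612791");
        BigInteger smallest = width("260215927711226584039780879896832127822",
                                    "267364716887074354948759097083674869129");
        System.out.printf("881: %s (%.2f%%)%n", largest, percentOfKeyspace(largest));
        System.out.printf("911: %s (%.2f%%)%n", smallest, percentOfKeyspace(smallest));
        double ratio = new BigDecimal(largest)
                .divide(new BigDecimal(smallest), MathContext.DECIMAL64)
                .doubleValue();
        System.out.printf("ratio: %.2f%n", ratio);
    }
}
```

Dividing the two widths shows that the largest shards cover roughly 3.4 times as much of the keyspace as shardId-000000000911.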

This is causing data to be distributed unevenly across the shards, and our consumer application takes much longer to catch up to real time on the larger shards.

Here is our config:

[
  {
   "streamName":"profile_vertica_logs_production_localytics",
   "region":"us-east-1",
   "scaleOnOperation":"PUT",
   "minShards":12,
   "maxShards":40,
   "scaleUp": {
       "scaleThresholdPct":80,
       "scaleAfterMins":1,
       "scalePct":50
   },
   "scaleDown":{
       "scaleThresholdPct": 50,
       "scaleAfterMins": 30,
       "scalePct": 20,
       "coolOffMins": 15
   }
  }
]
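The config does not spell out how `scalePct` becomes a new shard count. One plausible reading, assumed here and not confirmed by the utility's documentation, is that the current count is multiplied by (100 + scalePct)/100 when scaling up or (100 - scalePct)/100 when scaling down, rounded up, and clamped to `minShards`/`maxShards`. The hypothetical `ScalingTarget` sketch below encodes that reading so the effect of these particular values can be checked by hand. The threshold and timing settings (`scaleThresholdPct`, `scaleAfterMins`, `coolOffMins`) are deliberately left out.

```java
// Hypothetical reading of the scaling config (an assumption, not the utility's code):
// scalePct adjusts the current shard count, rounded up, then clamped to [minShards, maxShards].
class ScalingTarget {
    static int targetShardCount(int current, int scalePct, boolean scaleUp,
                                int minShards, int maxShards) {
        if (current < 0 || scalePct < 0) {
            throw new IllegalArgumentException("current and scalePct must be non-negative");
        }
        // Integer arithmetic avoids floating-point rounding surprises.
        int numerator = current * (scaleUp ? 100 + scalePct : 100 - scalePct);
        int target = numerator <= 0 ? 0 : (numerator + 99) / 100; // ceiling division
        return Math.max(minShards, Math.min(maxShards, target));
    }

    public static void main(String[] args) {
        // minShards 12 and maxShards 40, as in the config above.
        System.out.println(targetShardCount(12, 50, true, 12, 40));  // scale up by 50%
        System.out.println(targetShardCount(16, 20, false, 12, 40)); // scale down by 20%
        System.out.println(targetShardCount(30, 50, true, 12, 40));  // capped by maxShards
    }
}
```

Under those assumptions, a scale-up from 12 shards gives 18, a scale-down from 16 gives 13, and a scale-up from 30 is capped at 40.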

I'm not sure how the scaling operation works. Does it try to size shards evenly? I'm concerned this may be causing us production issues.

IanMeyers commented 9 years ago

Hi,

It does always try to evenly balance the keyspace, so this is not what we'd expect to see. Can you please run the following command to generate the ScalingClient report for this stream, and paste it back into this issue?

java -cp target/KinesisScalingUtils.jar-complete.jar -Dstream-name=<stream name> -Dregion=<region name> -Dscaling-action=report ScalingClient

Thanks,

Ian

joshcough commented 9 years ago

Shard shardId-000000000881 - Start: 0, End: 24305883360235455442838331915739612791, Keyspace Width: 24305883360235455442838331915739612791 (7%)
Shard shardId-000000000884 - Start: 24305883360235455442838331915739612792, End: 48611766697147529041415588084174029605, Keyspace Width: 24305883336912073598577256168434416813 (7%)
Shard shardId-000000000887 - Start: 48611766697147529041415588084174029606, End: 72917650050263527726213560516824835796, Keyspace Width: 24305883353115998684797972432650806190 (7%)
Shard shardId-000000000890 - Start: 72917650050263527726213560516824835797, End: 97223533390416380489178248897960958917, Keyspace Width: 24305883340152852762964688381136123120 (7%)
Shard shardId-000000000894 - Start: 97223533390416380489178248897960958918, End: 121529416757035643727222635693274104944, Keyspace Width: 24305883366619263238044386795313146026 (7%)
Shard shardId-000000000897 - Start: 121529416757035643727222635693274104945, End: 145835300102049674424392379258801301030, Keyspace Width: 24305883345014030697169743565527196085 (7%)
Shard shardId-000000000900 - Start: 145835300102049674424392379258801301031, End: 170141183450304490751314526534861009228, Keyspace Width: 24305883348254816326922147276059708197 (7%)
Shard shardId-000000000903 - Start: 170141183450304490751314526534861009229, End: 194447066811106967209212391235731229719, Keyspace Width: 24305883360802476457897864700870220490 (7%)
Shard shardId-000000000906 - Start: 194447066811106967209212391235731229720, End: 218752950167879235613208499400926401491, Keyspace Width: 24305883356772268403996108165195171771 (7%)
Shard shardId-000000000910 - Start: 218752950167879235613208499400926401492, End: 243058833536118888773728969223528864132, Keyspace Width: 24305883368239653160520469822602462640 (7%)
Shard shardId-000000000909 - Start: 243058833536118888773728969223528864133, End: 260215927711226584039780879896832127821, Keyspace Width: 17157094175107695266051910673303263688 (5%)
Shard shardId-000000000911 - Start: 260215927711226584039780879896832127822, End: 267364716887074354948759097083674869129, Keyspace Width: 7148789175847770908978217186842741307 (2%)
Shard shardId-000000000912 - Start: 267364716887074354948759097083674869130, End: 280232537532583562635223583154768590785, Keyspace Width: 12867820645509207686464486071093721655 (4%)
Shard shardId-000000000874 - Start: 280232537532583562635223583154768590786, End: 300249147351239883522035629954940250949, Keyspace Width: 20016609818656320886812046800171660163 (6%)
Shard shardId-000000000877 - Start: 300249147351239883522035629954940250950, End: 320265757169356079890549598711728836438, Keyspace Width: 20016609818116196368513968756788585488 (6%)
Shard shardId-000000000878 - Start: 320265757169356079890549598711728836439, End: 340282366920938463463374607431768211455, Keyspace Width: 20016609751582383572825008720039375016 (6%)
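With 16 shards, each should hold about 6.25% (1/16) of the keyspace, yet the widths in this report range from 2% to 7%. A check along the lines of the following Java sketch could flag that kind of drift automatically. The class name `ImbalanceCheck` and the 25% tolerance are hypothetical choices, not settings of the scaling utility.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ImbalanceCheck {
    static final BigInteger KEYSPACE_SIZE = BigInteger.ONE.shiftLeft(128);

    // Keyspace widths copied from the ScalingClient report above, in report order.
    static Map<String, BigInteger> reportWidths() {
        String[][] rows = {
            {"shardId-000000000881", "24305883360235455442838331915739612791"},
            {"shardId-000000000884", "24305883336912073598577256168434416813"},
            {"shardId-000000000887", "24305883353115998684797972432650806190"},
            {"shardId-000000000890", "24305883340152852762964688381136123120"},
            {"shardId-000000000894", "24305883366619263238044386795313146026"},
            {"shardId-000000000897", "24305883345014030697169743565527196085"},
            {"shardId-000000000900", "24305883348254816326922147276059708197"},
            {"shardId-000000000903", "24305883360802476457897864700870220490"},
            {"shardId-000000000906", "24305883356772268403996108165195171771"},
            {"shardId-000000000910", "24305883368239653160520469822602462640"},
            {"shardId-000000000909", "17157094175107695266051910673303263688"},
            {"shardId-000000000911", "7148789175847770908978217186842741307"},
            {"shardId-000000000912", "12867820645509207686464486071093721655"},
            {"shardId-000000000874", "20016609818656320886812046800171660163"},
            {"shardId-000000000877", "20016609818116196368513968756788585488"},
            {"shardId-000000000878", "20016609751582383572825008720039375016"},
        };
        Map<String, BigInteger> widths = new LinkedHashMap<>();
        for (String[] row : rows) {
            widths.put(row[0], new BigInteger(row[1]));
        }
        return widths;
    }

    // Flags shards whose width differs from the ideal (keyspace / shard count)
    // by more than tolerancePct percent of that ideal.
    static List<String> unbalancedShards(Map<String, BigInteger> widths, int tolerancePct) {
        List<String> flagged = new ArrayList<>();
        if (widths.isEmpty()) {
            return flagged;
        }
        BigInteger ideal = KEYSPACE_SIZE.divide(BigInteger.valueOf(widths.size()));
        BigInteger allowed = ideal.multiply(BigInteger.valueOf(tolerancePct))
                                  .divide(BigInteger.valueOf(100));
        for (Map.Entry<String, BigInteger> e : widths.entrySet()) {
            if (e.getValue().subtract(ideal).abs().compareTo(allowed) > 0) {
                flagged.add(e.getKey());
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        System.out.println(unbalancedShards(reportWidths(), 25));
    }
}
```

Run against this report with a 25% tolerance, the sketch flags shardId-000000000911 and shardId-000000000912.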

IanMeyers commented 9 years ago

Thanks for that. Interesting: I can't reproduce how only a small portion of the keyspace would end up unevenly divided like this.

I would advise one of two options:

  1. Merge shardId-000000000911 and shardId-000000000912, which will create a new shard covering about 6% of the keyspace. That would leave the stream only slightly unbalanced: one third of your stream would have 5% more capacity than the other two thirds, which isn't likely to create any issues.
  2. Alternatively, you can rebalance the stream with the scaling utility, which will modify most of your shards so that each covers 6.25% (1/16) of the keyspace. To do this, please run:

java -cp target/KinesisScalingUtils.jar-complete.jar -Dstream-name=<your stream name> -Dregion=<your stream region> -Dscaling-action=resize -Dcount=16 ScalingClient

Please note that this will retire a large number of shards and create new ones. That will affect all of your running Kinesis applications, so you should test it extensively in a safe environment before running it in production.

I would strongly recommend option 1: it has much less impact on your applications and is much less invasive, and the level of imbalance in the stream is unlikely to cause an issue.
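Kinesis only merges two shards whose hash key ranges are adjacent, meaning one range ends exactly one key before the other begins. The following Java sketch, with the hypothetical class name `MergeCheck`, confirms that the pair in option 1 qualifies and computes the width of the resulting shard from the report values. It is an illustration only, not part of the scaling utility.

```java
import java.math.BigInteger;

// Hypothetical helper for sanity-checking a merge before calling the Kinesis MergeShards API.
class MergeCheck {
    // True when the two inclusive ranges touch with no gap, in either order.
    static boolean adjacent(BigInteger startA, BigInteger endA,
                            BigInteger startB, BigInteger endB) {
        return endA.add(BigInteger.ONE).equals(startB)
                || endB.add(BigInteger.ONE).equals(startA);
    }

    // Width (end - start) of the range the merged shard would cover.
    static BigInteger mergedWidth(BigInteger startA, BigInteger endA,
                                  BigInteger startB, BigInteger endB) {
        if (!adjacent(startA, endA, startB, endB)) {
            throw new IllegalArgumentException("shards are not adjacent");
        }
        return endA.max(endB).subtract(startA.min(startB));
    }

    public static void main(String[] args) {
        // shardId-000000000911 and shardId-000000000912 from the report above
        BigInteger s911 = new BigInteger("260215927711226584039780879896832127822");
        BigInteger e911 = new BigInteger("267364716887074354948759097083674869129");
        BigInteger s912 = new BigInteger("267364716887074354948759097083674869130");
        BigInteger e912 = new BigInteger("280232537532583562635223583154768590785");
        BigInteger width = mergedWidth(s911, e911, s912, e912);
        double pct = width.doubleValue() / Math.pow(2, 128) * 100;
        System.out.printf("Merged width: %s (%.2f%% of the keyspace)%n", width, pct);
    }
}
```

The merged shard would cover about 5.88% of the keyspace, consistent with the "about 6%" estimate. The merge itself would go through the Kinesis MergeShards API, for example `aws kinesis merge-shards --stream-name <stream name> --shard-to-merge shardId-000000000911 --adjacent-shard-to-merge shardId-000000000912`.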

Furthermore, unless you are seeing issues, you can absolutely leave the stream as it is. The only impact today is that shardId-000000000911 covers roughly a third as much of the keyspace as your larger shards, so it has about 3x more spare capacity than they do.

jr314159 commented 9 years ago

Hmm, it looks like our application must have crashed a while ago. We run it under Elastic Beanstalk and I suppose we hadn't set up proper application monitoring. I restarted the app, and it promptly scaled down to 13 evenly distributed shards. If the application crashed in the middle of a scaling action, that could explain why our shards were left in this unbalanced state. Thanks for the help!