harlow / kinesis-consumer

Golang library for consuming Kinesis stream data
MIT License

ProvisionedThroughputExceededException Error #158

Open chenyin0126 opened 3 months ago

chenyin0126 commented 3 months ago

We have 2 pods; each pod is a consumer relying on this library. We use .Scan to consume from all shards. We have 8 Kinesis shards, but we hit this error when the 2 consumers run together:

shard shardId-000000000010 error: get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: f450bd05-0d8a-739b-aa98-773a29ee48de, ProvisionedThroughputExceededException: Rate exceeded for Shard - 715119903224/command-data-stream/shardId-000000000010

I think the reason is that each consumer calls GetRecords every 0.25 seconds by default, but one shard on AWS only allows 5 transactions per second. When 2 consumers run at the same time, both of them scan all shards, so a single shard can receive GetRecords 8 times per second, which raises this error: ProvisionedThroughputExceededException
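The arithmetic above can be checked with a quick calculation (the 0.25 s default poll interval is from the issue text; the 5 reads/sec/shard limit is the documented Kinesis GetRecords limit):

```go
package main

import "fmt"

// callsPerShardPerSecond returns the total GetRecords calls hitting one shard
// each second when `consumers` consumers each poll every `interval` seconds.
func callsPerShardPerSecond(consumers int, interval float64) float64 {
	return float64(consumers) / interval
}

func main() {
	const shardReadLimit = 5 // Kinesis allows 5 GetRecords calls per shard per second

	// 2 consumers, each polling every shard at the library's 0.25 s default.
	total := callsPerShardPerSecond(2, 0.25)

	fmt.Printf("GetRecords per shard per second: %.0f (limit %d)\n", total, shardReadLimit)
	fmt.Println("over limit:", total > shardReadLimit) // true: 8 > 5
}
```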

Any idea what is the best way to solve this?

chenyin0126 commented 3 months ago

cc: @harlow

luanruisong commented 2 months ago

maybe there is something wrong in func isRetriableError

error log

shard {shardId} error: get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard - {shard}
func isRetriableError(err error) bool {
    switch err.(type) {
    case *types.ExpiredIteratorException:
        return true
    case *types.ProvisionedThroughputExceededException:
        return true
    }
    return false
}

func Scan gets a ProvisionedThroughputExceededException here, but shouldn't this error be retried?

https://github.com/harlow/kinesis-consumer/blob/master/consumer.go#L188

I think it is meant to retry (wait for the ticker), but instead it finishes the goroutine.

The AWS SDK for Go v2 returns the service error wrapped (inside operation and response error wrappers), so a plain type switch on err never matches. Maybe it should change to:

    if oe := (*types.ProvisionedThroughputExceededException)(nil); errors.As(err, &oe) {
        return true
    }

luanruisong commented 2 months ago

BTW, there is another problem in commit 6720a01733e68abbf0f45585c6c6ded81526c363: it starts a new goroutine for the same shard repeatedly until ProvisionedThroughputExceededException is raised.

error log (There are only two shards)

{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213184044,"msg":"[CONSUMER] start scan: {shardId} 49647579764499847879996118179013785811305901676174508770"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213184046,"msg":"[CONSUMER] start scan: {shardId} 49651624479223759978416358002305044641501698271515509490"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213214045,"msg":"[CONSUMER] start scan: {shardId} 49651624479223759978416358045822747370169506734998029042"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213214045,"msg":"[CONSUMER] start scan: {shardId} 49647579764499847879996118207531136970195391339902796514"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213244044,"msg":"[CONSUMER] start scan: {shardId} 49647579764499847879996118236082338052034090689242333922"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213244046,"msg":"[CONSUMER] start scan: {shardId} 49651624479223759978416358091472995244637521131381719794"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213274045,"msg":"[CONSUMER] start scan: {shardId} 49647579764499847879996118265124363016636329277354148578"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213274046,"msg":"[CONSUMER] start scan: {shardId} 49651624479223759978416358130257753389514056238450606834"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213285292,"msg":"[CONSUMER] get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard {{shardId}}"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213288257,"msg":"[CONSUMER] get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard {{shardId}}"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213294575,"msg":"[CONSUMER] get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard {{shardId}}"}
{"level":"info","caller":"/kinesis/logger.go:22","ts":1718213298295,"msg":"[CONSUMER] get records error: operation error Kinesis: GetRecords, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: {requestId}, ProvisionedThroughputExceededException: Rate exceeded for Shard {{shardId}}"}

I think the problem is in allgroup.go:121:

every time the ticker channel fires, the second loop also puts shards that are already running (ones with no parent shard, so nothing blocks the send) back into the shardc channel:

    for _, shard := range shards {
        if _, ok := g.shards[*shard.ShardId]; ok {
            continue
        }
        g.shards[*shard.ShardId] = shard
        g.shardsClosed[*shard.ShardId] = make(chan struct{})
    }
    for _, shard := range shards {
        shard := shard // Shadow shard, since we use it in goroutine
        var parent1, parent2 <-chan struct{}
        if shard.ParentShardId != nil {
            parent1 = g.shardsClosed[*shard.ParentShardId]
        }
        if shard.AdjacentParentShardId != nil {
            parent2 = g.shardsClosed[*shard.AdjacentParentShardId]
        }
        go func() {
            // Asynchronously wait for all parents of this shard to be processed
            // before providing it out to our client.  Kinesis guarantees that a
            // given partition key's data will be provided to clients in-order,
            // but when splits or joins happen, we need to process all parents prior
            // to processing children or that ordering guarantee is not maintained.
            if waitForCloseChannel(ctx, parent1) && waitForCloseChannel(ctx, parent2) {
                shardc <- shard
            }
        }()
    }
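A stand-alone sketch of the dedup the second loop is missing: remember which shards have already been dispatched and only publish fresh ones, so each ticker tick starts at most one goroutine per shard. Shard, group, and findNewShards are simplified stand-ins I'm introducing, not the library's types:

```go
package main

import "fmt"

// Shard is a simplified stand-in for the Kinesis shard type used in allgroup.go.
type Shard struct{ ShardId string }

// group mimics the bookkeeping map already present in the first loop.
type group struct {
	shards map[string]Shard
}

// findNewShards returns only shards not seen before; the caller would run the
// second loop (and start goroutines) over this slice instead of all shards.
func (g *group) findNewShards(listed []Shard) []Shard {
	var fresh []Shard
	for _, s := range listed {
		if _, ok := g.shards[s.ShardId]; ok {
			continue // already being scanned; don't re-publish to shardc
		}
		g.shards[s.ShardId] = s
		fresh = append(fresh, s)
	}
	return fresh
}

func main() {
	g := &group{shards: map[string]Shard{}}
	listing := []Shard{{"shardId-000000000000"}, {"shardId-000000000001"}}

	fmt.Println("first tick, new shards:", len(g.findNewShards(listing)))  // 2
	fmt.Println("second tick, new shards:", len(g.findNewShards(listing))) // 0
}
```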