NerdWalletOSS / kinesis-python

Low level, multiprocessing based AWS Kinesis producer & consumer library

How to have multiple consumers? #33

Open jrmeland opened 1 year ago

jrmeland commented 1 year ago

Thank you for your work!

I recently set up the DynamoDB state backend, but when I run multiple instances (via Kubernetes), one instance errors out while another picks up the work. The end result is that only one instance processes records at a time. Is there something I am missing?

Here is the error:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/kinesis/consumer.py", line 205, in __iter__
        self.state.checkpoint(state_shard_id, item['SequenceNumber'])
      File "/usr/local/lib/python3.9/site-packages/kinesis/state.py", line 41, in checkpoint
        self.dynamo_table.update_item(
      File "/usr/local/lib/python3.9/site-packages/boto3/resources/factory.py", line 580, in do_action
        response = action(self, *args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/boto3/resources/action.py", line 88, in __call__
        response = getattr(parent.meta.client, operation_name)(*args, **params)
      File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 530, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 960, in _make_api_call
        raise error_class(parsed_response, operation_name)
    botocore.errorfactory.ConditionalCheckFailedException: An error occurred (ConditionalCheckFailedException) when calling the UpdateItem operation: The conditional request failed
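The `botocore.errorfactory.ConditionalCheckFailedException` in the last line is a modeled subclass of `botocore.exceptions.ClientError`, so application code can recognize it by its error code. Below is a minimal sketch of such a check. The helper name `is_conditional_check_failure` is hypothetical; it relies only on the `response["Error"]["Code"]` field that `ClientError` carries, so it can be exercised without AWS credentials.

```python
from typing import Any

# Error code DynamoDB returns when a ConditionExpression evaluates to false.
CONDITIONAL_CHECK_FAILED = "ConditionalCheckFailedException"


def is_conditional_check_failure(err: BaseException) -> bool:
    """Return True if `err` carries DynamoDB's conditional-check error code.

    Hypothetical helper: it only assumes the exception has a botocore-style
    `response` dict shaped like {"Error": {"Code": ...}}.
    """
    response: Any = getattr(err, "response", None)
    if not isinstance(response, dict):
        return False
    error = response.get("Error")
    if not isinstance(error, dict):
        return False
    return error.get("Code") == CONDITIONAL_CHECK_FAILED
```

In a real consumer this would sit in an `except botocore.exceptions.ClientError as err:` block around whatever code ends up calling the checkpoint; boto3 also exposes the modeled class directly, e.g. `table.meta.client.exceptions.ConditionalCheckFailedException` for a Table resource.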

This is the `update_item` call in `kinesis/state.py` that raises it:

            self.dynamo_table.update_item(
                Key={'shard': shard_id},
                UpdateExpression="set seq = :seq",
                ConditionExpression="fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)",
                ExpressionAttributeValues={
                    ':fqdn': fqdn,
                    ':seq': seq,
                }
            )

The part of the condition that fails is most likely the `fqdn` check rather than the `seq` comparison, which would make sense if another consumer had claimed the shard in the meantime. That seems like a normal thing to happen, though, so I am not sure why the consumer does not gracefully relinquish the shard instead of crashing.
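To reason about which half of the condition fails, here is a toy, in-memory model of it. Everything in it (`InMemoryStateTable`, `claim`, `owner`, `try_checkpoint`, the pod names) is hypothetical and not part of kinesis-python; it only mimics the `ConditionExpression` quoted above. It also sketches one possible "graceful relinquish": DynamoDB raises a single `ConditionalCheckFailedException` without saying which clause failed, so the consumer re-reads the item (with `get_item` against a real table) and gives the shard up only if someone else now owns it.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


class ConditionalCheckFailed(Exception):
    """Hypothetical stand-in for botocore's ConditionalCheckFailedException."""


@dataclass
class ShardRecord:
    fqdn: str                  # consumer that currently owns the shard
    seq: Optional[str] = None  # last checkpointed sequence number (a string)


@dataclass
class InMemoryStateTable:
    """Toy model of the state table; only mimics the checkpoint condition."""

    rows: Dict[str, ShardRecord] = field(default_factory=dict)

    def claim(self, shard_id: str, fqdn: str) -> None:
        # Simplification: a new owner overwrites fqdn and keeps the old seq,
        # which is what "another consumer claimed the shard" looks like.
        old = self.rows.get(shard_id)
        self.rows[shard_id] = ShardRecord(fqdn, old.seq if old else None)

    def owner(self, shard_id: str) -> Optional[str]:
        row = self.rows.get(shard_id)
        return row.fqdn if row else None

    def checkpoint(self, shard_id: str, fqdn: str, seq: str) -> None:
        # Mirrors: fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)
        # Both failures raise the same exception, as DynamoDB does.
        row = self.rows.get(shard_id)
        if row is None or row.fqdn != fqdn:
            raise ConditionalCheckFailed(shard_id)  # ownership clause failed
        if row.seq is not None and row.seq >= seq:  # compared as strings
            raise ConditionalCheckFailed(shard_id)  # seq clause failed
        row.seq = seq


def try_checkpoint(table: InMemoryStateTable, shard_id: str, fqdn: str, seq: str) -> bool:
    """Checkpoint; return False if this consumer no longer owns the shard.

    A caller could treat False as "stop reading this shard" instead of
    letting the exception crash the process.
    """
    try:
        table.checkpoint(shard_id, fqdn, seq)
        return True
    except ConditionalCheckFailed:
        if table.owner(shard_id) != fqdn:
            return False  # another consumer took over: relinquish quietly
        raise  # we still own it, so the seq clause failed; surface that
```

With two pods, `pod-a` checkpoints, `pod-b` claims the shard, and `pod-a`'s next checkpoint returns `False` instead of raising, while `pod-b` continues from the stored sequence number. The re-read is not atomic with the failed write, so a real implementation would still have to tolerate ownership changing again in between.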

Any help is appreciated! Thanks!