mhart / kinesalite

An implementation of Amazon's Kinesis built on LevelDB
MIT License
808 stars 86 forks source link

Add margin for future iterator detection #110

Closed bobwanglyft closed 3 years ago

bobwanglyft commented 3 years ago

I am running localstack on docker-desktop to simulate and read records from Dynamostreams. However, I notice that the clock is not strictly moving forward between two GetRecords calls. To demonstrate this, I hacked both localstack and kinesalite locally to print out more information. Here is what I have found:

This is the response of GetRecords from nth request to kinesis.

localstack             | {'Records': [], 'NextShardIterator': 'AAAAAAAAAAG7pzbcqokO/DSfokz5+Jx8fAH+4UtLFhwLcoMyvciGsBMqnaFsF/Ok9kUbgAG6xk06mv5QuiZbYxtUg5yIMRutUhuvB9gHkVsvsuNgTiSs+bs1utlznffDJ6cJpcNk7ih83RTiks06vpU+Ofkfqz5s7C2RdQz4Q5aUX4j25pLZMLIWRBDP3C7pvh8QePYwf3f4r3gwoQWJQceff+tYzET0'}

We use the NextShardIterator from the above response in (n+1)th request to get next batch of records, but get following InvalidArgumentException.

localstack             | {'response': {'Error': {'Message': 'now is 1612305062032 iteratorTime is 1612305062033', 'Code': 'InvalidArgumentException'}, 'ResponseMetadata': {'RequestId': '546cb901-65a6-11eb-b525-19cc2cbfb0ba', 'HTTPStatusCode': 400, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'content-length': '100', 'x-amzn-requestid': '546cb901-65a6-11eb-b525-19cc2cbfb0ba', 'x-amz-id-2': 'iynAA7keJMuh9R6OGeERlg91sqzIKa82+asli2UqWC37GQKFQ8/JDTf/uTf8FVbJbgYXDQ4gZCp6ik/Uju5WAid1ZkdcFjBk', 'connection': 'close', 'access-control-allow-origin': '*', 'access-control-allow-methods': 'HEAD,GET,PUT,POST,DELETE,OPTIONS,PATCH', 'access-control-allow-headers': 'authorization,content-type,content-length,content-md5,cache-control,x-amz-content-sha256,x-amz-date,x-amz-security-token,x-amz-user-agent,x-amz-target,x-amz-acl,x-amz-version-id,x-localstack-target,x-amz-tagging', 'access-control-expose-headers': 'x-amz-version-id', 'date': 'Tue, 02 Feb 2021 22:31:02 GMT', 'server': 'hypercorn-h11'}, 'RetryAttempts': 0}, 'message': 'now is 1612305062032 iteratorTime is 1612305062033'}, 'operation_name': 'GetRecords'}
localstack             | An error occurred (InvalidArgumentException) when calling the GetRecords operation: now is 1612305062032 iteratorTime is 1612305062033
localstack             | shard iterator used is {'ShardIterator': 'AAAAAAAAAAG7pzbcqokO/DSfokz5+Jx8fAH+4UtLFhwLcoMyvciGsBMqnaFsF/Ok9kUbgAG6xk06mv5QuiZbYxtUg5yIMRutUhuvB9gHkVsvsuNgTiSs+bs1utlznffDJ6cJpcNk7ih83RTiks06vpU+Ofkfqz5s7C2RdQz4Q5aUX4j25pLZMLIWRBDP3C7pvh8QePYwf3f4r3gwoQWJQceff+tYzET0'}
localstack             | 2021-02-02T22:31:02:ERROR:ddb_streams_api: Exception on / [POST]
localstack             | Traceback (most recent call last):
localstack             |   File "/opt/code/localstack/.venv/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
localstack             |     response = self.full_dispatch_request()
localstack             |   File "/opt/code/localstack/.venv/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
localstack             |     rv = self.handle_user_exception(e)
localstack             |   File "/opt/code/localstack/.venv/lib/python3.8/site-packages/flask_cors/extension.py", line 165, in wrapped_function
localstack             |     return cors_after_request(app.make_response(f(*args, **kwargs)))
localstack             |   File "/opt/code/localstack/.venv/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
localstack             |     reraise(exc_type, exc_value, tb)
localstack             |   File "/opt/code/localstack/.venv/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
localstack             |     raise value
localstack             |   File "/opt/code/localstack/.venv/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
localstack             |     rv = self.dispatch_request()
localstack             |   File "/opt/code/localstack/.venv/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
localstack             |     return self.view_functions[rule.endpoint](**req.view_args)
localstack             |   File "/opt/code/localstack/localstack/services/dynamodbstreams/dynamodbstreams_api.py", line 129, in post_request
localstack             |     raise exc
localstack             |   File "/opt/code/localstack/localstack/services/dynamodbstreams/dynamodbstreams_api.py", line 125, in post_request
localstack             |     kinesis_records = kinesis.get_records(**data)
localstack             |   File "/opt/code/localstack/.venv/lib/python3.8/site-packages/botocore/client.py", line 357, in _api_call
localstack             |     return self._make_api_call(operation_name, kwargs)
localstack             |   File "/opt/code/localstack/.venv/lib/python3.8/site-packages/botocore/client.py", line 676, in _make_api_call
localstack             |     raise error_class(parsed_response, operation_name)
localstack             | botocore.errorfactory.InvalidArgumentException: An error occurred (InvalidArgumentException) when calling the GetRecords operation: now is 1612305062032 iteratorTime is 1612305062033

However, retrying (n+1)th request succeeded without any issue.

localstack             | {'ShardIterator': 'AAAAAAAAAAG7pzbcqokO/DSfokz5+Jx8fAH+4UtLFhwLcoMyvciGsBMqnaFsF/Ok9kUbgAG6xk06mv5QuiZbYxtUg5yIMRutUhuvB9gHkVsvsuNgTiSs+bs1utlznffDJ6cJpcNk7ih83RTiks06vpU+Ofkfqz5s7C2RdQz4Q5aUX4j25pLZMLIWRBDP3C7pvh8QePYwf3f4r3gwoQWJQceff+tYzET0'}
localstack             | {'Records': [], 'NextShardIterator': 'AAAAAAAAAAEWv7SU/Yr0tzNCERos4oOd8E+oEKCJPZCXYyXWNMEQqCWL4eLTasHF2Rtdqgw4+b93bZIy/PMIAJoCbRJQRK4aImCbxnhWmFE9h8LgdSzoT85sESIG0f/L392Edm3uqhHTJzve5I7MMUa2RJ1jwWqSuSG5EWh/CZZRNaZy04Neb+HPXtAZMit4eT3WeF0BvUnow/ZTpOQimYl3T/is3Q1j'}

Notice the message (I added locally :p) now is 1612305062032 iteratorTime is 1612305062033 reveals that the request failed at this check. It seems that the clock is not synced between two getRecords calls and I suggest adding 100 milliseconds margin to account for that (value of margin is open for discussion).

Let me know if there is a better solution, thanks in advance! :)