stevearc / flywheel

Object mapper for Amazon's DynamoDB
MIT License

gen() iterator should support resumable paged queries/scans #34

Open natecode opened 8 years ago

natecode commented 8 years ago

I’d like to be able to provide an API to our users that is backed by a query that returns a large number of results. We’d like to be able to retrieve and store a cursor from a result set (query or scan), then create a new result set that picks up where the last one left off, possibly minutes later. I know that dynamo3 supports this internally, but it's not exported via flywheel yet.

Steve replied via email:

I think you'd have to have a new terminator on the query (like gen(), all()). I think you'd need page() to fetch a single page from DynamoDB so that you have some knowledge of where the database query left off. The default gen() implementation just provides a straight-up iterator that handles it all transparently. We could make page() return an object that extends list and has a 'cursor' attribute. Then you make the query terminators also take 'cursor' as a keyword argument, pass that through to dynamo3 and it should just work. I'll have to make a couple of changes in dynamo3 to support fetching pages instead of just iterators, but that should be pretty easy.
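A minimal sketch of the shape described above, using an in-memory list in place of DynamoDB. The names `Page`, `fetch_page`, and the integer cursor are hypothetical and only illustrate the idea of a list subclass carrying a `cursor`, with `gen()` built on top of single-page fetches. This is not flywheel's or dynamo3's actual API.

```python
class Page(list):
    """A list of results that also remembers where the query stopped."""

    def __init__(self, items=(), cursor=None):
        super(Page, self).__init__(items)
        self.cursor = cursor


def fetch_page(rows, page_size, cursor=None):
    """Return one Page of `rows`, starting at `cursor` (a list index here)."""
    start = 0 if cursor is None else cursor
    chunk = rows[start:start + page_size]
    end = start + len(chunk)
    # A cursor of None signals that nothing is left to fetch.
    next_cursor = end if end < len(rows) else None
    return Page(chunk, cursor=next_cursor)


def gen(rows, page_size):
    """Iterate over every row by repeatedly calling fetch_page()."""
    cursor = None
    while True:
        page = fetch_page(rows, page_size, cursor)
        for row in page:
            yield row
        cursor = page.cursor
        if cursor is None:
            return
```

Because the cursor is a plain value, a caller could store it and pass it to a later `fetch_page()` call to resume, which is the use case requested in this issue.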

natecode commented 8 years ago

@stevearc This sounds good. I like that gen() would be implemented in terms of page(). Hopefully page size would be configurable as well in the first call.

stevearc commented 8 years ago

I ended up going about it in a slightly different way than initially planned, but it's pretty similar.

You can see an example of what the flow would look like in this test case

You should be able to configure the query behavior to do what you want with the Limit class. It's in dynamo3, but it's also imported in flywheel so you can import it directly from there.

Released flywheel 0.4.6. Let me know if this works for you!

natecode commented 8 years ago

I ran some tests on this code, and it seems to be basically working. I found some issues with the current approach I’m not sure how to handle.

  1. Getting the primary key for an index

Your test code shows how to use pkdict to get the primary key from the last item in a result set in order to restart the scan at that point. However, for a query on an index, it looks like I need to specify all the keys (and includes) from that index. How do I get the equivalent of pkdict for an index?

  2. Query/scan terminates late

I’m running a query of an index with 4 matching items, with a Limit(scan_limit=2, strict=True). Debug log shows that each query is submitted properly and the results are what I expect, but the LastEvaluatedKey in the response for the 2nd query is non-null. (It is the last item in that result, item #4). According to the docs, it should be null instead when you hit the end of the results.

Sequence:

  1. Query(app_id='x', ExclusiveStartKey=None).limit(2) -> two Items, last_evaluated_key=items[1]
  2. Query(app_id='x', ExclusiveStartKey=items[1]).limit(2) -> two Items, last_evaluated_key=items[3] **
  3. Query(app_id='x', ExclusiveStartKey=items[3]).limit(2) -> no Items, no last_evaluated_key

** I expect LastEvaluatedKey to be null here

If I continue with the query, the 3rd query gets a response of 0 items, so my code terminates then. But it would be nice not to have an extra round trip each time just to find there are no more results. I’m not sure if this is a DDB issue or Flywheel.

Here’s my rough workaround:

last_evaluated_key = None
while True:
    # This calls .filter().limit().all() on the index
    versions = app.get_items(engine, last_evaluated_key)
    for x in versions:
        print(x.id)

    # Catch partial or no results = done
    if len(versions) < PAGE_SIZE:
        break

    # Get primary key for index query to resume (built from the last item)
    dump = versions[-1].ddb_dump_()
    last_evaluated_key = {'id': dump['id'], 'oid': dump['oid'], 'date': dump['date']}

  3. I don’t quite understand item_limit vs. scan_limit

I assume item_limit is when you want a specific number of items and you don’t care how many per Query/Scan operation, nor do you want to handle pagination. In the scan_limit case, you want to control the number of items fetched. Is that right?

Thanks!

stevearc commented 8 years ago

  1. You should be able to just pass in the dict that has the hash key and optional range key of that particular index. I'll find a place to add a convenient method to do that for you.
  2. This seems to be a DynamoDB behavior. When you pass in a Limit and it happens to hit the limit right at the final item, it will return a LastEvaluatedKey. ¯\_(ツ)_/¯
  3. Yeah, it's not as clear as it could be. scan_limit is passed in directly to DynamoDB as the Limit parameter. A careful reading of the docs reveals that Limit doesn't do what you would expect, nor probably what you want most of the time. It sets a hard limit on the items scanned, which does not mean it will be the number of items returned. If you don't have any scan filters then it behaves as expected. If you have any scan filters it's possible for the query/scan to return no results even if some exist in the table, because Dynamo will scan up to the Limit and then return whatever it found.
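The behavior in items 2 and 3 can be reproduced with a small in-memory simulation. This is a sketch of the semantics as described in this thread, not DynamoDB or dynamo3 code: `query_page`, the `keep` filter, and the `id` key are hypothetical stand-ins, and the table is just a sorted list of dicts.

```python
def query_page(table, limit, exclusive_start_key=None, keep=lambda item: True):
    """Simulate one Query/Scan call over `table`, a list of dicts sorted by 'id'."""
    start = 0
    if exclusive_start_key is not None:
        ids = [item["id"] for item in table]
        start = ids.index(exclusive_start_key["id"]) + 1
    # Limit caps the number of items *scanned*, not the number returned.
    scanned = table[start:start + limit]
    # The filter is applied only after the scan window has been chosen.
    items = [item for item in scanned if keep(item)]
    # A key comes back whenever the limit was reached, even on the final item.
    last_key = None
    if scanned and len(scanned) == limit:
        last_key = {"id": scanned[-1]["id"]}
    return items, last_key


table = [{"id": i, "even": i % 2 == 0} for i in range(1, 5)]

first, key1 = query_page(table, 2)         # items 1-2, key points at item 2
second, key2 = query_page(table, 2, key1)  # items 3-4, key still set (item 4)
third, key3 = query_page(table, 2, key2)   # no items, no key: the extra round trip

# With a filter, a page can be empty even though matches exist further on.
filtered, key = query_page(table, 1, keep=lambda item: item["even"])
```

In this model the only reliable end-of-results signal is a missing key, which is why a query whose limit lands exactly on the last item needs one more request.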

This is why I added item_limit. If you pass in an item_limit, dynamo3 will continue to query DynamoDB until either the item_limit is reached OR there are no more results in the DB. min_scan_limit is there for the case when you're fetching a small item_limit (say, 1). If you set the Limit to just be 1, you may have to do many, many queries to finally retrieve a result. min_scan_limit makes sure each request always scans at least a minimum number of items. Since your results only come back in pages, the total results may exceed the item_limit (e.g., you only need 1 more, you make another query and get back 5 results). By default it will return all of the results, but you can pass in strict=True to chop off the extras.
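The interaction between item_limit, min_scan_limit, and strict can be sketched as a loop over page fetches. This is an illustration of the explanation above, not dynamo3's implementation: `fetch_with_item_limit`, `make_fetch`, the integer start key, and the default of 20 are all assumptions made for the example.

```python
def make_fetch(rows, keep):
    """Build a fake page fetcher over `rows` that filters after scanning."""
    def fetch(scan_limit, start_key):
        start = 0 if start_key is None else start_key
        scanned = rows[start:start + scan_limit]
        end = start + len(scanned)
        next_key = end if end < len(rows) else None
        return [r for r in scanned if keep(r)], next_key
    return fetch


def fetch_with_item_limit(fetch, item_limit, min_scan_limit=20, strict=False):
    """Keep fetching pages until item_limit results are found or data runs out."""
    results = []
    start_key = None
    while len(results) < item_limit:
        needed = item_limit - len(results)
        # Never scan fewer than min_scan_limit items per request.
        scan_limit = max(needed, min_scan_limit)
        items, start_key = fetch(scan_limit, start_key)
        results.extend(items)
        if start_key is None:
            break  # nothing left to scan
    if strict:
        del results[item_limit:]  # chop off the extras
    return results


rows = list(range(100))
fetch = make_fetch(rows, keep=lambda r: r % 10 == 0)

loose = fetch_with_item_limit(fetch, item_limit=1)                # [0, 10]
exact = fetch_with_item_limit(fetch, item_limit=1, strict=True)   # [0]
```

Here the first request scans 20 rows and finds two matches, so the non-strict call returns more than the single item asked for, while strict=True trims the result to exactly item_limit.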

I hope this clears up some of the confusion. I still think item_limit isn't a great name, so if you have suggestions for a better one please let me know. Aaaaand I'll look for somewhere to put a method that will construct primary keys for indexes.

natecode commented 8 years ago

I actually had to pass in an ExclusiveStartKey comprised of 3 fields in the case of an includes index that had a hash key, range key, and one other included field. That surprised me since I thought it would just need the hash/range key for the index. Including anything less than those 3 fields gives this exception:

  File "bluesteel/models.py", line 347, in <module>
    main()
  File "bluesteel/models.py", line 345, in main
    last_key = {'oid': x.oid, 'date': x.ddb_dump_()['date']}
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/flywheel/query.py", line 79, in gen
    for result in results:
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/six.py", line 558, in next
    return type(self).__next__(self)
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/dynamo3/result.py", line 254, in __next__
    return six.next(self.iterator)
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/dynamo3/result.py", line 283, in fetch
    data = self.connection.call(*self.args, **self.kwargs)
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/dynamo3/connection.py", line 230, in call
    exc.re_raise()
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/dynamo3/exception.py", line 22, in re_raise
    six.reraise(type(self), self, self.exc_info[2])
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/dynamo3/connection.py", line 220, in call
    data = op(**kwargs)
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/botocore/client.py", line 310, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/botocore/client.py", line 396, in _make_api_call
    raise ClientError(parsed_response, operation_name)
dynamo3.exception.DynamoDBError: ValidationException: Exclusive Start Key must have same size as table's key schema
Args: {
 'ExclusiveStartKey': {'date': {'N': u'1450915200.000000'},
                       'oid': {'S': u'av-J4DFNhwnbDf'}},
 ...

Ignore the lazy use of ddb_dump_() as this was just prototype code.

This index was defined as:

    __metadata__ = {
        'global_indexes': [
            GlobalIndex.include('appversions-for-app', 'app_id', 'date',
                includes=['oid']),
        ],
    }

Good explanation on the various limits. That should definitely go in the docs. Maybe call it desired_page_size or something?

stevearc commented 8 years ago

I may not be reading this correctly, but from the example you gave it looks like you were passing in date and oid, when you needed to pass in app_id and date. Have you tried those two and was the result the same? I tried it on a global index locally and it seemed to work fine.

stevearc commented 8 years ago

After a fantastic test case by Nate and some digging, it appears that the ExclusiveStartKey for an index query (global and local) needs to contain both the primary key of the index and the primary key of the table. This isn't documented anywhere I could find in the Dynamo docs because they just say "use the thing we give you as LastEvaluatedKey" and assume that you won't be constructing it yourself.
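As a rough illustration of that finding, the start key for an index query can be assembled by merging the index's key attributes with the table's key attributes. The helper name `index_start_key` is hypothetical, and the example assumes the table's hash key is `oid`, which this thread suggests but does not state outright. The values would still need to be encoded into DynamoDB's wire format before being sent.

```python
def index_start_key(item, index_keys, table_keys):
    """Build an ExclusiveStartKey-style dict from the last item of a page.

    `index_keys` are the index's hash/range attribute names and `table_keys`
    are the table's own hash/range attribute names; both sets are required.
    """
    names = list(index_keys) + [k for k in table_keys if k not in index_keys]
    missing = [name for name in names if name not in item]
    if missing:
        raise KeyError("item is missing key attributes: %s" % ", ".join(missing))
    return {name: item[name] for name in names}


last_item = {"oid": "av-J4DFNhwnbDf", "app_id": "app-1",
             "date": 1450915200, "name": "not part of any key"}

# Index keyed on (app_id, date); table assumed to be keyed on oid.
start_key = index_start_key(last_item, ("app_id", "date"), ("oid",))
```

Non-key attributes such as `name` are left out, so the resulting dict has exactly the three fields that the failing query in this thread turned out to need.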

I'll account for that in the method I'm making for grabbing the primary key from an index and it should Just Work™

natecode commented 8 years ago

Very nice, thanks for tracking this down.

stevearc commented 8 years ago

Okay, I pushed out version 0.4.7 which has the index_pk_dict_() method on the model, and index_pk_dict() on the metadata object. See if that works for you.

natecode commented 8 years ago

Sorry, I’m getting a crash when using an index that has a datetime as part of the key. The key looks like this coming back from index_pk_dict() ok:

{'date': datetime.datetime(2015, 12, 20, 0, 0, tzinfo=<flywheel.fields.types.UTCTimezone object at 0x110c6c250>), 'oid': u'av-VmogFmEt6ey', 'app_id': u'app-IzGMSIS0p0m'}

But then the datetime is not being encoded, resulting in this exception:

  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/flywheel/query.py", line 115, in all
    exclusive_start_key=exclusive_start_key))
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/flywheel/query.py", line 78, in gen
    **kwargs)
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/dynamo3/connection.py", line 1132, in query
    self.dynamizer.maybe_encode_keys(exclusive_start_key)
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/dynamo3/types.py", line 170, in maybe_encode_keys
    ret[k] = self.encode(v)
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/dynamo3/types.py", line 175, in encode
    return dict([self.raw_encode(value)])
  File "/Users/nate/venvs/orangecrush/lib/python2.7/site-packages/dynamo3/types.py", line 156, in raw_encode
    (value, type(value)))
ValueError: No encoder for value '2015-12-20 00:00:00+00:00' of type '<type 'datetime.datetime'>'

stevearc commented 8 years ago

If you're calling the index_pk_dict() method on the metadata object directly, ddb_dump defaults to False. Try passing in ddb_dump=True or using the index_pk_dict_() method on the model itself.
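To show why the un-dumped key fails, here is a small sketch of what "dumping" a key might involve: converting values that have no native DynamoDB encoding, such as datetimes, into numbers before the key is sent. The helper `dump_key_values` is hypothetical, and the Unix-timestamp representation is an assumption based on the numeric `date` value in the earlier traceback. flywheel's own encoder may differ in detail.

```python
import calendar
import datetime


def dump_key_values(key):
    """Return a copy of `key` with datetime values replaced by UTC timestamps."""
    dumped = {}
    for name, value in key.items():
        if isinstance(value, datetime.datetime):
            # timegm() interprets the time tuple as UTC; naive datetimes are
            # assumed to already be in UTC.
            seconds = calendar.timegm(value.utctimetuple())
            value = seconds + value.microsecond / 1e6
        dumped[name] = value
    return dumped


raw_key = {
    "app_id": "app-IzGMSIS0p0m",
    "oid": "av-VmogFmEt6ey",
    "date": datetime.datetime(2015, 12, 20, 0, 0),
}
ready = dump_key_values(raw_key)  # 'date' is now a plain number
```

After this conversion every value in the key is a string or a number, which are types a generic encoder can handle.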
