awslabs / emr-dynamodb-connector

Implementations of open source Apache Hadoop/Hive interfaces which allow for ingesting data from Amazon DynamoDB
Apache License 2.0

Limited operator support in DynamoDBFilterPushdown for table hash key #157

Open billonahill opened 2 years ago

billonahill commented 2 years ago

In my testing I'm seeing that GSIs are not always picked up when both the table hash key and the range key exist in the predicate. More specifically, I'm seeing the GSI only gets used when the table hash key is matched with a single EQ operator. So something like this gets the GSI:

WHERE hash_key = 1 AND range_key > 10 AND range_key < 20

But more complex expressions using IN or < > on hash_key fail to use the GSI, like:

WHERE hash_key IN (1) AND range_key > 10 AND range_key < 20
or
WHERE hash_key > 0 AND hash_key < 2 AND range_key > 10 AND range_key < 20
luyuanhao commented 2 years ago

That behavior is actually due to the unsorted nature of the hash/partition key, so I guess the hash key condition is not being pushed down at all in those cases: https://aws.amazon.com/blogs/database/choosing-the-right-dynamodb-partition-key/

Hash keys are distributed across partitions based on their hash value rather than their sort order, so range-like predicates on the hash key will inevitably require some kind of scan.

An alternative or optimization would be to split the IN condition into multiple EQ sub-queries and union them to yield the final result. But for now this would need to be done at the SQL level.
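For example, a minimal sketch of that SQL-level rewrite for the IN case (the table name my_table is hypothetical):

SELECT * FROM my_table WHERE hash_key = 1 AND range_key > 10 AND range_key < 20
UNION ALL
SELECT * FROM my_table WHERE hash_key = 2 AND range_key > 10 AND range_key < 20

Each branch has a single hash key EQ condition plus range key conditions, so each one can be pushed down as its own DynamoDB query.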

billonahill commented 2 years ago

Thanks for the quick response @luyuanhao. I suspected the same w.r.t. my second example with < > (or BETWEEN) on the hash key, since the lookup is key based. But it seems like IN, or OR with =, should use the index, since the values are discrete, yet neither does:

WHERE hash_key IN (1, 2) AND range_key ...
WHERE (hash_key = 1 OR hash_key = 2) AND range_key ...

Basically, the only way I can get the index selected is if there is one and only one = predicate for the hash key, plus a range key predicate, which seems like a bug.

luyuanhao commented 2 years ago

The current query execution logic translates the query into a single DynamoDB client call, and if any key condition can be used it is pushed down. But the cases you give cannot be handled by a single range query since there is more than one hash key value, so the DynamoDB client falls back to a table scan no matter whether it is an explicit IN or an equivalent OR condition.
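To restate with a hedged sketch (the table name my_table is hypothetical):

-- Can be pushed down: one hash key EQ plus range key conditions maps to a single DynamoDB query
SELECT * FROM my_table WHERE hash_key = 1 AND range_key > 10 AND range_key < 20;

-- Cannot: more than one hash key value, so the client falls back to a table scan
SELECT * FROM my_table WHERE hash_key IN (1, 2) AND range_key > 10 AND range_key < 20;
SELECT * FROM my_table WHERE (hash_key = 1 OR hash_key = 2) AND range_key > 10 AND range_key < 20;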

luyuanhao commented 2 years ago

We might need to alter the connector logic so that it can send more than one client call for a single query; then push-down support for such queries could be implemented.

luyuanhao commented 2 years ago

Btw I am no longer at AWS and might not have enough personal time to work on this refactoring, so we'll need to see if anyone from AWS or someone else can pick up this request.

billonahill commented 2 years ago

So then, with the current one-query implementation, that implies the Hive parallelism will always be 1 as well, right? If so, yeah, a design where the table keys in the predicate can be partitioned across multiple tasks/queries would be ideal.

luyuanhao commented 2 years ago

No, it has nothing to do with Hive parallelism. The single DynamoDB client call I mentioned is at the per-Hive-split level.

As you can see in the following code, there is no way to assign more than one hash key EQ condition in a DynamoDB query request: https://github.com/awslabs/emr-dynamodb-connector/blob/be5320d483e46569fd2efa8d4e236c1c9023d444/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java#L176

billonahill commented 2 years ago

If there is only one DynamoDB call per query and there is a 1:1 mapping of DynamoDB call to Hive split, then wouldn't that imply only one Hive split, and a Hive parallelism of 1 in all cases?

klimber commented 11 months ago

I'm working on a solution for these kinds of queries, but I could not really find a way to work within the "one DynamoDB call per split" model of the current StorageHandler. So I've created a Hive UDTF instead, which can be applied to fetch DynamoDB data when we have an existing table of hash keys. For example:

create temporary function ddb_query as 'com.klimber.hiveddbudtf.HiveDdbQueryUdtf';

create table ddb_data as
select
    hash_keys.hash_key,
    ddb.*
from hash_keys
lateral view
    ddb_query(
        -- DynamoDB table plus column/type mapping configuration
        named_struct(
            'tableName', 'MyTable',
            'indexName', null,
            'hiveDdbColumnMapping', 'attribute_1:attribute1,attribute_2:attribute2',
            'hiveTypeMapping', 'string,bigint'
        ),
        -- filters: here a single EQ on the hash key taken from each input row
        struct(
            named_struct(
                'attribute', 'hashKeyAtt',
                'attributeType', 'S',
                'operator', 'EQ',
                'value', hash_keys.hash_key
            )
        )
    ) ddb;

Multiple filters can be passed if needed, both on hash/range keys and on other attributes; for now they all apply with AND logic (see the sketch below).

The query above would result in a table called ddb_data with 3 columns: hash_key, attribute_1, and attribute_2, where attribute_1 is a string and attribute_2 is a bigint. Several other data types are also supported.
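For instance, a second filter on another attribute might look like this (a hedged guess at the multi-filter shape; the statusAtt attribute and its value are purely illustrative):

struct(
    named_struct(
        'attribute', 'hashKeyAtt',
        'attributeType', 'S',
        'operator', 'EQ',
        'value', hash_keys.hash_key
    ),
    -- hypothetical second filter, ANDed with the first
    named_struct(
        'attribute', 'statusAtt',
        'attributeType', 'S',
        'operator', 'EQ',
        'value', 'ACTIVE'
    )
)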

I'd like to, if possible, contribute it to emr-dynamodb-connector, please let me know if that would be possible and how to proceed.

EDIT: https://github.com/klimber/HiveDdbQueryUDTF