dbt-labs / dbt-athena

The athena adapter plugin for dbt (https://getdbt.com)
https://dbt-athena.github.io
Apache License 2.0

[Feature] add 'force_batch_from_source=true' to batch right from the start to allow handling large source and large target tables #697

Open pransito opened 3 months ago

pransito commented 3 months ago

Is this your first time submitting a feature request?

Describe the feature

When one has a very large source table and a query on it that produces a large target table, one can run into query timeouts on Athena.

force_batch=true is good for handling many partitions without hitting the limit of 100 open partitions. However, with force_batch=true the first thing that happens is that the model is written without partitions into a temp table. If that table is very big, the query can time out, because it copies many GB of data in one go.

In that case (or in fact, always) the partitions should be leveraged right from the start, i.e. each partition should be written directly into the target (staging) table (using threading).
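The batching idea can be sketched roughly as follows. This is a Python illustration, not the adapter's Jinja code; it assumes the distinct partition values are already known. The helper name `chunk_partitions` and the `day_...` values are hypothetical, and the limit of 100 is the open-partition limit mentioned above.

```python
from typing import Iterator, List, Sequence

MAX_OPEN_PARTITIONS = 100  # open-partition limit per query, as mentioned above


def chunk_partitions(
    partitions: Sequence[str], size: int = MAX_OPEN_PARTITIONS
) -> Iterator[List[str]]:
    """Yield consecutive groups of at most `size` partition values."""
    if size < 1:
        raise ValueError("size must be positive")
    for start in range(0, len(partitions), size):
        yield list(partitions[start:start + size])


# Hypothetical example: 250 daily partitions are split into three batches.
days = [f"day_{i:03d}" for i in range(250)]
batches = list(chunk_partitions(days))
print([len(b) for b in batches])  # [100, 100, 50]
```

Each batch could then be written to the target with its own statement, so that no single query has to copy the whole result.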

I believe THIS CODE must be adapted.

I am not sure if force_batch=true should be reworked to show the above behavior always or if a new flag force_batch_from_source=true should be introduced.

Describe alternatives you've considered

No response

Who will this benefit?

This will benefit people who work with very large partitioned Athena tables (100 GB plus) that they want to write to another Athena table, which may itself have many partitions.

Are you interested in contributing this feature?

Yes.

Anything else?

No response

nicor88 commented 3 months ago

To implement what you suggested, the partitions you are dealing with must be known beforehand, which makes the implementation hard; hence the solution currently used by the adapter. The partitions cannot be known beforehand without running the query provided by the user. But if you look at a materialization like insert_by_period (see here), the partition is more or less known beforehand, which allows iterating/filtering by partition in the user's base SQL query. I therefore suggest looking into that instead of changing the default adapter behavior.

Using force_batch_from_source might work only when dealing with one table, and might not suit all use cases, for example where a big table is joined with another small table (e.g. for enrichment). Again, I believe that something like insert_by_period might be better if you are dealing with date partitions.

Edit: if we plan to introduce force_batch_from_source (to deal with non-date partitions), I suggest making it a dict: force_batch_from_source={'enabled': true, 'source_iteration_columns': ['product_id'], 'batch_size': '30'}. That way we could use a dictionary to specify how the iteration over those partitions looks.
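As a rough illustration of how such a dict could be validated, here is a Python sketch. The keys are taken from the proposal above; the class name, the function name, the default batch size of 100 and the validation rules are assumptions made for this example, not existing adapter behavior.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class ForceBatchFromSource:
    """Hypothetical parsed form of the proposed config dict."""
    enabled: bool
    source_iteration_columns: List[str]
    batch_size: int


def parse_force_batch_from_source(raw: Dict[str, Any]) -> ForceBatchFromSource:
    enabled = bool(raw.get("enabled", False))
    columns = list(raw.get("source_iteration_columns", []))
    # The proposal passes batch_size as a string ('30'), so coerce it to int.
    batch_size = int(raw.get("batch_size", 100))  # default of 100 is an assumption
    if enabled and not columns:
        raise ValueError("source_iteration_columns is required when enabled")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return ForceBatchFromSource(enabled, columns, batch_size)


cfg = parse_force_batch_from_source(
    {"enabled": True, "source_iteration_columns": ["product_id"], "batch_size": "30"}
)
print(cfg.batch_size, cfg.source_iteration_columns)  # 30 ['product_id']
```

Requiring the iteration columns whenever the flag is enabled makes the user name the source columns explicitly instead of relying on a default.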

pransito commented 3 months ago

@nicor88 Thank you for your comment!

mmmh, I wrote the code below to override one of the default macros (create_table_as_with_partitions). I get the partitions by running the SQL that is supposed to be materialized (with LIMIT 0), which should be less heavy than creating the full table in a temp location, no (get_columns_from_sql)? See here:

{% macro get_columns_from_sql(compiled_sql) %}
    {%- set limited_sql = "WITH subquery AS (" ~ compiled_sql ~ ") SELECT * FROM subquery LIMIT 0" -%}
    {%- set result = run_query(limited_sql) -%}

    {%- if execute %}
        {%- set column_names = [] -%}
        {%- for column in result.columns -%}
            {%- do column_names.append(column.name) -%}
        {%- endfor -%}
        {{ return(column_names) }}
    {%- endif -%}
{% endmacro %}

{% macro create_table_as_with_partitions(temporary, relation, compiled_code, language='sql') -%}

    {#- this overrides the adapter's built-in macro of the same name -#}

    {%- do log('!!! HELLO NEW MACRO FOR BATCHING FROM SOURCE !!!: ' ~ relation) -%}

    {% set partitions_batches = get_partition_batches(sql=compiled_code, as_subquery=True) %}
    {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}

    {%- set dest_columns = get_columns_from_sql(compiled_code) -%}
    {% do log('COLUMNS ARE: ' ~ dest_columns | join(', ')) %}
    {%- set dest_cols_csv = dest_columns | join(', ') -%}
    {% do log('dest_cols_csv is: ' ~ dest_cols_csv) %}

    {%- for batch in partitions_batches -%}
        {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}

        {%- if loop.index == 1 -%}
            {%- set create_target_relation_sql -%}
                select {{ dest_cols_csv }}
                from ({{ compiled_code }})
                where {{ batch }}
            {%- endset -%}
            {%- do run_query(create_table_as(temporary, relation, create_target_relation_sql, language)) -%}
        {%- else -%}
            {%- set insert_batch_partitions_sql -%}
                insert into {{ relation }} ({{ dest_cols_csv }})
                select {{ dest_cols_csv }}
                from ({{ compiled_code }})
                where {{ batch }}
            {%- endset -%}

            {%- do run_query(insert_batch_partitions_sql) -%}
        {%- endif -%}

    {%- endfor -%}

    select 'SUCCESSFULLY CREATED TABLE {{ relation }}'

{%- endmacro %}

What do you think?

Note: what's not nice about this approach is that it is a for loop; threads are not used. A threaded materialization where all the insert into statements run in parallel would be nicer. However, the original implementation of create_table_as_with_partitions is also single-threaded, correct?
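For readers less familiar with Jinja, the control flow of the macro above can be sketched in Python. This is only an illustration of the statement plan (first batch creates the table, later batches insert into it); the function name and the example relation, columns and filters are hypothetical, and the real macro delegates the create step to the adapter's create_table_as.

```python
from typing import List, Sequence, Tuple


def plan_batched_statements(
    relation: str,
    columns: Sequence[str],
    compiled_code: str,
    batch_filters: Sequence[str],
) -> List[Tuple[str, str]]:
    """Return (kind, sql) pairs: one 'create' for the first batch, then 'insert's."""
    cols_csv = ", ".join(columns)
    plan: List[Tuple[str, str]] = []
    for index, batch_filter in enumerate(batch_filters):
        select_sql = f"select {cols_csv} from ({compiled_code}) where {batch_filter}"
        if index == 0:
            # The first batch defines the table; the adapter would wrap this select.
            plan.append(("create", select_sql))
        else:
            plan.append(("insert", f"insert into {relation} ({cols_csv}) {select_sql}"))
    return plan


plan = plan_batched_statements(
    relation="analytics.events",
    columns=["id", "dt"],
    compiled_code="select id, dt from raw.events",
    batch_filters=["dt = '2024-01-01'", "dt = '2024-01-02'"],
)
for kind, sql in plan:
    print(kind, "->", sql)
```

Note that every statement re-runs the full compiled query with a different filter, so the statements run one after another and each one reads the source again.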

nicor88 commented 3 months ago

The implementation that you suggested is similar to the original implementation. We then decided to persist the data in a non-partitioned table and use that table as the source for the insert statements run afterwards.

The problem with the solution that you proposed is that the cost will be really high if the batch filter is executed on a non-partitioned column: the same query runs over and over, scanning the same bytes multiple times. Of course, if the partition column used in the partition_by of the target table is also the partition column of the source table, that's different: partition pruning will work and fewer bytes will be scanned.
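The cost argument can be made concrete with some simple arithmetic. The sketch below is a simplified model, not a measurement: it assumes the batches split the source into disjoint partition groups, and the figures (500 GB, 20 batches) are made up for illustration.

```python
def total_gb_scanned(source_gb: int, num_batches: int, pruning_works: bool) -> int:
    """Total data scanned when the source query is run once per batch.

    With partition pruning, each batch reads only its own partitions, so the
    source is read once overall. Without pruning, every batch scans everything.
    """
    if num_batches < 1:
        raise ValueError("num_batches must be at least 1")
    if pruning_works:
        return source_gb
    return source_gb * num_batches


pruned = total_gb_scanned(500, 20, pruning_works=True)
unpruned = total_gb_scanned(500, 20, pruning_works=False)
print(pruned, unpruned)  # 500 10000
```

In this model, filtering on a non-partition column multiplies the scanned bytes by the number of batches.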

That said, we can allow a behavior similar to what you proposed via another flag, but in that case I would also like to have control over the name of the source partition column used for filtering, pretty much what was suggested in my message above. NOTE: the current behavior of the materialization MUST be kept for the reason explained above: if a user mistakenly uses non-partition columns from the source table, that causes high cost.

Regarding multithreading, I discourage running multiple inserts concurrently due to Athena's limits on the number of queries per second; therefore the current behavior (and the one that we have in the macro) can be kept.

pransito commented 3 months ago

@nicor88 thank you for your response. I understand!

In our case the source table is huge, so copying it first to an unpartitioned table (even if filtered to just a few days of data) is not possible. Furthermore, we work with a partition-projected table as the source, which (in most cases) enforces mentioning the partitioning columns in the where clause (and we do). We use a macro which always fills the base filter on the projected partitions. But yeah, if one could somehow limit the usage of force_batch_from_source=true to cases where proper base filtering on partition columns is always done, then it would be safer. Is that what you meant?

Otherwise, insert_by_period is an option, I guess.

nicor88 commented 3 months ago

What I mean is that the

select {{ dest_cols_csv }}
from ({{ compiled_code }})
where {{ batch }}

reduces the bytes scanned if the {{batch}} filter condition acts on the source table partitions. That said, to be effective the {{batch}} filter condition must be aware of the partitions in the source, and everything gets complicated if the compiled code contains joins across tables.

Overall I'm fine with adding force_batch_from_source, but with these conditions:

Are you up for an alignment call to discuss the above? Looks like you are in Germany like me, so we could find a slot. Feel free to reach out to me in the dbt Slack channel to align on this. Thanks.

CommonCrisis commented 3 months ago

btw maybe this helps: https://github.com/CommonCrisis/insert-by-period-dbt-athena/tree/main

We are using the insert by period macro cause our source tables are really really big and we need to split the sources for each incremental run.

This macro adds custom filter values to further reduce the temp. table created for the merge into the target. We have the limitation of 30 min Athena runs and processing one day can take quite a while.

Idk if it helps processing your source tables - for us it is working like a charm for months now.
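The general idea of splitting a source by period can be sketched as follows. This is a Python illustration of the technique, not the code of the linked package; the function names and the `event_date` column are hypothetical.

```python
from datetime import date, timedelta
from typing import Iterator, Tuple


def period_windows(
    start: date, stop: date, days_per_period: int = 1
) -> Iterator[Tuple[date, date]]:
    """Yield half-open [window_start, window_end) ranges covering [start, stop)."""
    if days_per_period < 1:
        raise ValueError("days_per_period must be at least 1")
    step = timedelta(days=days_per_period)
    current = start
    while current < stop:
        window_end = min(current + step, stop)
        yield current, window_end
        current = window_end


def period_filter(column: str, window: Tuple[date, date]) -> str:
    """Render a SQL predicate restricting `column` to one window."""
    lower, upper = window
    return (
        f"{column} >= date '{lower.isoformat()}' "
        f"and {column} < date '{upper.isoformat()}'"
    )


windows = list(period_windows(date(2024, 1, 1), date(2024, 1, 4), days_per_period=2))
for window in windows:
    print(period_filter("event_date", window))
```

Because the windows are derived from a known date range rather than from the query result, each run can filter the source up front, which keeps every individual query small.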

jessedobbelaere commented 3 months ago

Nice to see you added Iceberg support @CommonCrisis 👌 I wonder if there's a future in the dbt-athena project for that macro or if we should keep it third-party. It has helped us a lot as well to process big source tables that otherwise time out on the initial run.

CommonCrisis commented 3 months ago

I guess third-party and as dbt package would make sense. Like dbt-utils but for athena related macros

nicor88 commented 3 months ago

I like the idea of another package that might include the insert-by-period macros/utilities. In the past we had quite a few users using what @jessedobbelaere proposed.

I'm wondering where we should put such a utility package and what we should call it.

CommonCrisis commented 3 months ago

Why not simply dbt-athena-utils in https://github.com/dbt-athena - everybody knows dbt-utils so everybody would know what to expect

nicor88 commented 3 months ago

We have https://github.com/dbt-athena/athena-utils that is the main reference for athena-utils to be used within dbt. How about that?

CommonCrisis commented 3 months ago

Sure, why not. We can add a dbt_custom_materializations folder there.

But ideally we also make the adapter more flexible. E.g. I had to copy paste the build iceberg logic into a separate file to make it work with the insert_by_period macro.

nicor88 commented 3 months ago

@CommonCrisis absolutely, we can refactor what we need in the adapter to avoid overwriting as much as we can - I will have a look at your https://github.com/CommonCrisis/insert-by-period-dbt-athena/tree/main to check the implementation.

jessedobbelaere commented 2 months ago

💡 Last night, dbt made this announcement about a new incremental strategy for time-series event data. It introduces incremental_strategy='microbatch' to enable micro-batching, replacing the custom insert-by-period macros in the wild. It also seems to simplify things a lot, with less config and including sampling for dev runs. Let's focus on integrating with this feature when it comes out?