dbt-labs / dbt-athena

The athena adapter plugin for dbt (https://getdbt.com)
https://dbt-athena.github.io
Apache License 2.0
228 stars 98 forks source link

[Feature] Allow use of `unique_tmp_table_suffix` flag with Iceberg tables. #688

Open benbeckingsale opened 4 months ago

benbeckingsale commented 4 months ago

Is this your first time submitting a feature request?

Describe the feature

I would like to enable the use of the unique_temp_table_suffix param when using Iceberg tables with a merge strategy. Currently this flag only has an effect if also using hive tables with an insert_overwrite strategy.

Describe alternatives you've considered

Using a different temp_schema per run to avoid table name collisions – but it is much more desirable to keep all temp tables in the same schema.

Who will this benefit?

This should reduce the risk of temp table name collision for anyone using parallel DAGs/processes to write to the same model (and using Iceberg with a merge strategy).

In my case, I have DAGs A and B which write non-overlapping data to the same incremental model (DAG A writes recent data and more frequently; B writes older data and less frequently). If A and B overlap such that the temp table created by A still exists while B tries to create its own, DAG B will fail due to name collision.

DAG A -> temp_schema.model_name__dbt_tmp
DAG B -> temp_schema.model_name__dbt_tmp

If unique_tmp_table_suffix were supported in this case, the table name collision could be avoided.

DAG A -> temp_schema.model_name__018dc4
DAG B -> temp_schema.model_name__bbc3a9

Are you interested in contributing this feature?

Yes

Anything else?

Similar to:

nicor88 commented 4 months ago

I believe that this type of feature could be implemented and it does make sense in some cases.

~Said so, I'm not in favor of adding such a feature because it can lead to weird data issues, and having such a feature to allow that use cases could lead to inconsistent data on the user side~

Reflecting a bit more on this, could be a nice feature to have for some cases:

Here are some edge cases that could lead to ambiguous situations:

Said so @benbeckingsale feel free to propose a feature for it.

benbeckingsale commented 3 months ago

Hi @nicor88 and thanks for your patience.

Just to explain my case a little more in terms of your answer above –

I'm using DBT to incrementally update a model whose input is a 'raw' time-series table (a source updated by an external streaming process which often receives late data) and whose output is an aggregated time-series.

Both input and output tables are partitioned by hour. The end goal is to run the model for the last 'x' hour partitions (2 currently). I'm using a DAG per partition so that the model can be run more frequently for the most recent partition and less frequently for older ones.

The DAG runs might overlap – hence the table collision issue – but the data written by each will not, since each DAG run is confined to a single partition in SQL. In other words this would be a 'concurrent merge on different partitions'.

This is a very common use case in my world, so I'd be very interested to know whether you can recommend alternative ways of doing this with DBT athena, perhaps using a single DAG?

This feature is still needed regardless, so I will aim to contribute soon.

nicor88 commented 3 months ago

@benbeckingsale Sorry for the late reply, but I was OoO. If you act on different partitions then should be fine, and yes having unique_temp_table_suffix is necessary in case of concurrent operations. What comes to my mind is a simple merge for your use case, as you act in different hour partitions, or eventually an incremental behavior with append mode and a delete to force this pattern: delete+insert on the specific partitions that you are dealing with, something that I do often, but not concurrently.

Out of curiosity why do you want to make such an operation for the last x hours parallel? are you dealing with such big data that parallelism is required? Can't you process the last x hours with one query?

teoria commented 1 day ago

hi @nicor88 i have the same needs i created a log monitoring system, each system save json files in S3 buckets every day a airflow dag runs dbt models that create a incremental iceberg table so if i need a airflow backfill many dbt models are triggered at same time and the current setup creates the temp table with the same name. :(

my workaround:

i add the unique_temporary_table_suffix() into incremental.sql file

  {% if unique_tmp_table_suffix == True and table_type == 'iceberg' %}
    {% set tmp_table_suffix = adapter.generate_unique_temporary_table_suffix() %}
  {% endif %}

and now i can execute many concurrents dagruns

nicor88 commented 10 hours ago

@teoria feel free to submit a PR with your changes, I believe that make sense to have your workaround also for other users.

teoria commented 7 hours ago

PR with the unique tmp table name for iceberg tables https://github.com/dbt-labs/dbt-athena/pull/757