Thanks @kevinob11 for reporting! I assume you created a view manually on top of an Airbyte-managed table?
@ChristopheDuong Is it possible that there is a cascading delete when we delete tables? If so, is it possible that it is something that only affects older versions?
Yep, correct, my view points at that table.
Yes, it is due to a cascade delete on normalized tables, but it's coming from DBT, not from Airbyte's side, as described in this other issue: https://github.com/airbytehq/airbyte/issues/1732#issuecomment-770963783 where we tried to find a way around it but failed to do so...
So the recommendation from DBT is:
Instead, I recommend ensuring that all models that are referenced in your BI tool are materialized as tables – even though tables select from other models, they don’t have the same binding behavior, so won’t get dropped by the cascade qualifier.
Or if you base your views on top of the Airbyte raw tables, then it won't be dropped in cascade...
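For concreteness, minimal sketches of both options for a Postgres destination (the "users" stream, column names, and model name here are all hypothetical). First, pinning a dbt model to a table materialization, which breaks the view binding that the cascade follows:

-- models/users_report.sql (hypothetical dbt model)
{{ config(materialized='table') }}
select id, email
from my_schema.users

Second, a view defined directly on the raw table, which survives because it depends on _airbyte_raw_users rather than the normalized users table (on the Postgres destination, the raw payload sits in a single JSONB column, _airbyte_data):

create view users_view as
select _airbyte_data ->> 'id'    as id,
       _airbyte_data ->> 'email' as email,
       _airbyte_emitted_at
from my_schema._airbyte_raw_users;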
Interesting, makes sense I guess. I'll look into materializing or basing things on the raw tables. Thanks much!
@michel-tricot @ChristopheDuong
Pinging here for a bit of help / guidance if possible, as this has become quite the headache for me. I pull in a bunch of raw data from other systems using Airbyte, then connect it with other data to create simpler views that my reporting and mapping tools consume. As stated earlier in this issue, that is a problem because those views get deleted in a cascade when the Airbyte tables are re-made. You mentioned two possible solutions: materialize to real tables, or base your views on the raw Airbyte tables.
For materializing the Airbyte data to real tables, I have to schedule that materialization, and it creates an even longer window where the data isn't fresh, depending on how well the schedule lines up with the Airbyte syncs. Is there a nice way for me to trigger the materialization at the end of an Airbyte sync? Some of my data is very timely (electric meter power status) and adding an additional 5 minutes affects my users.
For basing it on the raw tables, it appears those use a single JSON column for the data, which makes this pretty slow / difficult (one possible mitigation is sketched after this message). Am I missing something here?
Thanks, I know this is a "solved" issue, but both of these are ending up harder to implement effectively than I was hoping.
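One possible mitigation for the JSONB slowness, sketched under the same assumptions as above (Postgres destination, hypothetical "users" stream): Postgres expression indexes can index fields extracted from _airbyte_data, so filters against the raw table use an index scan instead of re-parsing the JSON on every row.

create index if not exists idx_raw_users_id
    on my_schema._airbyte_raw_users ((_airbyte_data ->> 'id'));

-- filters written with the same expression can now use the index:
select _airbyte_data ->> 'email'
from my_schema._airbyte_raw_users
where _airbyte_data ->> 'id' = '42';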
Hey @kevinob11, @ChristopheDuong will follow up.
What is your timezone btw? (just in case it ends up easier to resolve with a zoom call)
Thanks so much, my timezone is US/Pacific
Is there a nice way for me to trigger the materialization at the end of an Airbyte sync? Some of my data is very timely (electric meter power status) and adding an additional 5 minutes affects my users.
It's coming up soon with this: https://github.com/airbytehq/airbyte/issues/2959. I hope it will solve this kind of post-sync transformation, chained after the replication from Airbyte completes.
If you had an Airflow instance, you could link them up together too: https://docs.airbyte.io/tutorials/using-the-airflow-airbyte-operator
Otherwise, looking at the next topics to handle on normalization in the medium/long term: maybe if we see higher demand from users for "Incremental DBT / incremental batch normalization", it would avoid cascading deletes on Postgres more often, but I'm not too sure about it... https://github.com/airbytehq/airbyte/issues/2566
What do you think?
Another idea is to recreate the view automatically when a user queries the table, but that would depend on your use case and the tooling you have... If you need, we can indeed jump on a call to discuss this more synchronously.
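One shape this combination could take, again sketched with hypothetical names for a Postgres destination: a materialized view defined over the raw table (so the DBT cascade cannot drop it), with the refresh issued by whatever runs after the sync completes, whether that is the Airflow operator above or a post-sync hook once https://github.com/airbytehq/airbyte/issues/2959 lands.

create materialized view users_latest as
select _airbyte_data ->> 'id' as id,
       max(_airbyte_emitted_at) as last_seen
from my_schema._airbyte_raw_users
group by 1;

-- issued by the orchestrator right after each sync:
refresh materialized view users_latest;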
Airflow might work here; I wonder if you'd see the same issue as above. I suppose both of these would be mitigated if you are just using them to update a "materialized" table instead of a view, so you are only addressing timing rather than hoping to make real views work.
Unfortunately I can't do it on the query side of things, as the apps I'm running from (Geoserver, Tableau, etc.) are read-only.
Let's hop on a quick call if you don't mind, I'd be happy to talk through it. Would tomorrow 4/23 sometime work? What timezone are you in? I'm happy to be flexible as needed as long as it doesn't hit my meetings (I can't do 1PM tomorrow).
Yes, I sent you an invite for tomorrow at 10am PST (I am based in Europe, so it's 7pm for me)
You can also join me on Slack if needed
Perfect, I'll chat with you then!
Hey @ChristopheDuong what was the outcome of the call? (for future reference)
@michel-tricot I can chime in here a bit. No easy answers as of today: materialize to tables from somewhere else, or just put your full query wherever you use the data (Tableau, Geoserver, etc.). Long term, custom DBT steps after the data sync would make a better place to safely materialize costly aggregations; these are on the roadmap.
Thank you @kevinob11 !
Any updates @ChristopheDuong? Still running into this issue two years later. Would it be possible to simply alter the Full Refresh Append approach to issue a "Truncate Table" command prior to performing the sync? It seems to me the outcome would essentially be the same at that point, but I believe "Append" syncs survive the DBT drops.
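The distinction is easy to demonstrate in plain Postgres (throwaway names): TRUNCATE empties a table without touching its dependents, while the DROP ... CASCADE that normalization issues removes dependent views along with the table.

create table t (id int);
create view v as select * from t;

truncate table t;      -- v survives, and simply returns no rows
drop table t cascade;  -- v is dropped together with t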
In case anyone else runs into this, maybe this will spur some ideas; here is how I eventually handled it for a Postgres destination. I now refrain from using any Full Refresh Overwrites and instead use Full Refresh Appends. Note this will only work with data that has clear primary/composite keys that can uniquely identify rows. It ignores changes to any columns prefaced with an underscore (i.e., all columns artificially created by Airbyte).
create table admn.airbyte_cleanup_settings (table_id serial primary key, table_schema text, table_name text, dedupe_keys text[]);
This table has to be set up to track the Airbyte tables that are eligible for pruning. The dedupe keys can be thought of essentially as the primary key field within the Airbyte UI when setting up streams in a connection. They can be a single column or multiple, hence the array type of the field.
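For example, registering a hypothetical meter-readings table deduplicated on a composite of two columns (the function below matches key names case-insensitively, but lowercase entries are safest):

insert into admn.airbyte_cleanup_settings (table_schema, table_name, dedupe_keys)
values ('public', 'meter_readings', array['meter_id', 'reading_date']);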
The following dynamic function takes the information from the above table and essentially converts a full refresh table into an SCD table, using Airbyte's emitted-at column as the timestamp. It computes a hash of the columns within the table and removes all duplicate, consecutive instances of that hash after the initial one. However, if a hash changes and then reverts, it keeps both instances, since they were not consecutive. Note it also removes the corresponding rows from the "raw" table so that they do not return on the next Airbyte sync.
This query should also be safe to use on SCD tables in the event data was accidentally reloaded/duplicated for some reason, and it will automatically adapt. That said, I would encourage a data backup before trialing it on your data. I have tried to build some safeguards into the query itself as well.
No guarantees on performance for large tables, and I would encourage regular vacuuming of tables after executing this query.
CREATE OR REPLACE FUNCTION admn.airbyte_auto_prune_tables()
RETURNS void AS $$
DECLARE
name_schema text;
name_table text;
partition_keys text;
validation_query text;
partition_cnt bigint;
total_cnt bigint;
dynamic_query text;
rows_deleted int;
BEGIN
FOR name_schema, name_table, partition_keys IN
select table_schema, table_name,
string_agg('"'||column_name||'"',', ') filter (where column_name not like '_airbyte%_hashid') as partition_keys
from (
SELECT t.table_schema, t.table_name, c.column_name,
count(c.column_name)over (partition by acs.table_schema, acs.table_name) as col_cnt,
array_length(dedupe_keys,1) AS num_keys
FROM admn.airbyte_cleanup_settings acs
INNER JOIN information_schema.tables t
ON lower(acs.table_schema) = lower(t.table_schema)
AND lower(acs.table_name) like lower(t.table_name)
AND t.table_type = 'BASE TABLE'
INNER JOIN information_schema.columns c
ON (
lower(c.column_name) ilike ANY(acs.dedupe_keys)
or lower(c.column_name) like '_airbyte%_hashid'
)
AND lower(c.table_schema) = lower(acs.table_schema)
AND lower(c.table_name) like lower(acs.table_name)
) f
--ensure at least one dedupe_key column is present and that ALL configured columns exist within the real table;
-- add 1 for the hashid column we will be adding in
where col_cnt = num_keys + 1
and num_keys > 0
group by table_schema, table_name
loop
if name_table not like '%_scd' then
--verify that partitions are accurate by ensuring they uniquely define ALL existing rows
execute format(
'SELECT count(distinct concat_ws(''-'', %3$s)), count(*)
FROM %1$s.%2$s;
', name_schema, name_table, partition_keys
) into partition_cnt, total_cnt;
IF partition_cnt <> total_cnt THEN
RAISE EXCEPTION 'Partition keys count (%) does not equal total count (%). Ensure rows won''t be incorrectly deleted!', partition_cnt, total_cnt;
END IF;
--SCD tables need to utilize the airbyte unique id to maintain SCD accuracy, no need to confirm accuracy
else
partition_keys := '"_airbyte_unique_key"';
end if;
rows_deleted := 0;
dynamic_query :=
format(
'
drop table if exists hashes_to_delete;
create temp table hashes_to_delete as
with hash_computer as (
SELECT "_airbyte_ab_id", "_airbyte_emitted_at", %3$s, (
SELECT md5(array_agg(value)::text)
FROM (
SELECT key, value
FROM jsonb_each_text(row_to_json(subquery)::jsonb)
where left(key,1) <> ''_''
ORDER BY key
) AS ordered_keys
) as _row_value_hash
FROM (
SELECT *
FROM %1$s.%2$s
) AS subquery
)
SELECT "_airbyte_ab_id"
FROM (
SELECT _row_value_hash, %3$s, "_airbyte_ab_id",
lead(_row_value_hash) OVER (PARTITION BY %3$s ORDER BY "_airbyte_emitted_at" DESC) AS prior_hash
FROM hash_computer
) f
WHERE prior_hash = _row_value_hash
--a NULL prior_hash indicates the first/only row for the specified partition keys, which is always kept
and prior_hash is not null;
DELETE FROM %1$s.%2$s WHERE _airbyte_ab_id IN (SELECT _airbyte_ab_id FROM hashes_to_delete);
DELETE FROM %1$s._airbyte_raw_%2$s WHERE _airbyte_ab_id IN (SELECT _airbyte_ab_id FROM hashes_to_delete);
drop table if exists hashes_to_delete;
', name_schema, name_table, partition_keys
);
--uncomment for debugging
--RAISE NOTICE 'Dynamic Query: %', dynamic_query; -- This will print the dynamic query
EXECUTE dynamic_query;
GET DIAGNOSTICS rows_deleted = ROW_COUNT;
--each pruned row is removed from both the normalized and the raw table, so halve the count
rows_deleted := round(rows_deleted / 2.0);
raise notice '%.% was pruned with % rows deleted.', name_schema, name_table, rows_deleted;
END LOOP;
END;
$$ LANGUAGE plpgsql;
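A usage sketch, assuming the settings table has been populated as in the earlier example: call the function after each sync (from cron, an orchestrator, or by hand), then vacuum as recommended.

select admn.airbyte_auto_prune_tables();
vacuum analyze public.meter_readings;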
Expected Behavior
I expect to create a view and have it not disappear
Current Behavior
I created a view directly using a SQL command, waited for the next Airbyte sync to run (or ran it manually), and my view disappeared.
Logs
I can provide logs of a sync example that created the issue if needed
Steps to Reproduce
Severity of the bug for you
Critical
Airbyte Version
0.16.0-alpha
Connector Version (if applicable)
0.1.13