MeltanoLabs / tap-postgres

Singer Tap for PostgreSQL
https://hub.meltano.com/extractors/tap-postgres--meltanolabs/

feat: potentially add xmin replication? #219

Open qbatten opened 1 year ago

qbatten commented 1 year ago

Airbyte has had this since July 10, see here. I haven't spent time scoping it out, but it doesn't work with the current version of this plugin. I suspect it'd be easy to add: I assume discovery excludes hidden/system columns from the catalog, and it could just include them(?)

Specifically, I tried setting xmin as the replication key, and it errors out, apparently because that column isn't in the catalog.

```
2023-08-25T17:06:34.041196Z [info     ] time=2023-08-25 17:06:31 name=tap_postgres level=INFO message=Stream public-mytable is using incremental replication with replication key xmin cmd_type=elb consumer=False name=pipeline-0 producer=True stdio=stderr string_id=pipeline-0
2023-08-25T17:06:34.041319Z [info     ] time=2023-08-25 17:06:31 name=tap_postgres level=CRITICAL message='NoneType' object has no attribute 'get' cmd_type=elb consumer=False name=pipeline-0 producer=True stdio=stderr string_id=pipeline-0
...
2023-08-25T17:06:34.043415Z [info     ]     replication_key_sql_datatype = md_map.get(('properties', replication_key)).get('sql-datatype') cmd_type=elb consumer=False name=pipeline-0 producer=True stdio=stderr string_id=pipeline-0
2023-08-25T17:06:34.043529Z [info     ] AttributeError: 'NoneType' object has no attribute 'get' cmd_type=elb consumer=False name=pipeline-0 producer=True stdio=stderr string_id=pipeline-0
```
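The traceback points at a metadata lookup. A minimal sketch of why it fails, assuming the metadata map is a dict keyed by breadcrumb tuples such as `('properties', 'id')`, as in the quoted line. `dict.get` returns `None` for a column that discovery never emitted, and chaining `.get` on that `None` raises the `AttributeError`. The helper `get_replication_key_datatype` is hypothetical, not the tap's code; it just shows how a pre-check would turn the crash into a readable error.

```python
from typing import Any, Dict, Tuple

Breadcrumb = Tuple[str, ...]


def get_replication_key_datatype(
    md_map: Dict[Breadcrumb, Dict[str, Any]], replication_key: str
) -> str:
    """Return the sql-datatype recorded for the replication key.

    Hypothetical helper: raises a descriptive LookupError instead of the
    AttributeError seen when the key is missing from the catalog metadata.
    """
    entry = md_map.get(("properties", replication_key))
    if entry is None:
        # Discovery never emitted this column (e.g. the system column xmin).
        raise LookupError(
            f"Replication key {replication_key!r} is not in the catalog metadata"
        )
    return entry["sql-datatype"]


# Metadata as discovery would produce it: user columns only, no xmin.
md_map = {
    (): {"table-key-properties": ["id"]},
    ("properties", "id"): {"sql-datatype": "integer"},
}

print(get_replication_key_datatype(md_map, "id"))  # integer
try:
    get_replication_key_datatype(md_map, "xmin")
except LookupError as exc:
    print(exc)
```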
visch commented 1 year ago


Interesting! Someone else asked me about this recently as well; it seems like we could definitely do this. Like you said, discovery would need to pull these additional columns, and we could just add the two explicitly.
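Postgres system columns such as `xmin` have a negative `attnum` in `pg_attribute`, so they appear neither in `information_schema.columns` nor in `SELECT *`; discovery has to add them by hand. A minimal sketch, assuming a Singer-style catalog stream with a JSON `schema` and a `metadata` list of breadcrumb entries. The helper name `add_xmin_to_stream`, the `integer` JSON type, and the `sql-datatype` value `xid` are illustrative assumptions.

```python
import copy
from typing import Any, Dict


def add_xmin_to_stream(stream: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a catalog stream with an explicit xmin property.

    Hypothetical helper: system columns are not discovered automatically,
    so both the JSON schema and the metadata entry are added by hand.
    """
    stream = copy.deepcopy(stream)  # leave the caller's catalog untouched
    stream["schema"]["properties"]["xmin"] = {"type": ["integer", "null"]}
    stream["metadata"].append(
        {
            "breadcrumb": ["properties", "xmin"],
            "metadata": {"inclusion": "available", "sql-datatype": "xid"},
        }
    )
    return stream


base = {
    "tap_stream_id": "public-my_table",
    "schema": {"type": "object", "properties": {"id": {"type": ["integer"]}}},
    "metadata": [
        {"breadcrumb": [], "metadata": {"table-key-properties": ["id"]}},
        {
            "breadcrumb": ["properties", "id"],
            "metadata": {"inclusion": "automatic", "sql-datatype": "integer"},
        },
    ],
}

patched = add_xmin_to_stream(base)
# Rebuild the breadcrumb-keyed map: the xmin lookup now finds an entry.
md_map = {tuple(m["breadcrumb"]): m["metadata"] for m in patched["metadata"]}
print(md_map[("properties", "xmin")]["sql-datatype"])  # xid
```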

@qbatten, if you overrode the schema for the table you're trying to pull (annoying, but worth a shot) and added xmin, I wonder if it'd magically work.

qbatten commented 1 year ago

Hm, okay, there's a lot of complexity here.

At your suggestion, I added a `schema` extra for xmin to the extractor (see "meltano.yml code block" below). That got us a step further: Meltano did try to use xmin in the SELECT statement. However, Postgres rejects it because there is no ordering operator for the xid data type. That makes sense, since xmin's type is xid and ordering xids is not straightforward. We could cast it to text and Postgres wouldn't complain, but I believe ORDER BY xmin::text won't give the behavior we want.
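Why the text cast misbehaves: `ORDER BY xmin::text` compares strings character by character, so `'10'` sorts before `'9'`. A commonly cited workaround (an assumption here, not something tested in this thread) is `xmin::text::bigint`, which orders numerically. The Python sketch only contrasts the two orderings on made-up values; it does not talk to Postgres.

```python
# Made-up xmin values as Postgres would render them via xmin::text.
xmin_as_text = ["9", "100", "10", "2"]

# ORDER BY xmin::text compares strings character by character.
text_order = sorted(xmin_as_text)

# ORDER BY xmin::text::bigint compares the numbers themselves.
numeric_order = sorted(xmin_as_text, key=int)

print(text_order)     # ['10', '100', '2', '9']
print(numeric_order)  # ['2', '9', '10', '100']
```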

It looks like xmin's update behavior is fairly complex as well (of course, I guess; link). Other useful links I ran into: some vaguely related discussion, txid docs, oid docs.
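Part of that complexity is wraparound. `xid` is a 32-bit counter, so once it wraps, newer rows can carry numerically smaller xmin values and a plain bigint comparison goes wrong. PostgreSQL compares normal transaction IDs circularly (modulo 2^32), and the 64-bit `txid` values carry an epoch in the high 32 bits. A minimal sketch of both ideas; the helper names are hypothetical and this mirrors the documented semantics rather than any tap or Airbyte code.

```python
FIRST_NORMAL_XID = 3  # 0, 1 and 2 are reserved special transaction IDs
XID_MODULUS = 2**32


def xid_precedes(a: int, b: int) -> bool:
    """True if 32-bit xid a is logically older than b (circular comparison)."""
    if a < FIRST_NORMAL_XID or b < FIRST_NORMAL_XID:
        # Special xids are compared as plain integers.
        return a < b
    diff = (a - b) % XID_MODULUS
    # Reinterpret the 32-bit difference as a signed integer.
    if diff >= 2**31:
        diff -= XID_MODULUS
    return diff < 0


def to_txid(epoch: int, xid: int) -> int:
    """Combine a wraparound epoch and a 32-bit xid into a 64-bit txid."""
    return (epoch << 32) | xid


# An xid just before wraparound is older than a small xid just after it.
print(xid_precedes(4_294_967_290, 5))  # True
# With the epoch attached, plain integer comparison works again.
print(to_txid(1, 5) > to_txid(0, 4_294_967_290))  # True
```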

All this now has me wondering how airbyte handles this.

meltano.yml code block
```yaml
- name: my_pipeline-0
  inherit_from: tap-postgres-my_pipeline
  schema:
    public-my_table:
      xmin:
        # JSON Schema spells this type "integer" ("int" is not a valid type name)
        type: ["integer", "null"]
```
Error thrown in meltano
```
2023-09-05T21:25:45.464197Z [info ] time=2023-09-05 21:25:43 name=tap_postgres level=INFO message=Stream public-my_table is using incremental replication with replication key xmin cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464323Z [info ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=Current Server Encoding: UTF8 cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464447Z [info ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=Current Client Encoding: UTF8 cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464565Z [info ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=hstore is UNavailable cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464685Z [info ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=Beginning new incremental replication sync 1692982654464 cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464821Z [info ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=select statement: SELECT "id" , "my_col" , "user_id" cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464975Z [info ] FROM "public"."my_table" cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465106Z [info ] ORDER BY "xmin" ASC with itersize 20000 cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465236Z [info ] time=2023-09-05 21:25:44 name=singer level=INFO message=METRIC: {"type": "counter", "metric": "record_count", "value": 0, "tags": {}} cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465367Z [info ] time=2023-09-05 21:25:44 name=tap_postgres level=CRITICAL message=could not identify an ordering operator for type xid cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465486Z [info ] LINE 3: ORDER BY "xmin" ASC cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465604Z [info ] ^ cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
...
2023-09-05T21:25:45.468293Z [info ] return super().execute(query, vars) cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468423Z [info ] psycopg2.errors.UndefinedFunction: could not identify an ordering operator for type xid cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468545Z [info ] LINE 3: ORDER BY "xmin" ASC cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468677Z [info ] ^ cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468806Z [info ] HINT: Use an explicit ordering operator or modify the query. cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468944Z [info ] cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
```
qbatten commented 1 year ago

Ah, here's the meat of Airbyte's xmin-handling code.

visch commented 1 year ago

Nice work @qbatten , and good find for sure.

So we could do the conversion here: https://github.com/MeltanoLabs/tap-postgres/blob/main/tap_postgres/client.py#L231-L232. Just hardcode it: if the column name is xmin, do the conversion 🤷, plus the logic here.
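A minimal sketch of that hardcoded special case, assuming the tap builds its incremental SELECT as a string. `build_incremental_query` and `quote_ident` are hypothetical names, not the code at the linked lines, and the `xmin::text::bigint` cast is the assumed conversion. The sketch also selects xmin explicitly (the logged query above omitted it from the SELECT list), and it ignores wraparound entirely.

```python
from typing import List


def quote_ident(name: str) -> str:
    """Double-quote a Postgres identifier, escaping embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_incremental_query(
    schema: str,
    table: str,
    columns: List[str],
    replication_key: str,
    with_bookmark: bool,
) -> str:
    """Build an incremental SELECT, special-casing the xmin system column."""
    select_items = [quote_ident(c) for c in columns]
    if replication_key == "xmin":
        # xid has no ordering operator, so compare it as a number instead.
        key_expr = "xmin::text::bigint"
        # xmin is not a discovered column, so select it explicitly.
        select_items.append(f'{key_expr} AS "xmin"')
    else:
        key_expr = quote_ident(replication_key)

    lines = [
        "SELECT " + ", ".join(select_items),
        f"FROM {quote_ident(schema)}.{quote_ident(table)}",
    ]
    if with_bookmark:
        # psycopg2 named-parameter placeholder; the driver binds the value.
        lines.append(f"WHERE {key_expr} >= %(bookmark)s")
    lines.append(f"ORDER BY {key_expr} ASC")
    return "\n".join(lines)


print(build_incremental_query(
    "public", "my_table", ["id", "my_col", "user_id"], "xmin", True
))
```

For the table in the log above, this prints a query ordered by `xmin::text::bigint` rather than the raw `"xmin"` that Postgres rejected.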

Those writeups you found are good too; if we really want this feature, we should surface them as warnings. It seems like xmin isn't the best incremental key, but I'd guess that for some folks with very large tables it's better than nothing.
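One way those caveats could surface is a log warning whenever a stream picks `xmin` as its replication key. A minimal sketch using the standard `logging` module; the function name `warn_on_xmin` and the message wording are hypothetical.

```python
import logging

logger = logging.getLogger("tap_postgres")

# Hypothetical wording; the caveats themselves come from the discussion above.
XMIN_CAVEATS = (
    "replication key 'xmin' is a 32-bit transaction ID that wraps around, "
    "and deleted rows are never emitted, so incremental results may be incomplete"
)


def warn_on_xmin(stream_id: str, replication_key: str) -> bool:
    """Log the xmin caveats for a stream; return True if a warning was emitted."""
    if replication_key != "xmin":
        return False
    logger.warning("Stream %s: %s", stream_id, XMIN_CAVEATS)
    return True


warn_on_xmin("public-my_table", "xmin")
```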