slingdata-io / sling-cli

Sling is a CLI tool that extracts data from a source storage/database and loads it into a target storage/database.
https://docs.slingdata.io
GNU General Public License v3.0

Replicating from PostgreSQL based on a `timestampz` column fails with SQL syntax error #394

Closed mastermel closed 1 month ago

mastermel commented 1 month ago

Issue Description

I am replicating a PostgreSQL materialized view using an update_key set to a column of type timestamptz (timestamp with time zone). This worked fine on Sling version 1.2.13 but fails on version 1.2.20. Below is the effective PostgreSQL schema for the materialized view that is my source stream:

         Materialized view "public.bcdata_vend_items"
       Column       |            Type             | Modifiers
--------------------+-----------------------------+-----------
 id                 | text                        |
 store_key          | text                        |
 updated_at         | timestamp with time zone    |
 item_no            | character varying(100)      |
 description        | character varying(200)      |
 oh_qty             | integer                     |
 oh_cost            | numeric                     |
 oh_retail          | numeric                     |
 season             | character varying(200)      |
 department         | character varying(200)      |
 subdepartment      | character varying(200)      |
 category           | character varying(200)      |
 subcategory        | character varying(200)      |
 size               | character varying(50)       |
 brand              | character varying(200)      |
 grade              | character varying(200)      |
 vend_date          | timestamp with time zone    |
 vend_no            | text                        |
 vend_customer_id   | integer                     |
 vend_employee_code | text                        |
 vend_qty           | bigint                      |
 vend_cost          | numeric                     |
 vend_retail        | numeric                     |
 shop_date          | timestamp without time zone |
 days_on_hand       | integer                     |
 invoice_no         | text                        |
 shop_customer_id   | integer                     |
 shop_employee_id   | integer                     |
 shop_quantity      | bigint                      |
 shop_cost          | numeric                     |
 shop_retail_marked | numeric                     |
 markdown_percent   | numeric                     |
 shop_retail_final  | numeric                     |
 discount_percent   | numeric                     |
Indexes:
    "bcdata_vend_items_id_idx" UNIQUE, btree (id)
    "bcdata_vend_items_id_updated_at_idx" btree (id, updated_at)
    "bcdata_vend_items_updated_at_idx" btree (updated_at)

$ docker run --rm -i slingdata/sling:v1.2.20 --version
Version: 1.2.20

Official docker container running slingdata/sling:v1.2.20 on Ubuntu 22.04

source: BASELINE_PG
target: BCDATA_BIGQUERY

defaults:
  mode: backfill
  source_options:
    range: 2010-01-01,2024-10-04

streams:
  bcdata_vend_items:
    primary_key: [id]
    update_key: updated_at
    object: bcdata_test.vend_items
    target_options:
      table_tmp: bcdata_test.vend_items_tmp_duc-basecampedge-s151

$ docker run --rm -i --net=host -v /basecamp/bcdata/sling:/sling -e SLING_HOME_DIR=/sling slingdata/sling:v1.2.20 run -d -r /sling/replication.yaml --streams "bcdata_vend_items"

2024-10-04 19:40:40 INF Sling Replication [1 streams] | BASELINE_PG -> BCDATA_BIGQUERY

2024-10-04 19:40:40 INF [1 / 1] running stream bcdata_vend_items
2024-10-04 19:40:40 DBG Sling version: 1.2.20 (linux amd64)
2024-10-04 19:40:40 DBG type is db-db
2024-10-04 19:40:40 DBG using: {"columns":null,"mode":"backfill","transforms":null}
2024-10-04 19:40:40 DBG using source options: {"empty_as_null":false,"null_if":"NULL","datetime_format":"AUTO","max_decimals":9,"range":"2010-01-01,2024-10-04"}
2024-10-04 19:40:40 DBG using target options: {"datetime_format":"2006-01-02 15:04:05.000000-07","file_max_rows":0,"max_decimals":9,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source","table_tmp":"bcdata_test.vend_items_tmp_duc-basecampedge-s151"}
2024-10-04 19:40:40 DBG opened "postgres" connection (conn-postgres-0Uc)
2024-10-04 19:40:40 DBG opened "bigquery" connection (conn-bigquery-3ZZ)
2024-10-04 19:40:40 INF connecting to source database (postgres)
2024-10-04 19:40:40 INF connecting to target database (bigquery)
2024-10-04 19:40:42 INF reading from source database
2024-10-04 19:40:42 DBG select * from "public"."bcdata_vend_items" where "updated_at" >=  and "updated_at" <=  order by "updated_at" asc
2024-10-04 19:40:42 INF execution failed

--- database.go:2341 func1 ---
--- database_postgres.go:108 BulkExportStream ---
~ Error running query
--- database.go:781 StreamRows ---
~ SQL Error for:
select * from "public"."bcdata_vend_items" where "updated_at" >=  and "updated_at" <=  order by "updated_at" asc
--- database.go:827 StreamRowsContext ---
pq: syntax error at or near "and"

context canceled

2024-10-04 19:40:42 INF Sling Replication Completed in 2s | BASELINE_PG -> BCDATA_BIGQUERY | 0 Successes | 1 Failures

fatal:
--- proc.go:271 main ---
--- sling_cli.go:458 main ---
--- sling_cli.go:494 cliInit ---
--- cli.go:286 CliProcess ---
~ failure running replication (see docs @ https://docs.slingdata.io/sling-cli)
--- sling_run.go:202 processRun ---

--------------------------- bcdata_vend_items ---------------------------
--- task_run.go:99 func1 ---
~ Could not ReadFromDB
--- task_run.go:525 runDbToDb ---
~ Could not BulkExportFlow
--- task_run_read.go:175 ReadFromDB ---
--- database.go:2360 BulkExportFlow ---
~ dataflow error while waiting for ready state
--- dataflow.go:635 WaitReady ---

--- database.go:2341 func1 ---
--- database_postgres.go:108 BulkExportStream ---
~ Error running query
--- database.go:781 StreamRows ---
~ SQL Error for:
select * from "public"."bcdata_vend_items" where "updated_at" >=  and "updated_at" <=  order by "updated_at" asc
--- database.go:827 StreamRowsContext ---
pq: syntax error at or near "and"

mastermel commented 1 month ago

I haven't dug into this deeply yet, but could it be that the postgres template has no value for either variable.timestampz_layout_str or variable.timestamp_layout_str?

I'm guessing the issue arises within ReadFromDB() on line 113 of core/sling/task_run_read.go. If srcConn.GetTemplateValue("variable.timestampz_layout_str") returns an empty string, wouldn't g.R(timestampTemplate, "value", startValue) then also return an empty string, leaving both startValue and endValue empty? That would match the query in the log, where nothing follows >= or <=.
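
To make the suspected mechanism concrete, here is a minimal Go sketch. It is not Sling's code: render is a hypothetical stand-in for a "{key}"-style template helper such as g.R, rangeFilter is an invented helper, and the '{value}'::timestamptz layout is only an assumed example of what a populated template value might look like. It shows how an empty template collapses both range bounds to an empty string, and how a simple guard could surface that as an error before any SQL is sent.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// render is a hypothetical stand-in for a "{key}"-style template helper.
// It is NOT Sling's g.R; it only mimics the substitution assumed above.
func render(template, key, value string) string {
	return strings.ReplaceAll(template, "{"+key+"}", value)
}

// rangeFilter (hypothetical) builds the WHERE clause for a backfill range.
// layout plays the role of the per-database timestamp literal template.
func rangeFilter(column, layout, start, end string) (string, error) {
	lo := render(layout, "value", start)
	hi := render(layout, "value", end)
	// Guard: an empty layout renders empty bounds, which would produce
	// `"col" >=  and "col" <= ` and a syntax error on the database side.
	if lo == "" || hi == "" {
		return "", errors.New("timestamp layout rendered an empty bound")
	}
	return fmt.Sprintf(`"%s" >= %s and "%s" <= %s`, column, lo, column, hi), nil
}

func main() {
	// Missing template value: the rendered bound is the empty string.
	fmt.Printf("%q\n", render("", "value", "2010-01-01"))

	// The guard reports the problem instead of emitting broken SQL.
	if _, err := rangeFilter("updated_at", "", "2010-01-01", "2024-10-04"); err != nil {
		fmt.Println("error:", err)
	}

	// With an assumed (illustrative) layout, the filter is well-formed.
	where, _ := rangeFilter("updated_at", "'{value}'::timestamptz", "2010-01-01", "2024-10-04")
	fmt.Println(where)
}
```

Whether the actual fix adds a guard like this or simply supplies the missing template value is not stated in this thread; the sketch only illustrates why an empty layout string would yield the query seen in the log.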

flarco commented 1 month ago

Thanks. This is being unit tested here, but not with timestampz 🫤. I've pushed the change you suggested, which I think is correct. Can you test with the dev build?

mastermel commented 1 month ago

Yep, that seems to have fixed it and it's working the way I would expect on the current dev build 👍