slingdata-io / sling-cli

Sling is a CLI tool that extracts data from a source storage/database and loads it in a target storage/database.
https://docs.slingdata.io
GNU General Public License v3.0

Feature/allow direct insert #404

Closed yokofly closed 1 month ago

yokofly commented 1 month ago

Tries to fix #35.

Usage: export SLING_ALLOW_DIRECT_INSERT=true

Currently, if we stop the insert into the target table, the rollback does nothing, because the rows were inserted directly into the target table.
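For context, here is a minimal Go sketch of how such a flag might gate the write path. The types and function names below are illustrative placeholders, not sling's actual internals:

package main

import (
	"fmt"
	"os"
)

// Placeholder types standing in for sling's internal dataflow and table metadata.
type Dataflow struct{}
type Table struct{ FullName string }

func writeTempThenSwap(df *Dataflow, tgt Table) error {
	fmt.Println("staging rows in a temp table, then moving them into", tgt.FullName)
	return nil
}

func writeDirect(df *Dataflow, tgt Table) error {
	fmt.Println("streaming data directly into final table", tgt.FullName)
	return nil
}

// writeToTarget gates the write path on the env flag, as this PR proposes.
func writeToTarget(df *Dataflow, tgt Table) error {
	if os.Getenv("SLING_ALLOW_DIRECT_INSERT") == "true" {
		// Rows land in the final table as they stream in. If the run is
		// interrupted, there is no temp table to drop, so a "rollback"
		// cannot undo the rows already inserted.
		return writeDirect(df, tgt)
	}
	// Default path: a failure or interrupt leaves the target table untouched.
	return writeTempThenSwap(df, tgt)
}

func main() {
	_ = writeToTarget(&Dataflow{}, Table{FullName: "`default`.`now_v`"})
}

The default temp-table path exists precisely for the rollback guarantee that direct insert gives up, which is what the interrupted run below demonstrates: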

✗ bash scripts/build.sh &&   ./sling run --src-conn clickhouse  --src-stream 'v' --tgt-conn Clickhouse  --tgt-object "now_v" --mode full-refresh -d
2024-10-16 03:41:26 DBG Sling version: dev (linux amd64)
2024-10-16 03:41:26 DBG type is db-db
2024-10-16 03:41:26 DBG using: {"columns":null,"mode":"full-refresh","transforms":null}
2024-10-16 03:41:26 DBG using source options: {"empty_as_null":false,"null_if":"NULL","datetime_format":"AUTO","max_decimals":11}
2024-10-16 03:41:26 DBG using target options: {"batch_limit":100000,"datetime_format":"auto","file_max_rows":0,"max_decimals":11,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-10-16 03:41:26 DBG opened "clickhouse" connection (conn-clickhouse-rXZ)
2024-10-16 03:41:26 DBG opened "clickhouse" connection (conn-clickhouse-hdQ)
2024-10-16 03:41:26 INF connecting to source database (clickhouse)
2024-10-16 03:41:26 INF connecting to target database (clickhouse)
2024-10-16 03:41:26 INF reading from source database
2024-10-16 03:41:26 DBG select * from `default`.`v`
2024-10-16 03:41:26 INF writing to target database [mode: full-refresh]
2024-10-16 03:41:26 INF streaming data directly into final table
2024-10-16 03:41:26 DBG use `default`
4s 1,321,044 369567 r/s 12 MB | 22% MEM | 12% CPU ^C
interrupting...
2024-10-16 03:41:31 DBG closed "clickhouse" connection (conn-clickhouse-rXZ)
2024-10-16 03:41:31 DBG closed "clickhouse" connection (conn-clickhouse-hdQ)
4s 1,639,813 544953 r/s 15 MB | 22% MEM | 11% CPU 
2024-10-16 03:41:31 INF execution failed
fatal:
--- proc.go:271 main ---
--- sling_cli.go:458 main ---
--- sling_cli.go:494 cliInit ---
--- cli.go:286 CliProcess ---
--- sling_run.go:225 processRun ---
~ failure running task (see docs @ https://docs.slingdata.io/sling-cli)
--- sling_run.go:396 runTask ---
--- task_run.go:155 Execute ---

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:146 WriteToDb ---
--- task_run_write.go:413 writeDirectly ---
--- database.go:2313 BulkImportFlow ---
~ could not bulk import
--- database.go:2300 func1 ---
~ could not copy data
--- database_clickhouse.go:261 BulkImportStream ---
~ could not commit transaction
--- database_clickhouse.go:257 func2 ---

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
~ Could not WriteToDb
--- task_run_write.go:146 WriteToDb ---
~ could not insert into `default`.`now_v`
--- task_run_write.go:416 writeDirectly ---

--- datastream.go:944 func9 ---
--- datastream.go:854 2 ---

(base) ➜  sling-cli git:(feature/allow-direct-insert) ✗ ./sling conns exec clickhouse  "select count() from now_v"
+---------+
| COUNT() |
+---------+
| 1800020 |
+---------+
3:42AM INF successful! duration: 0 seconds (1 affected records)
yokofly commented 1 month ago

@flarco please look over the full workflow, even though the code is pretty simple. I have not tested the pre-SQL and post-SQL options yet; my simple local tests look good/as expected ^

yokofly commented 1 month ago

A tricky issue: I was trying to load a CSV file into the database using incremental mode to update specific keys.

Previously, sling would infer some column types as int64 when the actual type was int32, and the upsert handled this well. With this PR, the dataflow type is int64 and cannot be converted to int32, which adds some complexity.

I have no better solution.

yokofly commented 1 month ago

I have a basic solution: add a special check: if the target table already exists, use the target table's column types for the dataflow instead of inferring them.

So the type resolution order becomes:

  1. check the config first: if a type is specified there, use it;
  2. then check the target table: if it exists, use its column types;
  3. finally, fall back to type inference.
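A minimal sketch of that precedence in Go (the function and parameter names here are illustrative, not sling's actual internals):

package main

import "fmt"

// resolveColumnType sketches the proposed precedence. configTypes holds
// types the user set in the run config; targetTypes holds the existing
// target table's column types (empty or nil if the table does not exist);
// inferred is the type sling inferred from the source stream.
func resolveColumnType(col string, configTypes, targetTypes map[string]string, inferred string) string {
	if t, ok := configTypes[col]; ok {
		return t // 1. an explicit config type wins
	}
	if t, ok := targetTypes[col]; ok {
		return t // 2. match the existing target table (e.g. int32, not int64)
	}
	return inferred // 3. no other source of truth; keep the inferred type
}

func main() {
	target := map[string]string{"id": "int32"}
	fmt.Println(resolveColumnType("id", nil, target, "int64")) // prints: int32
}

With an existing int32 column on the target, the dataflow would then carry int32 instead of the inferred int64, avoiding the conversion failure described above.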

yokofly commented 1 month ago

Another issue: in incremental mode from CSV to database, sling tries to compare the row counts. But when the update_key comparison finds nothing to insert (it inserts 0 rows), the final count check reports an inconsistency. We need a way to signal that this inconsistency is just the result of the update_key comparison.

flarco commented 1 month ago

Thanks @yokofly, will spend time on this tomorrow.

yokofly commented 1 month ago

@flarco to sum up, here are the 2 issues I found with my current code:

  1. Column type changes: previously the upsert could handle some incorrectly inferred column types, since rows were still converted when inserting from the temp table into the target. Now we no longer have that guard, so we need to adjust column types directly from the target table if it exists (ideally, should we tell the user to create the target table themselves first?).
  2. The stream's written count vs. the target table's count: we should find a better way to compare them, or lower the error level to a log message instead of stopping the transfer, because in some cases we write ZERO rows and the target table's count is unrelated to this new insert (see the sketch after this list).
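A minimal sketch of what that relaxed check could look like (hypothetical function and names, not sling's actual code):

package main

import (
	"fmt"
	"log"
)

// checkWriteCount sketches the relaxed validation from point 2: when the
// update_key comparison found nothing new to insert (written == 0), the
// target table's count is unrelated to this run, so warn instead of failing.
func checkWriteCount(written, streamed, targetCount uint64) error {
	if written == 0 {
		log.Printf("warning: update_key comparison inserted 0 rows; target count (%d) left unverified", targetCount)
		return nil
	}
	if written != streamed {
		return fmt.Errorf("wrote %d rows but streamed %d", written, streamed)
	}
	return nil
}

func main() {
	if err := checkWriteCount(0, 0, 1800020); err != nil {
		log.Fatal(err)
	}
	fmt.Println("counts ok (or mismatch downgraded to a warning)")
}

This keeps the hard failure for genuine mismatches while downgrading the zero-row case to a warning.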
yokofly commented 1 month ago

hi @flarco, do you have any ideas about the above 2 issues? Or it would be much appreciated if you could work on this; I am not familiar with the dataflow type changes or the sync-status logic.

flarco commented 1 month ago

@yokofly Yes, I will. I think this will need more time to get right; the logic as it stands today has been developed over many iterations and can get tricky. Thanks for all this, I'll take it from here and get it merged into the next version. The plan is to release 1.2.21 tomorrow, since tests are passing (without this PR merged). Don't want to delay the release further.

flarco commented 1 month ago

@yokofly Can you edit the PR to merge into branch v1.2.22? It auto-closed this one and doesn't let me change it.

yokofly commented 1 month ago

ok