slingdata-io / sling-cli

Sling is a CLI tool that extracts data from a source storage/database and loads it in a target storage/database.
https://docs.slingdata.io
GNU General Public License v3.0
398 stars 27 forks source link

Error using file:// as src-stream #223

Closed comino closed 6 months ago

comino commented 6 months ago

There seems to be an issue trying to sling data from a csv file when using the src-stream option. Depending on the data I get different error messages, which makes it a little tricky to pin down the issue.

CSV sample: events.csv

Obj;PropId;Value;TimeStamp;TimeStampISO
BB01;85;45,3828582763672;133245162327228051;2023-03-28T22:30:32Z
BB01;85;40,3816032409668;133245181140278467;2023-03-28T23:01:54Z
BB01;85;45,3858795166016;133245207233952957;2023-03-28T23:45:23Z
BB01;85;50,388298034668;133245209487304477;2023-03-28T23:49:08Z
BB01;85;45,3873443603516;133245215378614197;2023-03-28T23:58:57Z
BB01;85;40,3829345703125;133245217529463186;2023-03-29T00:02:32Z
BB01;85;35,3816719055176;133245220376169720;2023-03-29T00:07:17Z
BB01;85;40,3844985961914;133245230678878369;2023-03-29T00:24:27Z
BB01;85;45,3865814208984;133245234406821951;2023-03-29T00:30:40Z

This works just fine:

cat ../csv/events.csv | sling run --tgt-conn MYSQL ---tgt-object 'public.events'

This one, doing the same thing throws an error:

sling run --tgt-conn MYSQL ---tgt-object 'public.audit' --src-stream file://../csv/events.csv
~ datastream error

~ Error reading file
record on line 2: wrong number of fields

context canceled

The same error appears when using a task or replication

source:
  stream: file://../csv/events.csv

target:
  conn: MYSQL
  object: public.audit

mode: full-refresh

Using SQLITE throws the same error on the sample file.

sling run --tgt-conn SQLITE ---tgt-object 'audit' --src-stream file://../csv/ex.csv
record on line 2: wrong number of fields

context canceled

I tested it with 1.1.11, 1.1.12 and 1.1.13. Tested on ubuntu 22

comino commented 6 months ago

(venv) user@banana:~/sling$ sling run -d --tgt-conn SQLITE ---tgt-object 'audit' --src-stream file://../csv/ex.csv
2024-03-12 23:10:18 DBG Sling version: 1.1.13 (linux amd64)
2024-03-12 23:10:18 DBG type is file-db
2024-03-12 23:10:18 DBG using source options: {"trim_space":false,"empty_as_null":true,"header":true,"compression":"AUTO","null_if":"NULL","datetime_format":"AUTO","skip_blank_lines":false,"delimiter":",","max_decimals":-1}
2024-03-12 23:10:18 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-03-12 23:10:18 INF connecting to target database (sqlite)
2024-03-12 23:10:18 INF reading from source file system (file)
2024-03-12 23:10:18 DBG reading datastream from ../csv/ex.csv [format=csv]
2024-03-12 23:10:18 INF execution failed
fatal:
~ execution failed
--- task_run.go:96 func1 ---
~ could not read from file
--- task_run.go:358 runFileToDB ---
~ Could not FileSysReadDataflow for file
--- task_run_read.go:255 ReadFromFile ---
~ error getting dataflow
--- fs.go:554 ReadDataflow ---
--- fs.go:1006 GetDataflow ---
~ dataflow error while waiting for ready state
--- dataflow.go:595 WaitReady ---

~ datastream error
--- dataflow.go:527 PushStreamChan ---

--- fs_local.go:143 func1 ---
--- datastream.go:831 ConsumeCsvReader ---
--- datastream.go:447 Start ---
--- datastream.go:1829 next ---
~ Error reading file
--- datastream.go:802 func1 ---
record on line 2: wrong number of fields

context canceled

~ Error consuming reader for ../csv/ex.csv
--- fs_local.go:143 func1 ---
~ could start datastream
--- datastream.go:831 ConsumeCsvReader ---
~ error in getting rows
--- datastream.go:494 Start ---

--- fs_local.go:143 func1 ---
--- datastream.go:831 ConsumeCsvReader ---
--- datastream.go:447 Start ---
--- datastream.go:1829 next ---
~ Error reading file
--- datastream.go:802 func1 ---
record on line 2: wrong number of fields

context canceled

context canceled```
flarco commented 6 months ago

thanks for the logs. I think I see the issue from this line:

2024-03-12 23:10:18 DBG using source options: {"trim_space":false,"empty_as_null":true,"header":true,"compression":"AUTO","null_if":"NULL","datetime_format":"AUTO","skip_blank_lines":false,"delimiter":",","max_decimals":-1}

See "delimiter":","?

Can you do:

sling run -d --tgt-conn SQLITE ---tgt-object 'audit' --src-stream file://../csv/ex.csv --src-options '{ delimiter: ";" }'

Sling attempts to auto-detect the delimiter but isn't guaranteed.

comino commented 6 months ago

Ouch,

I suspected that too and tried to set a delimiter manually without success before (probably with a typo) because your sample command runs successfully!

It is strange that for some CSV files, it consistently works with SQLite but not with MYSQL even without the delimiter and with stdin it never fails. In this example, the delimiter is also recognized as "," but it continues to run successfully, so I didn't suspect this to be the root cause.

user@banana:~/sling$ sling run -d --tgt-conn SQLITE ---tgt-object 'audit' --src-stream file://../csv/audit.csv
2024-03-13 08:55:36 DBG Sling version: 1.1.13 (linux amd64)
2024-03-13 08:55:36 DBG type is file-db
2024-03-13 08:55:36 DBG using source options: {"trim_space":false,"empty_as_null":true,"header":true,"compression":"AUTO","null_if":"NULL","datetime_format":"AUTO","skip_blank_lines":false,"delimiter":",","max_decimals":-1}
2024-03-13 08:55:36 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-03-13 08:55:36 INF connecting to target database (sqlite)
2024-03-13 08:55:36 INF reading from source file system (file)
2024-03-13 08:55:36 DBG reading datastream from ../csv/audit.csv [format=csv]
2024-03-13 08:55:36 INF writing to target database [mode: full-refresh]
2024-03-13 08:55:36 DBG drop table if exists "main"."audit_tmp"
2024-03-13 08:55:36 DBG table "main"."audit_tmp" dropped
2024-03-13 08:55:36 DBG create table if not exists "main"."audit_tmp" ("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source" text,
"_sling_loaded_at" integer)
2024-03-13 08:55:36 INF streaming data
2024-03-13 08:55:36 DBG connection was closed, reconnecting
2024-03-13 08:55:36 DBG select count(*) cnt from "main"."audit_tmp"
2024-03-13 08:55:36 DBG comparing checksums []string{"timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at"} vs []string{"timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at"}: []string{"timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at"}
2024-03-13 08:55:36 DBG select sum(length("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source")) as "timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", sum(abs("_sling_loaded_at")) as "_sling_loaded_at" from "main"."audit_tmp"
2024-03-13 08:55:36 DBG
--- task_run.go:96 func1 ---
--- task_run.go:381 runFileToDB ---
--- task_run_write.go:328 WriteToDb ---
--- database.go:3067 CompareChecksums ---
checksum failure for timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source (sling-side vs db-side): 247966 != 246921 -- (sum(length("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source")))
[]interface {}{"246921", "2883593679696"}
2024-03-13 08:55:36 DBG drop table if exists "main"."audit"
2024-03-13 08:55:36 DBG table "main"."audit" dropped
2024-03-13 08:55:36 DBG create table if not exists "main"."audit" ("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source" text,
"_sling_loaded_at" bigint)
2024-03-13 08:55:36 INF created table "main"."audit"
2024-03-13 08:55:36 DBG inserting _sling_loaded_at [INTEGER] into _sling_loaded_at [bigint]
2024-03-13 08:55:36 DBG insert into "main"."audit" ("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at") select "timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at" from "main"."audit_tmp"
2024-03-13 08:55:36 DBG inserted rows into main."audit" from temp table "main"."audit_tmp"
2024-03-13 08:55:36 INF inserted 1686 rows into main."audit" in 0 secs [6,684 r/s]
2024-03-13 08:55:37 DBG connection was closed, reconnecting
2024-03-13 08:55:37 DBG drop table if exists "main"."audit_tmp"
2024-03-13 08:55:37 DBG table "main"."audit_tmp" dropped
2024-03-13 08:55:37 INF execution succeeded

It seems like delimiter auto-detection only works using stdio, which is a little unexpected for the user. I'm not sure if thats a bug that's fixable since I dont know too much about the internals. I will close this one and suggest reopening an issue for the auto-detection in case you think it's a bug that can be fixed.

For now, I will simply set the delimiter explicitly for now. Thank you very much !!

flarco commented 6 months ago

Yes you see the one column output: timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source

It crumbled all the columns together when it's the wrong delimiter.

I will revisit the delimiter auto detection logic, thank you for raising this.

On Wed, Mar 13, 2024, 5:25 AM Sven Eliasson @.***> wrote:

Ouch,

I suspected that too and tried to set a delimiter manually without success before (probably with a typo) because your sample command runs successfully!

It is strange that for some CSV files, it consistently works with SQLite but not with MYSQL even without the delimiter and with stdin it never fails. In this example, the delimiter is also recognized as "," but it continues to run successfully, so I didn't suspect this to be the root cause.

@.**:~/sling$ sling run -d --tgt-conn SQLITE ---tgt-object 'audit' --src-stream file://../csv/audit.csv 2024-03-13 08:55:36 DBG Sling version: 1.1.13 (linux amd64) 2024-03-13 08:55:36 DBG type is file-db 2024-03-13 08:55:36 DBG using source options: {"trim_space":false,"empty_as_null":true,"header":true,"compression":"AUTO","null_if":"NULL","datetime_format":"AUTO","skip_blank_lines":false,"delimiter":",","max_decimals":-1} 2024-03-13 08:55:36 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"} 2024-03-13 08:55:36 INF connecting to target database (sqlite) 2024-03-13 08:55:36 INF reading from source file system (file) 2024-03-13 08:55:36 DBG reading datastream from ../csv/audit.csv [format=csv] 2024-03-13 08:55:36 INF writing to target database [mode: full-refresh] 2024-03-13 08:55:36 DBG drop table if exists "main"."audit_tmp" 2024-03-13 08:55:36 DBG table "main"."audit_tmp" dropped 2024-03-13 08:55:36 DBG create table if not exists "main"."audit_tmp" ("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source" text, "_sling_loaded_at" integer) 2024-03-13 08:55:36 INF streaming data 2024-03-13 08:55:36 DBG connection was closed, reconnecting 2024-03-13 08:55:36 DBG select count() cnt from "main"."audit_tmp" 2024-03-13 08:55:36 DBG comparing checksums []string{"timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at"} vs []string{"timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at"}: []string{"timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at"} 2024-03-13 08:55:36 DBG select sum(length("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source")) as "timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", sum(abs("_sling_loaded_at")) as "_sling_loaded_at" from "main"."audit_tmp" 2024-03-13 08:55:36 DBG --- task_run.go:96 func1 --- --- task_run.go:381 runFileToDB --- --- task_run_write.go:328 WriteToDb --- --- database.go:3067 CompareChecksums --- checksum failure for timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source (sling-side vs db-side): 247966 != 246921 -- (sum(length("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source"))) []interface {}{"246921", "2883593679696"} 2024-03-13 08:55:36 DBG drop table if exists "main"."audit" 2024-03-13 08:55:36 DBG table "main"."audit" dropped 2024-03-13 08:55:36 DBG create table if not exists "main"."audit" ("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source" text, "_sling_loaded_at" bigint) 2024-03-13 08:55:36 INF created table "main"."audit" 2024-03-13 08:55:36 DBG inserting _sling_loaded_at [INTEGER] into _sling_loaded_at [bigint] 2024-03-13 08:55:36 DBG insert into "main"."audit" ("timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at") select "timestamp_timestampiso_objtag_action_text_prvvalue_curvalue_personid_personnameshort_personfunction_computername_source", "_sling_loaded_at" from "main"."audit_tmp" 2024-03-13 08:55:36 DBG inserted rows into main."audit" from temp table "main"."audit_tmp" 2024-03-13 08:55:36 INF inserted 1686 rows into main."audit" in 0 secs [6,684 r/s] 2024-03-13 08:55:37 DBG connection was closed, reconnecting 2024-03-13 08:55:37 DBG drop table if exists "main"."audit_tmp" 2024-03-13 08:55:37 DBG table "main"."audit_tmp" dropped 2024-03-13 08:55:37 INF execution succeeded

It seems like delimiter auto-detection only works using stdio, which is a little unexpected for the user.

For now, I will simply set the delimiter explicitly for now. Thank you very much!

— Reply to this email directly, view it on GitHub https://github.com/slingdata-io/sling-cli/issues/223#issuecomment-1993801924, or unsubscribe https://github.com/notifications/unsubscribe-auth/AB2QZYU7FUHJTWOITQXCAXDYYAEOBAVCNFSM6AAAAABETBMS2WVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMYTSOJTHAYDCOJSGQ . You are receiving this because you commented.Message ID: @.***>

flarco commented 6 months ago

Was able to reproduce your errors. Fixed: https://github.com/slingdata-io/sling-cli/pull/208/commits/2a79e931d1f5c7799fbfd94c48c60a0e4b172cac Also, thanks for the donation!