slingdata-io / sling-cli

Sling is a CLI tool that extracts data from a source storage/database and loads it in a target storage/database.
https://docs.slingdata.io
GNU General Public License v3.0

_SLING_LOADED_AT is type NUMBER(38,0) on Snowflake instead of TIMESTAMP #313

Closed: johnoscott closed 3 months ago

johnoscott commented 3 months ago

Issue Description

The docs describe the _SLING_LOADED_AT column as a timestamp, but it is created in Snowflake as a NUMBER(38,0) type.

Is it possible to specify the type so I can avoid having to cast it:

SELECT TO_TIMESTAMP_NTZ(_SLING_LOADED_AT) AS datetime_column
FROM ACCOUNTS;

I have tried specifying a column type, but it doesn't work:

source: local://
target: MY_SNOWFLAKE

defaults:
  mode: full-refresh
  object: src_client.{stream_file_name}
  source_options:
    format: csv
    columns:
      id: string
      modified: datetime # snowflake type: TIMESTAMP_NTZ
      _sling_loaded_at: datetime # doesn't work !!! will always be number

...

Version: 1.2.11

OS: mac

Replication configuration: see above

Log output:

❯ sling run --debug  --replication ./sling-upload-day-01.yaml
2024-06-05 22:35:43 INF Sling Replication [1 streams] | local:// -> MY_SNOWFLAKE

2024-06-05 22:35:43 INF [1 / 1] running stream file://./data/client/day_01/accounts.csv
2024-06-05 22:35:43 DBG Sling version: 1.2.11 (darwin arm64)
2024-06-05 22:35:43 DBG type is file-db
2024-06-05 22:35:43 DBG using source options: {"trim_space":false,"empty_as_null":true,"header":true,"fields_per_rec":-1,"compression":"auto","null_if":"NULL","datetime_format":"AUTO","skip_blank_lines":false,"max_decimals":-1,"columns":{}}
2024-06-05 22:35:43 DBG using target options: {"datetime_format":"auto","file_max_rows":0,"max_decimals":-1,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-06-05 22:35:43 INF connecting to target database (snowflake)
2024-06-05 22:35:43 DBG opened "snowflake" connection (conn-snowflake-CEI)
2024-06-05 22:35:46 INF reading from source file system (file)
2024-06-05 22:35:46 DBG reading datastream from file://./data/client/day_01/accounts.csv [format=csv]
2024-06-05 22:35:46 DBG merging csv readers of 1 files [concurrency=3] from file://./data/client/day_01/accounts.csv
2024-06-05 22:35:46 DBG processing reader from file://./data/client/day_01/accounts.csv
2024-06-05 22:35:46 DBG delimiter auto-detected: ","
2024-06-05 22:35:46 INF writing to target database [mode: full-refresh]
2024-06-05 22:35:46 DBG drop table if exists "SRC_CLIENT"."ACCOUNTS_TMP"
2024-06-05 22:35:46 DBG table "SRC_CLIENT"."ACCOUNTS_TMP" dropped
2024-06-05 22:35:47 DBG create table "SRC_CLIENT"."ACCOUNTS_TMP" ("ACCOUNT_ID" bigint,
"MODIFIED" date,
"ACCOUNT_NAME" text,
"ADDRESS" text,
"_SLING_LOADED_AT" integer,
"_SLING_STREAM_URL" varchar)
2024-06-05 22:35:48 INF streaming data
2024-06-05 22:35:49 DBG USE SCHEMA SRC_CLIENT
2024-06-05 22:35:49 DBG writing to file:///var/folders/s9/g_1xyqx54bz4x_hv7w9rn9l00000gn/T/snowflake/put/2024-06-05T223549.352/part.01 [fileRowLimit=0 fileBytesLimit=300000000 compression=zstd concurrency=7 useBufferedStream=false fileFormat=csv]
2024-06-05 22:35:49 DBG REMOVE @SRC_CLIENT.sling_staging/"SRC_CLIENT"."ACCOUNTS_TMP"/2024-06-05T223549.353
2024-06-05 22:35:50 DBG PUT 'file:///var/folders/s9/g_1xyqx54bz4x_hv7w9rn9l00000gn/T/snowflake/put/2024-06-05T223549.352/part.01.0001.csv.zst' @SRC_CLIENT.sling_staging/"SRC_CLIENT"."ACCOUNTS_TMP"/2024-06-05T223549.353 PARALLEL=1 AUTO_COMPRESS=FALSE
2024-06-05 22:35:52 DBG COPY INTO "SRC_CLIENT"."ACCOUNTS_TMP" ("ACCOUNT_ID", "MODIFIED", "ACCOUNT_NAME", "ADDRESS", "_SLING_LOADED_AT", "_SLING_STREAM_URL")
FROM ( 
  SELECT T.$1, T.$2, T.$3, T.$4, T.$5, T.$6
  FROM @SRC_CLIENT.sling_staging/"SRC_CLIENT"."ACCOUNTS_TMP"/2024-06-05T223549.353 as T
)
FILE_FORMAT = (
  TYPE = CSV
  RECORD_DELIMITER = '\n'
  ESCAPE_UNENCLOSED_FIELD = NONE
  FIELD_OPTIONALLY_ENCLOSED_BY='0x22'
  EMPTY_FIELD_AS_NULL=FALSE
  NULL_IF = '\\N'
  SKIP_HEADER=1
  REPLACE_INVALID_CHARACTERS=TRUE
)
ON_ERROR = ABORT_STATEMENT
2024-06-05 22:35:53 DBG 
+--------------------------------------------------------------------------------------+--------+-------------+-------------+
| FILE                                                                                 | STATUS | ROWS_LOADED | ERRORS_SEEN |
+--------------------------------------------------------------------------------------+--------+-------------+-------------+
| sling_staging/"SRC_CLIENT"."ACCOUNTS_TMP"/2024-06-05T223549.353/part.01.0001.csv.zst | LOADED |           2 |           0 |
+--------------------------------------------------------------------------------------+--------+-------------+-------------+
2024-06-05 22:35:53 DBG REMOVE @SRC_CLIENT.sling_staging/"SRC_CLIENT"."ACCOUNTS_TMP"/2024-06-05T223549.353
2024-06-05 22:35:54 DBG select count(*) cnt from "SRC_CLIENT"."ACCOUNTS_TMP"
2024-06-05 22:35:55 DBG comparing checksums ["ACCOUNT_ID [bigint | BIGINT]","MODIFIED [date | DATE]","ACCOUNT_NAME [text | TEXT]","ADDRESS [text | TEXT]","_SLING_LOADED_AT [bigint | BIGINT]","_SLING_STREAM_URL [text | TEXT]"]
2024-06-05 22:35:55 DBG select sum(abs("ACCOUNT_ID")) as "ACCOUNT_ID", sum(DATE_PART('epoch_second', "MODIFIED") * 1000000) as "MODIFIED", sum(length("ACCOUNT_NAME"::string)) as "ACCOUNT_NAME", sum(length("ADDRESS"::string)) as "ADDRESS", sum(abs("_SLING_LOADED_AT")) as "_SLING_LOADED_AT", sum(length("_SLING_STREAM_URL"::string)) as "_SLING_STREAM_URL" from "SRC_CLIENT"."ACCOUNTS_TMP"
2024-06-05 22:35:56 DBG drop table if exists "SRC_CLIENT"."ACCOUNTS"
2024-06-05 22:35:56 DBG table "SRC_CLIENT"."ACCOUNTS" dropped
2024-06-05 22:35:57 DBG create table "SRC_CLIENT"."ACCOUNTS" ("ACCOUNT_ID" integer,
"MODIFIED" date,
"ACCOUNT_NAME" text,
"ADDRESS" text,
"_SLING_LOADED_AT" bigint,
"_SLING_STREAM_URL" varchar)
2024-06-05 22:35:58 INF created table "SRC_CLIENT"."ACCOUNTS"
2024-06-05 22:35:58 DBG insert into "SRC_CLIENT"."ACCOUNTS" ("ACCOUNT_ID", "MODIFIED", "ACCOUNT_NAME", "ADDRESS", "_SLING_LOADED_AT", "_SLING_STREAM_URL") select "ACCOUNT_ID", "MODIFIED", "ACCOUNT_NAME", "ADDRESS", "_SLING_LOADED_AT", "_SLING_STREAM_URL" from "SRC_CLIENT"."ACCOUNTS_TMP"
2024-06-05 22:35:59 DBG inserted rows into "SRC_CLIENT"."ACCOUNTS" from temp table "SRC_CLIENT"."ACCOUNTS_TMP"
2024-06-05 22:35:59 INF inserted 2 rows into "SRC_CLIENT"."ACCOUNTS" in 16 secs [0 r/s]
2024-06-05 22:35:59 DBG drop table if exists "SRC_CLIENT"."ACCOUNTS_TMP"
2024-06-05 22:36:00 DBG table "SRC_CLIENT"."ACCOUNTS_TMP" dropped
2024-06-05 22:36:00 DBG closed "snowflake" connection (conn-snowflake-CEI)
2024-06-05 22:36:00 INF execution succeeded

2024-06-05 22:36:01 INF Sling Replication Completed in 18s | local:// -> MY_SNOWFLAKE | 1 Successes | 0 Failures
flarco commented 3 months ago

Hi, it's an epoch timestamp.

See https://www.epochconverter.com/. For Snowflake, use https://docs.snowflake.com/en/sql-reference/functions/to_timestamp.
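
For example, a minimal conversion (a sketch, assuming the stored value is epoch seconds, using the ACCOUNTS table from the log above):

SELECT TO_TIMESTAMP(_SLING_LOADED_AT) AS loaded_at -- TO_TIMESTAMP picks seconds vs. milliseconds by magnitude
FROM SRC_CLIENT.ACCOUNTS;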

flarco commented 3 months ago

It's not possible to cast it as a timestamp. This is because sling deals with many systems, and epoch is a timezone-neutral, universal value. You could maybe use post_sql to update the target table, or create a view, for example.
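
A view could look something like this (a sketch using the column names from the log above; the view name ACCOUNTS_V is made up, and the cast assumes epoch seconds):

CREATE OR REPLACE VIEW SRC_CLIENT.ACCOUNTS_V AS
SELECT
  ACCOUNT_ID,
  MODIFIED,
  ACCOUNT_NAME,
  ADDRESS,
  TO_TIMESTAMP_NTZ(_SLING_LOADED_AT) AS _SLING_LOADED_AT, -- expose as native timestamp
  _SLING_STREAM_URL
FROM SRC_CLIENT.ACCOUNTS;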

flarco commented 3 months ago

All right, thinking about it, I've reconsidered and will add logic to accept SLING_LOADED_AT_COLUMN='timestamp' so the column will be cast as a native timestamp. SLING_LOADED_AT_COLUMN='true' will still use the default epoch / unix timestamp.

johnoscott commented 3 months ago

Yes, it's more of a convenience feature, so I appreciate you agreeing to make the change.

flarco commented 3 months ago

Added with https://github.com/slingdata-io/sling-cli/pull/318/commits/f53d09246b5ac00a3cd0ef7165a2ef02437eacb1 for the next release. Closing.