timeplus-io / sling-cli

Sling is a CLI tool that extracts data from a source storage/database and loads it in a target storage/database.
https://docs.slingdata.io
GNU General Public License v3.0

database_proton.go: getcount sometimes returns 0 #21

Open yokofly opened 4 hours ago

yokofly commented 4 hours ago

Issue Description

We already sleep for several seconds, but it seems there is still a chance of getting a count of zero?


After retrying:

[root@node0 sling]# ./sling-timeplus run -r daily_to_his.yaml --streams "rtdc_fund" -d
WARN[0000]log.go:244 gosnowflake.(*defaultLogger).Warn DBUS_SESSION_BUS_ADDRESS envvar looks to be not set, this can lead to runaway dbus-daemon processes. To avoid this, set envvar DBUS_SESSION_BUS_ADDRESS=$XDG_RUNTIME_DIR/bus (if it exists) or DBUS_SESSION_BUS_ADDRESS=/dev/null.
2024-10-15 19:21:59 INF Sling Replication [1 streams] | DAILY_TIMEPLUS -> HISTORICAL_TIMEPLUS

2024-10-15 19:21:59 INF [1 / 1] running stream rtdc_fund
2024-10-15 19:21:59 DBG Sling version: 1.0.7 (linux amd64)
2024-10-15 19:21:59 DBG type is db-db
2024-10-15 19:21:59 DBG using: {"columns":null,"mode":"incremental","transforms":null}
2024-10-15 19:21:59 DBG using source options: {"empty_as_null":false,"null_if":"NULL","datetime_format":"AUTO","max_decimals":11}
2024-10-15 19:21:59 DBG using target options: {"batch_limit":5000,"datetime_format":"auto","file_max_rows":0,"max_decimals":11,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-10-15 19:21:59 DBG opened "proton" connection (conn-proton-7of)
2024-10-15 19:21:59 DBG opened "proton" connection (conn-proton-eAt)
2024-10-15 19:21:59 INF connecting to source database (proton)
2024-10-15 19:21:59 INF connecting to target database (proton)
2024-10-15 19:21:59 DBG using text since type 'low_cardinality' not mapped for col 'currency'
2024-10-15 19:21:59 DBG using text since type 'low_cardinality' not mapped for col 'source'
2024-10-15 19:21:59 INF getting checkpoint value
2024-10-15 19:21:59 DBG select max(`_tp_time`) as max_val from table(`default`.`gj_rtdc_fund`)
2024-10-15 19:21:59 INF reading from source database
2024-10-15 19:21:59 DBG using text since type 'low_cardinality' not mapped for col 'currency'
2024-10-15 19:21:59 DBG using text since type 'low_cardinality' not mapped for col 'source'
2024-10-15 19:21:59 DBG select * from table(`default`.`rtdc_fund`) where `_tp_time` > to_time('2024-10-15 07:14:21.227000 +00') order by `_tp_time` asc
2024-10-15 19:21:59 DBG using text since type 'low_cardinality' not mapped for col 'currency'
2024-10-15 19:21:59 DBG using text since type 'low_cardinality' not mapped for col 'source'
2024-10-15 19:21:59 INF writing to target database [mode: incremental]
2024-10-15 19:21:59 DBG drop stream if exists `default`.`gj_rtdc_fund_tmp`
2024-10-15 19:21:59 DBG table `default`.`gj_rtdc_fund_tmp` dropped
2024-10-15 19:21:59 DBG create stream `default`.`gj_rtdc_fund_tmp` (`tradedate` nullable(int64),
`productaccount` nullable(string),
`assetaccount` nullable(string),
`currency` nullable(string),
`holdingcash` nullable(float64),
`settledcash` nullable(float64),
`frozenCash` nullable(float64),
`unfrozenCash` nullable(float64),
`tradablecash` nullable(float64),
`initDeposit` nullable(float64),
`deposit` nullable(float64),
`tradableDeposit` nullable(float64),
`occupiedDeposit` nullable(float64),
`frozenDeposit` nullable(float64),
`unfrozenDeposit` nullable(float64),
`source` nullable(string),
`_tp_time` datetime64(3, 'UTC') DEFAULT now64(3, 'UTC') CODEC(DoubleDelta, LZ4))
2024-10-15 19:22:01 INF streaming data
2024-10-15 19:22:01 DBG use `default`
2024-10-15 19:22:02 INF Bulk import completed: 1 batches, 50 rows
2024-10-15 19:22:02 DBG 50 ROWS COPIED
2024-10-15 19:22:07 DBG select count(*) as cnt from table(`default`.`gj_rtdc_fund_tmp`)
2024-10-15 19:22:09 DBG using text since type 'low_cardinality' not mapped for col 'currency'
2024-10-15 19:22:09 DBG using text since type 'low_cardinality' not mapped for col 'source'
2024-10-15 19:22:09 DBG inserting tradedate [nullable(int64)] into tradedate [int64]
2024-10-15 19:22:09 DBG inserting productaccount [nullable(string)] into productaccount [string]
2024-10-15 19:22:09 DBG inserting assetaccount [nullable(string)] into assetaccount [string]
2024-10-15 19:22:09 DBG inserting currency [nullable(string)] into currency [low_cardinality(string)]
2024-10-15 19:22:09 DBG inserting holdingcash [nullable(float64)] into holdingcash [float64]
2024-10-15 19:22:09 DBG inserting settledcash [nullable(float64)] into settledcash [float64]
2024-10-15 19:22:09 DBG inserting frozenCash [nullable(float64)] into frozenCash [float64]
2024-10-15 19:22:09 DBG inserting unfrozenCash [nullable(float64)] into unfrozenCash [float64]
2024-10-15 19:22:09 DBG inserting tradablecash [nullable(float64)] into tradablecash [float64]
2024-10-15 19:22:09 DBG inserting initDeposit [nullable(float64)] into initDeposit [float64]
2024-10-15 19:22:09 DBG inserting deposit [nullable(float64)] into deposit [float64]
2024-10-15 19:22:09 DBG inserting tradableDeposit [nullable(float64)] into tradableDeposit [float64]
2024-10-15 19:22:09 DBG inserting occupiedDeposit [nullable(float64)] into occupiedDeposit [float64]
2024-10-15 19:22:09 DBG inserting frozenDeposit [nullable(float64)] into frozenDeposit [float64]
2024-10-15 19:22:09 DBG inserting unfrozenDeposit [nullable(float64)] into unfrozenDeposit [float64]
2024-10-15 19:22:09 DBG inserting source [nullable(string)] into source [low_cardinality(string)]
2024-10-15 19:22:09 DBG insert into `default`.`gj_rtdc_fund` (`tradedate`, `productaccount`, `assetaccount`, `currency`, `holdingcash`, `settledcash`, `frozenCash`, `unfrozenCash`, `tradablecash`, `initDeposit`, `deposit`, `tradableDeposit`, `occupiedDeposit`, `frozenDeposit`, `unfrozenDeposit`, `source`, `_tp_time`, `_tp_sn`) select `tradedate`, `productaccount`, `assetaccount`, `currency`, `holdingcash`, `settledcash`, `frozenCash`, `unfrozenCash`, `tradablecash`, `initDeposit`, `deposit`, `tradableDeposit`, `occupiedDeposit`, `frozenDeposit`, `unfrozenDeposit`, `source`, `_tp_time`, `_tp_sn` from table(`default`.`gj_rtdc_fund_tmp`)
2024-10-15 19:22:09 DBG inserted rows into default.`gj_rtdc_fund` from temp table `default`.`gj_rtdc_fund_tmp`
2024-10-15 19:22:09 INF inserted 50 rows into default.`gj_rtdc_fund` in 9 secs [5 r/s] [7.4 kB]
2024-10-15 19:22:09 DBG drop stream if exists `default`.`gj_rtdc_fund_tmp`
2024-10-15 19:22:09 DBG table `default`.`gj_rtdc_fund_tmp` dropped
2024-10-15 19:22:09 DBG closed "proton" connection (conn-proton-eAt)
2024-10-15 19:22:09 INF execution succeeded

2024-10-15 19:22:09 INF Sling Replication Completed in 9s | DAILY_TIMEPLUS -> HISTORICAL_TIMEPLUS | 1 Successes | 0 Failures
yokofly commented 4 hours ago

Another thing: let's disable sling update.