timeplus-io / sling-cli

Sling is a CLI tool that extracts data from a source storage/database and loads it in a target storage/database.
https://docs.slingdata.io
GNU General Public License v3.0

Target stream is created as append-only and the incremental sync result is incorrect when the source is a mutable stream (sling from Proton to Proton) #28

Open jhao0117 opened 1 month ago

jhao0117 commented 1 month ago
1. The source stream is a mutable stream:

timeplusd :) show create coinbase_ohlc_1m_vkv

SHOW CREATE STREAM coinbase_ohlc_1m_vkv

Query id: 4e15a8d2-e04b-43b2-b90e-754046a5176c

┌─statement────────────────────────────────────────────────────────────────────────┐
│ CREATE MUTABLE STREAM default.coinbase_ohlc_1m_vkv                               │
│ (                                                                                │
│   time datetime64(3),                                                            │
│   symbol string,                                                                 │
│   open float32,                                                                  │
│   close float32,                                                                 │
│   high float32,                                                                  │
│   low float32,                                                                   │
│   _tp_time datetime64(3, 'UTC') DEFAULT now64(3, 'UTC') CODEC(DoubleDelta, LZ4)  │
│ )                                                                                │
│ ENGINE = MutableStream(1, 1)                                                     │
│ PRIMARY KEY (time, symbol)                                                       │
└──────────────────────────────────────────────────────────────────────────────────┘

1 row in set. Elapsed: 0.001 sec.

timeplusd :)


2. Run sling to replicate the data:

(base) jameshao@192 sling % ./sling run --src-conn AWS_PROTON_SERVER_2 --src-stream 'coinbase_ohlc_1m_vkv' --tgt-conn MAC_LOCAL --tgt-object 'coinbase_ohlc_1m_vkv' --mode incremental --update-key '_tp_time'
11:05AM INF connecting to source database (proton)
11:05AM INF connecting to target database (proton)
11:05AM INF getting checkpoint value
11:05AM INF Creating intermediate configuration
11:05AM INF Exporting data from source Proton database
11:05AM INF connecting to source database (proton)
11:05AM INF reading from source database
11:05AM INF writing to target file system (file)
11:05AM INF wrote 51554 rows [18,266 r/s] to /var/folders/10/lvb7c5rn6js2w5jcb1pfk9hh0000gn/T/proton_transfer_915658077.csv
11:05AM INF Checking exported data
11:05AM INF Preparing to import data to target Proton database
11:05AM INF Importing data to target Proton database
11:05AM INF connecting to target database (proton)
11:05AM INF reading from source file system (file)
11:05AM INF writing to target database [mode: incremental]
11:05AM INF streaming data
11:05AM INF created table default.coinbase_ohlc_1m_vkv
11:05AM INF inserted 51554 rows into default.coinbase_ohlc_1m_vkv in 9 secs [5,318 r/s]
11:05AM INF Transferred 51554 rows between Proton databases in 12 secs [5,317 r/s]
11:05AM INF execution succeeded
(base) jameshao@192 sling %

3. The target stream is created as an append-only stream, not a mutable stream:

timeplusd :) show create coinbase_ohlc_1m_vkv

SHOW CREATE STREAM coinbase_ohlc_1m_vkv

Query id: 2091a5a9-e0c5-4fd8-a3c6-a998d826da5e

┌─statement────────────────────────────────────────────────────────────────────────┐
│ CREATE STREAM default.coinbase_ohlc_1m_vkv                                       │
│ (                                                                                │
│   time nullable(datetime64(6)),                                                  │
│   symbol nullable(string),                                                       │
│   open nullable(decimal(25, 6)),                                                 │
│   close nullable(decimal(25, 6)),                                                │
│   high nullable(decimal(25, 6)),                                                 │
│   low nullable(decimal(25, 6)),                                                  │
│   _tp_time datetime64(3, 'UTC') DEFAULT now64(3, 'UTC') CODEC(DoubleDelta, LZ4), │
│   INDEX _tp_time_index _tp_time TYPE minmax GRANULARITY 256                      │
│ )                                                                                │
│ ENGINE = Stream(1, 1, rand())                                                    │
│ PARTITION BY to_YYYYMMDD(_tp_time)                                               │
│ ORDER BY to_start_of_hour(_tp_time)                                              │
│ SETTINGS index_granularity = 8192                                                │
└──────────────────────────────────────────────────────────────────────────────────┘

1 row in set. Elapsed: 0.001 sec.

timeplusd :)
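
Besides the engine change (MutableStream to Stream) and the missing PRIMARY KEY, the two statements also differ in column types. The following is a minimal Python sketch that lists those differences. The column types are copied by hand from the two SHOW CREATE outputs in steps 1 and 3; the helper name `diff_columns` is made up for illustration and is not part of sling or timeplusd.

```python
# Column types copied by hand from the SHOW CREATE output of the source
# (step 1) and the target (step 3). Nothing here talks to a real server.
source_columns = {
    "time": "datetime64(3)",
    "symbol": "string",
    "open": "float32",
    "close": "float32",
    "high": "float32",
    "low": "float32",
    "_tp_time": "datetime64(3, 'UTC')",
}
target_columns = {
    "time": "nullable(datetime64(6))",
    "symbol": "nullable(string)",
    "open": "nullable(decimal(25, 6))",
    "close": "nullable(decimal(25, 6))",
    "high": "nullable(decimal(25, 6))",
    "low": "nullable(decimal(25, 6))",
    "_tp_time": "datetime64(3, 'UTC')",
}


def diff_columns(source, target):
    """Return {column: (source_type, target_type)} for columns whose types differ."""
    return {
        name: (src_type, target.get(name))
        for name, src_type in source.items()
        if src_type != target.get(name)
    }


drift = diff_columns(source_columns, target_columns)
for name, (src_type, tgt_type) in drift.items():
    print(f"{name}: {src_type} -> {tgt_type}")
```

Every user column changes type (each becomes nullable, the float32 columns become decimal(25, 6), and time goes from millisecond to microsecond precision), while _tp_time is unchanged.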

4. Add some data to the mutable stream and run the sling incremental sync again:

(base) jameshao@192 sling % ./sling run --src-conn AWS_PROTON_SERVER_2 --src-stream 'coinbase_ohlc_1m_vkv' --tgt-conn MAC_LOCAL --tgt-object 'coinbase_ohlc_1m_vkv' --mode incremental --update-key '_tp_time'
11:26AM INF connecting to source database (proton)
11:26AM INF connecting to target database (proton)
11:26AM INF getting checkpoint value
11:26AM INF Creating intermediate configuration
11:26AM INF Exporting data from source Proton database
11:26AM INF connecting to source database (proton)
11:26AM INF reading from source database
11:26AM INF writing to target file system (file)
11:26AM INF wrote 44 rows [33 r/s] to /var/folders/10/lvb7c5rn6js2w5jcb1pfk9hh0000gn/T/proton_transfer_1789645138.csv
11:26AM INF Checking exported data
11:26AM INF Preparing to import data to target Proton database
11:26AM INF Importing data to target Proton database
11:26AM INF connecting to target database (proton)
11:26AM INF reading from source file system (file)
11:26AM INF writing to target database [mode: incremental]
11:26AM INF streaming data
11:27AM INF inserted 44 rows into default.coinbase_ohlc_1m_vkv in 9 secs [5 r/s]
11:27AM INF Transferred 44 rows between Proton databases in 10 secs [5 r/s]
11:27AM INF execution succeeded
(base) jameshao@192 sling %

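5. The result is not correct: the row counts of the two streams differ (51594 vs. 51598). The larger count equals 51554 + 44, the total number of rows sling inserted across the two runs.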

ubuntu@proton2:~/timeplus/bin$ ./timeplusd client -h 127.0.0.1
timeplusd client version 2.4.7.
Connecting to 127.0.0.1:8463 as user default.
Connected to timeplusd server version 2.4.7 revision 54459.

timeplusd :) select count() from table(coinbase_ohlc_1m_vkv)

SELECT count() FROM table(coinbase_ohlc_1m_vkv)

Query id: 045ca5ab-9452-4dad-a6f7-635f2b95270e

┌─count()─┐
│   51594 │
└─────────┘

1 row in set. Elapsed: 0.017 sec.

timeplusd :)

timeplusd :) select count() from table(coinbase_ohlc_1m_vkv)

SELECT count() FROM table(coinbase_ohlc_1m_vkv)

Query id: 22eecd1d-d211-4b11-9e95-4e6e321b9c8c

┌─count()─┐
│   51598 │
└─────────┘

1 row in set. Elapsed: 0.002 sec.

timeplusd :)
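
One possible explanation for the mismatch, sketched in Python with made-up data: if the source keeps one row per primary key (time, symbol) while the target simply appends every exported row, then each update to an existing key adds a row to the target without adding one to the source. The sketch assumes that the incremental checkpoint is the largest _tp_time already in the target and that an updated source row carries a fresh _tp_time; both are assumptions, not confirmed sling or timeplusd behavior. The function names are hypothetical.

```python
def apply_mutable(rows):
    """Mutable-stream semantics: a later row replaces an earlier row
    that has the same primary key (time, symbol)."""
    state = {}
    for row in rows:
        state[(row["time"], row["symbol"])] = row
    return state


def incremental_batch(source_state, checkpoint):
    """Rows an incremental run would export: _tp_time newer than the checkpoint."""
    return [r for r in source_state.values() if r["_tp_time"] > checkpoint]


# Made-up data: integers stand in for timestamps.
initial = [
    {"time": t, "symbol": "BTC-USD", "close": 100.0 + t, "_tp_time": t}
    for t in range(1, 6)
]
source = apply_mutable(initial)

# First run: the target is empty, so every source row is copied.
target = list(source.values())

# New writes on the source: two new keys and one update of an existing key.
later_writes = [
    {"time": 6, "symbol": "BTC-USD", "close": 106.0, "_tp_time": 6},
    {"time": 7, "symbol": "BTC-USD", "close": 107.0, "_tp_time": 7},
    {"time": 5, "symbol": "BTC-USD", "close": 999.0, "_tp_time": 8},  # update
]
source = apply_mutable(list(source.values()) + later_writes)

# Second run: export rows newer than the checkpoint and append them.
checkpoint = max(r["_tp_time"] for r in target)
batch = incremental_batch(source, checkpoint)
target.extend(batch)  # append-only target: no replacement by key

print(len(source))  # 7
print(len(target))  # 8
```

In this toy run the target ends up one row ahead of the source, matching the single update. By the same reasoning, the gap of 4 rows reported above would be consistent with 4 of the 44 exported rows being updates to existing keys, though the issue does not confirm this.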


6. Check the earliest _tp_time of the source stream and the target stream. They are the same, so the retention policy has no impact.

timeplusd :) select min(_tp_time) from table(coinbase_ohlc_1m_vkv)

SELECT min(_tp_time) FROM table(coinbase_ohlc_1m_vkv)

Query id: a4429796-c140-4782-9c3a-6f82f35178a8

┌───────────min(_tp_time)─┐
│ 2024-10-08 09:21:26.779 │
└─────────────────────────┘

1 row in set. Elapsed: 0.021 sec. Processed 51.59 thousand rows, 412.75 KB (2.46 million rows/s., 19.69 MB/s.)

timeplusd :)

timeplusd :) select min(_tp_time) from table(coinbase_ohlc_1m_vkv)

SELECT min(_tp_time) FROM table(coinbase_ohlc_1m_vkv)

Query id: 7e2fd33e-51ab-4cef-a7d8-1e4928e3411e

┌───────────min(_tp_time)─┐
│ 2024-10-08 09:21:26.779 │
└─────────────────────────┘

1 row in set. Elapsed: 0.006 sec. Processed 51.60 thousand rows, 412.78 KB (8.36 million rows/s., 66.86 MB/s.)

timeplusd :)