alberttwong closed this issue 5 months ago.
Duplicate key tables are append-only (immutable), so deletes and updates aren't allowed.
Primary key tables are mutable and support updates, deletes and upserts. The key is used as both the unique key and the sort key. https://docs.starrocks.io/docs/table_design/table_types/primary_key_table/
The default table type is duplicate key. You can create a primary key table with the correct syntax.
We also have a third type: the aggregate key table. It is also immutable, but when an incoming row matches an existing key, the values are aggregated (e.g. incremented) instead of a new row being created. Really good for use cases like counting website hits.
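To make the three behaviors concrete, here is a toy in-memory model. It is only a conceptual sketch, not how StarRocks stores data; the function names are hypothetical, and the aggregate model assumes a single value column aggregated with SUM (StarRocks lets each value column choose its own aggregation function).

```python
def load_duplicate(rows):
    """Duplicate key model: every incoming row is appended, even if its key repeats."""
    table = []
    for key, value in rows:
        table.append((key, value))
    return table


def load_primary(rows):
    """Primary key model: a row whose key already exists replaces the old one (upsert)."""
    table = {}  # dicts keep insertion order, so first-seen key order is preserved
    for key, value in rows:
        table[key] = value
    return list(table.items())


def load_aggregate_sum(rows):
    """Aggregate key model (SUM): matching keys are merged into one row, not appended."""
    table = {}
    for key, value in rows:
        table[key] = table.get(key, 0) + value
    return list(table.items())


hits = [("/home", 1), ("/about", 1), ("/home", 1)]
print(load_duplicate(hits))      # [('/home', 1), ('/about', 1), ('/home', 1)]
print(load_primary(hits))        # [('/home', 1), ('/about', 1)]
print(load_aggregate_sum(hits))  # [('/home', 2), ('/about', 1)]
```

With page-hit rows, the duplicate model keeps all three rows, the primary model keeps one row per key, and the aggregate model collapses the two /home hits into a single row with a count of 2.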
OK, here is the plan so far (feedback welcome!).
Since there are many options (or ways) to create tables, I think the best path forward is to accept multiple entries in a new option: target.options.table_keys. This will accommodate multiple connection types, such as Redshift (which uses sort keys), BigQuery (which uses partition or clustering keys) and Postgres (index keys), among others. This way, sling can add extra DDL constraints as specified.
The primary key can be defined at the source (leaving it as-is, since that's most common) or in target.options.table_keys if preferred.
Example Task config:
source:
  conn: SOURCE_DB
  stream: public.new_table
  primary_key: [col1, col2] # works with all
target:
  conn: TARGET_DB
  object: public.new_table
  options:
    table_keys:
      primary: [col1, col2] # works with all
      hash: [col1, col2] # works with starrocks
      aggregate: [col1, col2] # works with starrocks
      duplicate: [col1, col2] # works with starrocks
      sort: [col1, col2] # works with starrocks, redshift
      cluster: [col1, col2] # works with snowflake, bigquery
      partition: [col1, col2] # works with bigquery
      index: [col1, col2] # works with mysql, postgres, sql server, sqlite...
      unique_index: [col1, col2] # works with mysql, postgres, sql server, sqlite...
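A minimal sketch of how the per-connection comments in the config above could be checked before any DDL is generated. The SUPPORT table is transcribed from those comments, and unsupported_keys is a hypothetical helper, not part of sling.

```python
# Support matrix transcribed from the comments in the proposed config.
# "all" means every connection type. Hypothetical check, not sling code.
SUPPORT = {
    "primary": {"all"},
    "hash": {"starrocks"},
    "aggregate": {"starrocks"},
    "duplicate": {"starrocks"},
    "sort": {"starrocks", "redshift"},
    "cluster": {"snowflake", "bigquery"},
    "partition": {"bigquery"},
    "index": {"mysql", "postgres", "sqlserver", "sqlite"},
    "unique_index": {"mysql", "postgres", "sqlserver", "sqlite"},
}


def unsupported_keys(conn_type, table_keys):
    """Return the table_keys entries that conn_type cannot use (unknown types included)."""
    bad = []
    for key_type in table_keys:
        targets = SUPPORT.get(key_type, set())
        if "all" not in targets and conn_type not in targets:
            bad.append(key_type)
    return bad


keys = {"primary": ["col1", "col2"], "sort": ["col1"], "partition": ["col1"]}
print(unsupported_keys("redshift", keys))  # ['partition']
print(unsupported_keys("bigquery", keys))  # ['sort']
```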
Cool... looks like it'll support primary key and duplicate key. How about something like this?
create table orders (
    dt date NOT NULL,
    order_id bigint NOT NULL,
    user_id int NOT NULL,
    merchant_id int NOT NULL,
    good_id int NOT NULL,
    good_name string NOT NULL,
    price int NOT NULL,
    cnt int NOT NULL,
    revenue int NOT NULL,
    state tinyint NOT NULL
) PRIMARY KEY (dt, order_id)
PARTITION BY RANGE(`dt`) (
    PARTITION p20210820 VALUES [('2021-08-20'), ('2021-08-21')),
    PARTITION p20210821 VALUES [('2021-08-21'), ('2021-08-22')),
    ...
    PARTITION p20210929 VALUES [('2021-09-29'), ('2021-09-30')),
    PARTITION p20210930 VALUES [('2021-09-30'), ('2021-10-01'))
)
DISTRIBUTED BY HASH(order_id)
ORDER BY(`merchant_id`,`revenue`)
PROPERTIES(
    "replication_num" = "3",
    "enable_persistent_index" = "true"
);
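Writing one range partition per day by hand gets tedious. A small hypothetical helper (not part of sling or StarRocks) can emit the clauses in the same format as the example above, assuming half-open daily ranges [day, day + 1):

```python
from datetime import date, timedelta


def daily_partitions(start, end):
    """Yield one StarRocks-style range-partition clause per day in [start, end]."""
    day = start
    while day <= end:
        nxt = day + timedelta(days=1)
        # Name is p + YYYYMMDD; the range is [day, next day), as in the DDL above.
        yield f"PARTITION p{day:%Y%m%d} VALUES [('{day.isoformat()}'), ('{nxt.isoformat()}'))"
        day = nxt


clauses = list(daily_partitions(date(2021, 8, 20), date(2021, 9, 30)))
print(len(clauses))
print(",\n".join(clauses[:2]))
```

Joining all the clauses with ",\n" gives the body of the PARTITION BY RANGE(...) list.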
or
CREATE TABLE example_db.dynamic_partition
(
    k1 DATE,
    k2 INT,
    k3 SMALLINT,
    v1 VARCHAR(2048),
    v2 DATETIME DEFAULT "2014-02-04 15:36:00"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1)
(
    PARTITION p1 VALUES LESS THAN ("2014-01-01"),
    PARTITION p2 VALUES LESS THAN ("2014-06-01"),
    PARTITION p3 VALUES LESS THAN ("2014-12-01")
)
DISTRIBUTED BY HASH(k2)
PROPERTIES(
    "storage_medium" = "SSD",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-3",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "10"
);
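As I understand StarRocks dynamic partitioning, dynamic_partition.start and dynamic_partition.end are offsets (in time_unit) relative to the current day: partitions older than start are dropped and partitions up to end are created ahead of time. Under that assumption, this sketch lists the partition names the example above would keep on a given day. The helper is hypothetical, and the pYYYYMMDD naming assumes prefix "p" with DAY granularity.

```python
from datetime import date, timedelta


def dynamic_partition_window(today, start=-3, end=3, prefix="p"):
    """Partition names covering [today + start, today + end] days, inclusive."""
    return [
        f"{prefix}{today + timedelta(days=offset):%Y%m%d}"
        for offset in range(start, end + 1)
    ]


print(dynamic_partition_window(date(2024, 2, 11)))
```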
🤯 haha, I think we have to draw the line somewhere. The user will just have to use target.options.table_ddl for that.
Let me know when you want me to test primary key and duplicate key tables.
@alberttwong Ready for testing!
For StarRocks, target.options.table_keys accepts aggregate, duplicate, hash, primary or unique, so you can create all 4 table types (aggregate, duplicate, primary and unique key) if you want. The hash entry sets the distribution key.
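The debug logs later in this thread show the DDL suffix sling generated, for example primary key(`staff_id`) distributed by hash(`staff_id`). The sketch below reproduces that shape from a table_keys mapping. It is a hypothetical illustration, not sling's GenerateDDL: it assumes at most one table-model key (primary, duplicate, aggregate or unique) and requires an explicit hash key.

```python
MODEL_KEYS = ("primary", "duplicate", "aggregate", "unique")


def quote_cols(cols):
    """Backtick-quote column names, as in the generated StarRocks DDL."""
    return ", ".join(f"`{c}`" for c in cols)


def starrocks_key_clause(table_keys):
    """Build '<model> key(...) distributed by hash(...)' from a table_keys dict."""
    models = [k for k in MODEL_KEYS if table_keys.get(k)]
    if len(models) > 1:
        raise ValueError(f"choose one table model, got {models}")
    hash_cols = table_keys.get("hash")
    if not hash_cols:
        raise ValueError("a hash key is required for DISTRIBUTED BY HASH")
    parts = []
    if models:
        parts.append(f"{models[0]} key({quote_cols(table_keys[models[0]])})")
    parts.append(f"distributed by hash({quote_cols(hash_cols)})")
    return " ".join(parts)


print(starrocks_key_clause({"primary": ["staff_id"], "hash": ["staff_id"]}))
# primary key(`staff_id`) distributed by hash(`staff_id`)
```

When no model key is given, only the distribution clause is emitted; StarRocks then falls back to its default duplicate key table type.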
Examples of using table_keys:
CLI FLAGS
sling run -d --src-conn postgres --src-stream public.test1k --tgt-conn starrocks --tgt-object 'public.{stream_schema}_{stream_table}' --tgt-options '{ table_keys: { duplicate: [ id, name ], hash: [ id ] } }'
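The --tgt-options value above is a YAML flow mapping (note the unquoted keys), and JSON is also valid YAML, so one way to avoid quoting mistakes is to generate the argument. A hedged sketch: the connection names come from the example, and the script only builds and prints the command; it does not run sling.

```python
import json
import shlex


def build_sling_cmd(table_keys):
    """Return a sling CLI invocation (as an argv list) with table_keys serialized as JSON."""
    tgt_options = json.dumps({"table_keys": table_keys})
    return [
        "sling", "run", "-d",
        "--src-conn", "postgres",
        "--src-stream", "public.test1k",
        "--tgt-conn", "starrocks",
        "--tgt-object", "public.{stream_schema}_{stream_table}",
        "--tgt-options", tgt_options,
    ]


cmd = build_sling_cmd({"duplicate": ["id", "name"], "hash": ["id"]})
print(shlex.join(cmd))  # correctly quoted, paste-able shell command
```

Passing the list directly to subprocess.run(cmd) would avoid shell quoting altogether.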
TASK YAML
source:
  conn: postgres
  stream: public.test1k
  primary_key: [ id, name ]
target:
  conn: starrocks
  object: 'public.{stream_schema}_{stream_table}'
  options:
    table_keys:
      hash: [ id ]
mode: full-refresh
sling run -c task.yaml
REPLICATION YAML
source: postgres
target: starrocks
defaults:
  mode: incremental
  object: 'public.{stream_schema}_{stream_table}'
streams:
  public.test1k:
    mode: full-refresh
    update_key: create_date
    target_options:
      table_keys:
        unique: [ id, name ]
        hash: [ id ]
sling run -r replication.yaml
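Roughly, each stream inherits the defaults block and overrides whatever it sets itself. A simplified sketch of that resolution for public.test1k, assuming a shallow merge (whether sling deep-merges nested keys such as target_options is not covered here) and assuming {stream_schema} and {stream_table} expand to the source schema and table names:

```python
defaults = {
    "mode": "incremental",
    "object": "public.{stream_schema}_{stream_table}",
}
stream = {
    "mode": "full-refresh",
    "update_key": "create_date",
    "target_options": {"table_keys": {"unique": ["id", "name"], "hash": ["id"]}},
}


def effective_config(defaults, stream):
    """Shallow merge: keys set on the stream win over the defaults."""
    merged = dict(defaults)
    merged.update(stream)
    return merged


cfg = effective_config(defaults, stream)
# Assumed expansion of the object template for source stream public.test1k.
target_table = cfg["object"].format(stream_schema="public", stream_table="test1k")
print(cfg["mode"], target_table)  # full-refresh public.public_test1k
```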
CLI didn't work. Did I miss something?
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { duplicate: [ staff_id ], hash: [ staff_id ] } }'
2024-02-10 23:22:28 DBG Sling version: dev (darwin arm64)
2024-02-10 23:22:28 DBG type is db-db
2024-02-10 23:22:28 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}
2024-02-10 23:22:28 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-02-10 23:22:28 INF connecting to source database (postgres)
2024-02-10 23:22:28 INF connecting to target database (starrocks)
2024-02-10 23:22:28 INF reading from source database
2024-02-10 23:22:28 DBG select * from "public"."staff"
2024-02-10 23:22:28 INF writing to target database [mode: full-refresh]
2024-02-10 23:22:28 DBG drop table if exists `albert`.`staff_tmp`
2024-02-10 23:22:28 DBG table `albert`.`staff_tmp` dropped
2024-02-10 23:22:28 INF execution failed
fatal:
--- task_run.go:87 func1 ---
~ execution failed
--- task_run.go:501 runDbToDb ---
~ Could not WriteToDb
--- task_run_write.go:165 WriteToDb ---
~ could not create temp table `albert`.`staff_tmp`
--- task_func.go:72 createTableIfNotExists ---
~ Could not generate DDL for `albert`.`staff_tmp`
--- database_starrocks.go:202 GenerateDDL ---
did not provide sort-key or primary-key for creating StarRocks table
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { sort: [ staff_id ], hash: [ staff_id ] } }'
2024-02-10 23:23:02 DBG Sling version: dev (darwin arm64)
2024-02-10 23:23:02 DBG type is db-db
2024-02-10 23:23:02 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}
2024-02-10 23:23:02 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-02-10 23:23:02 INF connecting to source database (postgres)
2024-02-10 23:23:02 INF connecting to target database (starrocks)
2024-02-10 23:23:02 INF reading from source database
2024-02-10 23:23:02 DBG select * from "public"."staff"
2024-02-10 23:23:02 INF writing to target database [mode: full-refresh]
2024-02-10 23:23:02 DBG drop table if exists `albert`.`staff_tmp`
2024-02-10 23:23:02 DBG table `albert`.`staff_tmp` dropped
2024-02-10 23:23:02 INF execution failed
fatal:
--- task_run.go:87 func1 ---
~ execution failed
--- task_run.go:501 runDbToDb ---
~ Could not WriteToDb
--- task_run_write.go:165 WriteToDb ---
~ could not create temp table `albert`.`staff_tmp`
--- task_func.go:72 createTableIfNotExists ---
~ Could not generate DDL for `albert`.`staff_tmp`
--- database_starrocks.go:202 GenerateDDL ---
did not provide sort-key or primary-key for creating StarRocks table
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { primary: [ staff_id ], hash: [ staff_id ] } }'
2024-02-10 23:23:10 DBG Sling version: dev (darwin arm64)
2024-02-10 23:23:10 DBG type is db-db
2024-02-10 23:23:10 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}
2024-02-10 23:23:10 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-02-10 23:23:10 INF connecting to source database (postgres)
2024-02-10 23:23:10 INF connecting to target database (starrocks)
2024-02-10 23:23:10 INF reading from source database
2024-02-10 23:23:10 DBG select * from "public"."staff"
2024-02-10 23:23:10 INF writing to target database [mode: full-refresh]
2024-02-10 23:23:10 DBG drop table if exists `albert`.`staff_tmp`
2024-02-10 23:23:10 DBG table `albert`.`staff_tmp` dropped
2024-02-10 23:23:10 INF execution failed
fatal:
--- task_run.go:87 func1 ---
~ execution failed
--- task_run.go:501 runDbToDb ---
~ Could not WriteToDb
--- task_run_write.go:165 WriteToDb ---
~ could not create temp table `albert`.`staff_tmp`
--- task_func.go:72 createTableIfNotExists ---
~ Could not generate DDL for `albert`.`staff_tmp`
--- database_starrocks.go:202 GenerateDDL ---
did not provide sort-key or primary-key for creating StarRocks table
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { primary:staff_id , hash:staff_id } }'
2024-02-10 23:23:46 DBG Sling version: dev (darwin arm64)
2024-02-10 23:23:46 DBG type is db-db
2024-02-10 23:23:46 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}
2024-02-10 23:23:46 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-02-10 23:23:46 INF connecting to source database (postgres)
2024-02-10 23:23:46 INF connecting to target database (starrocks)
2024-02-10 23:23:46 INF reading from source database
2024-02-10 23:23:46 DBG select * from "public"."staff"
2024-02-10 23:23:46 INF writing to target database [mode: full-refresh]
2024-02-10 23:23:46 DBG drop table if exists `albert`.`staff_tmp`
2024-02-10 23:23:46 DBG table `albert`.`staff_tmp` dropped
2024-02-10 23:23:46 INF execution failed
fatal:
--- task_run.go:87 func1 ---
~ execution failed
--- task_run.go:501 runDbToDb ---
~ Could not WriteToDb
--- task_run_write.go:165 WriteToDb ---
~ could not create temp table `albert`.`staff_tmp`
--- task_func.go:72 createTableIfNotExists ---
~ Could not generate DDL for `albert`.`staff_tmp`
--- database_starrocks.go:202 GenerateDDL ---
did not provide sort-key or primary-key for creating StarRocks table
So I tried with --primary-key. It feels weird to have to define it, since I'm already using options, but it worked??? Surprised.
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { primary:staff_id , hash:staff_id } }' -d --primary-key staff_id
2024-02-10 23:26:04 DBG Sling version: dev (darwin arm64)
2024-02-10 23:26:04 DBG type is db-db
2024-02-10 23:26:04 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}
2024-02-10 23:26:04 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-02-10 23:26:05 INF connecting to source database (postgres)
2024-02-10 23:26:05 INF connecting to target database (starrocks)
2024-02-10 23:26:05 INF reading from source database
2024-02-10 23:26:05 DBG select * from "public"."staff"
2024-02-10 23:26:05 INF writing to target database [mode: full-refresh]
2024-02-10 23:26:05 DBG drop table if exists `albert`.`staff_tmp`
2024-02-10 23:26:05 DBG table `albert`.`staff_tmp` dropped
2024-02-10 23:26:05 DBG create table if not exists `albert`.`staff_tmp` (`staff_id` integer,
`first_name` string,
`last_name` string,
`address_id` smallint,
`email` string,
`store_id` smallint,
`active` char(5),
`username` string,
`password` string,
`last_update` datetime,
`picture` varbinary) primary key(`staff_id`) distributed by hash(`staff_id`)
2024-02-10 23:26:05 INF streaming data
2024-02-10 23:26:05 DBG WARN: Using INSERT mode which is meant for small datasets. Please set the `fe_url` for loading large datasets via Stream Load mode. See https://docs.slingdata.io/connections/database-connections/starrocks
2024-02-10 23:26:05 DBG select count(*) cnt from `albert`.`staff_tmp`
2024-02-10 23:26:05 DBG comparing checksums []string{"staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active", "username", "password", "last_update", "picture"} vs []string{"staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active", "username", "password", "last_update", "picture"}: []string{"staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active", "username", "password", "last_update", "picture"}
2024-02-10 23:26:05 DBG select sum(abs(`staff_id`)) as `staff_id`, sum(length(`first_name`)) as `first_name`, sum(length(`last_name`)) as `last_name`, sum(abs(`address_id`)) as `address_id`, sum(length(`email`)) as `email`, sum(abs(`store_id`)) as `store_id`, sum(`active`) as `active`, sum(length(`username`)) as `username`, sum(length(`password`)) as `password`, sum(cast((UNIX_TIMESTAMP(`last_update`) * 1000000) as UNSIGNED)) as `last_update`, sum(length(`picture`)) as `picture` from `albert`.`staff_tmp`
2024-02-10 23:26:05 DBG Error 1064: Vectorized engine does not support the operator, cast from VARBINARY to VARCHAR failed, maybe use switch function backend [id=10004] [host=127.0.0.1]
2024-02-10 23:26:05 DBG drop table if exists `albert`.`staff`
2024-02-10 23:26:05 DBG table `albert`.`staff` dropped
2024-02-10 23:26:05 INF dropped table `albert`.`staff`
2024-02-10 23:26:05 DBG create table if not exists `albert`.`staff` (`staff_id` integer,
`first_name` string,
`last_name` string,
`address_id` smallint,
`email` string,
`store_id` smallint,
`active` char(5),
`username` string,
`password` string,
`last_update` datetime,
`picture` varbinary) primary key(`staff_id`) distributed by hash(`staff_id`)
2024-02-10 23:26:05 INF created table `albert`.`staff`
2024-02-10 23:26:05 DBG insert into `albert`.`staff` (`staff_id`, `first_name`, `last_name`, `address_id`, `email`, `store_id`, `active`, `username`, `password`, `last_update`, `picture`) select `staff_id`, `first_name`, `last_name`, `address_id`, `email`, `store_id`, `active`, `username`, `password`, `last_update`, `picture` from `albert`.`staff_tmp`
2024-02-10 23:26:05 DBG inserted rows into `albert`.`staff` from temp table `albert`.`staff_tmp`
2024-02-10 23:26:05 INF inserted 2 rows into `albert`.`staff` in 0 secs [3 r/s]
2024-02-10 23:26:05 DBG connection was closed, reconnecting
2024-02-10 23:26:05 DBG drop table if exists `albert`.`staff_tmp`
2024-02-10 23:26:05 DBG table `albert`.`staff_tmp` dropped
2024-02-10 23:26:05 INF execution succeeded
Tried --duplicate-key, but it didn't work.
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { duplicate:staff_id , hash:staff_id } }' -d --duplicate-key staff_id
2024-02-10 23:27:36 DBG Sling version: dev (darwin arm64)
2024-02-10 23:27:36 DBG type is db-db
2024-02-10 23:27:36 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}
2024-02-10 23:27:36 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-02-10 23:27:36 INF connecting to source database (postgres)
2024-02-10 23:27:36 INF connecting to target database (starrocks)
2024-02-10 23:27:36 INF reading from source database
2024-02-10 23:27:36 DBG select * from "public"."staff"
2024-02-10 23:27:36 INF writing to target database [mode: full-refresh]
2024-02-10 23:27:36 DBG drop table if exists `albert`.`staff_tmp`
2024-02-10 23:27:36 DBG table `albert`.`staff_tmp` dropped
2024-02-10 23:27:36 INF execution failed
fatal:
--- task_run.go:87 func1 ---
~ execution failed
--- task_run.go:501 runDbToDb ---
~ Could not WriteToDb
--- task_run_write.go:165 WriteToDb ---
~ could not create temp table `albert`.`staff_tmp`
--- task_func.go:72 createTableIfNotExists ---
~ Could not generate DDL for `albert`.`staff_tmp`
--- database_starrocks.go:202 GenerateDDL ---
did not provide sort-key or primary-key for creating StarRocks table
Maybe change --duplicate-key to --sort-key???... Didn't work either.
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { duplicate:staff_id , hash:staff_id } }' -d --sort-key staff_id
2024-02-10 23:27:45 DBG Sling version: dev (darwin arm64)
2024-02-10 23:27:45 DBG type is db-db
2024-02-10 23:27:45 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}
2024-02-10 23:27:45 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-02-10 23:27:45 INF connecting to source database (postgres)
2024-02-10 23:27:45 INF connecting to target database (starrocks)
2024-02-10 23:27:45 INF reading from source database
2024-02-10 23:27:45 DBG select * from "public"."staff"
2024-02-10 23:27:45 INF writing to target database [mode: full-refresh]
2024-02-10 23:27:45 DBG drop table if exists `albert`.`staff_tmp`
2024-02-10 23:27:45 DBG table `albert`.`staff_tmp` dropped
2024-02-10 23:27:45 INF execution failed
fatal:
--- task_run.go:87 func1 ---
~ execution failed
--- task_run.go:501 runDbToDb ---
~ Could not WriteToDb
--- task_run_write.go:165 WriteToDb ---
~ could not create temp table `albert`.`staff_tmp`
--- task_func.go:72 createTableIfNotExists ---
~ Could not generate DDL for `albert`.`staff_tmp`
--- database_starrocks.go:202 GenerateDDL ---
did not provide sort-key or primary-key for creating StarRocks table
Are you using the latest code/binary? "did not provide sort-key or primary-key for creating StarRocks table" is an old error; I changed that.
Also, there is no --duplicate-key or --sort-key option. And the values inside table_keys need to be arrays.
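Why the arrays matter: in a YAML flow mapping, primary:staff_id with no space after the colon is read as one plain scalar, so { primary:staff_id , hash:staff_id } parses to keys "primary:staff_id" and "hash:staff_id" with null values. That matches the warnings sling prints later in this thread. A hypothetical validator (not sling's code) using the valid key types listed in those warnings:

```python
VALID_KEY_TYPES = {"aggregate", "cluster", "duplicate", "hash", "partition",
                   "primary", "sort", "unique", "update"}


def check_table_keys(table_keys):
    """Return (warnings, errors) for an already-parsed table_keys mapping."""
    warnings, errors = [], []
    for key_type, cols in table_keys.items():
        if key_type not in VALID_KEY_TYPES:
            warnings.append(f"{key_type} is not a valid table key type")
        elif not isinstance(cols, list):
            errors.append(f"{key_type} must be a list of columns, got {cols!r}")
    return warnings, errors


# What '{ primary:staff_id , hash:staff_id }' parses to (no space after the colons):
print(check_table_keys({"primary:staff_id": None, "hash:staff_id": None}))
# A scalar instead of a list:
print(check_table_keys({"primary": "staff_id"}))
# The correct form:
print(check_table_keys({"primary": ["staff_id"], "hash": ["staff_id"]}))
```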
Try these:
sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { duplicate: [staff_id] , hash: [staff_id] } }' -d
sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { duplicate: [staff_id] , hash: [staff_id] , sort: [staff_id] } }' -d
I'm having the same issue:
version: "3.7"
services:
  postgres:
    image: arm64v8/postgres:13 # Use a compatible ARM64 image
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
  starrocks-fe:
    image: starrocks/fe-ubuntu:3.2-latest
    hostname: starrocks-fe
    user: root
    command: |
      sh /opt/starrocks/fe/bin/start_fe.sh
    ports:
      - 8030:8030
      - 9020:9020
      - 9030:9030
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    healthcheck:
      test: 'mysql -u root -h starrocks-fe -P 9030 -e "SHOW FRONTENDS\G" |grep "Alive: true"'
      interval: 10s
      timeout: 5s
      retries: 3
  starrocks-be:
    image: starrocks/be-ubuntu:3.2-latest
    command:
      - /bin/bash
      - -c
      - |
        echo "# Enable data cache" >> /opt/starrocks/be/conf/be.conf
        echo "block_cache_enable = true" >> /opt/starrocks/be/conf/be.conf
        echo "block_cache_mem_size = 536870912" >> /opt/starrocks/be/conf/be.conf
        echo "block_cache_disk_size = 1073741824" >> /opt/starrocks/be/conf/be.conf
        sleep 15s
        mysql --connect-timeout 2 -h starrocks-fe -P 9030 -u root -e "ALTER SYSTEM ADD BACKEND \"starrocks-be:9050\";"
        /opt/starrocks/be/bin/start_be.sh
    ports:
      - 8040:8040
    hostname: starrocks-be
    user: root
    depends_on:
      - starrocks-fe
    healthcheck:
      test: 'mysql -u root -h starrocks-fe -P 9030 -e "SHOW BACKENDS\G" |grep "Alive: true"'
      interval: 10s
      timeout: 5s
      retries: 3
sling run \
--src-conn POSTGRES \
--src-stream "SELECT order_id, name, order_value, priority, order_date FROM orders " \
--tgt-conn STARROCKS \
--tgt-object public.orders \
--tgt-options '{"primary-key": "order_id"}' \
--mode full-refresh \
--debug
OR
sling run \
--src-conn POSTGRES \
--src-stream "SELECT order_id, name, order_value, priority, order_date FROM orders " \
--tgt-conn STARROCKS \
--tgt-object public.orders \
--tgt-options '{ table_keys: { duplicate: [order_id] , hash: [order_id] , sort: [order_id] } }' \
--mode full-refresh \
--debug
Error Message
INF execution failed
fatal:
--- task_run.go:87 func1 ---
~ execution failed
--- task_run.go:501 runDbToDb ---
~ Could not WriteToDb
--- task_run_write.go:165 WriteToDb ---
~ could not create temp table `public`.`orders_tmp`
--- task_func.go:72 createTableIfNotExists ---
~ Could not generate DDL for `public`.`orders_tmp`
--- database_starrocks.go:198 GenerateDDL ---
did not provide primary key for creating StarRocks table
@soumilshah1995 the error message is old, so I believe you're not using the latest version in branch v1.1.4. See the new error message: https://github.com/slingdata-io/sling-cli/blob/v1.1.4/core/dbio/database/database_starrocks.go#L215
Hmm, could it be because I installed it using Homebrew?
Yes, looks like it:
% sling --version
Version: 1.1.3
Let me update the version.
Can you help me with how to update using Homebrew?
brew uninstall slingdata-io/sling/sling
brew install slingdata-io/sling/sling@1.1.4
Warning: No available formula or cask with the name "slingdata-io/sling/sling@1.1.4". Did you mean slingdata-io/sling/sling?
But if I run brew install slingdata-io/sling/sling, it installs 1.1.3.
Please see here for how to compile from source: https://github.com/slingdata-io/sling-cli?tab=readme-ov-file#compiling-from-source
You need to install Go first, then switch to branch v1.1.4.
Are you using the latest code/binary?
did not provide sort-key or primary-key for creating StarRocks table
is an old error, I changed that.
Primary and duplicate key works.
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { primary: [ staff_id ], hash: [ staff_id ] } }'
2024-02-11 07:45:12 DBG Sling version: dev (darwin arm64)
2024-02-11 07:45:12 DBG type is db-db
2024-02-11 07:45:12 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1,"transforms":["parse_bit"]}
2024-02-11 07:45:12 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source","table_keys":{"hash":["staff_id"],"primary":["staff_id"]}}
2024-02-11 07:45:12 INF connecting to source database (postgres)
2024-02-11 07:45:12 INF connecting to target database (starrocks)
2024-02-11 07:45:12 INF reading from source database
2024-02-11 07:45:12 DBG select * from "public"."staff"
2024-02-11 07:45:12 INF writing to target database [mode: full-refresh]
2024-02-11 07:45:12 DBG drop table if exists `albert`.`staff_tmp`
2024-02-11 07:45:12 DBG table `albert`.`staff_tmp` dropped
2024-02-11 07:45:12 DBG create table if not exists `albert`.`staff_tmp` (`staff_id` integer,
`first_name` string,
`last_name` string,
`address_id` smallint,
`email` string,
`store_id` smallint,
`active` char(5),
`username` string,
`password` string,
`last_update` datetime,
`picture` varbinary) primary key(`staff_id`) distributed by hash(`staff_id`)
2024-02-11 07:45:12 INF streaming data
2024-02-11 07:45:12 DBG WARN: Using INSERT mode which is meant for small datasets. Please set the `fe_url` for loading large datasets via Stream Load mode. See https://docs.slingdata.io/connections/database-connections/starrocks
2024-02-11 07:45:12 DBG select count(*) cnt from `albert`.`staff_tmp`
2024-02-11 07:45:12 DBG comparing checksums []string{"staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active", "username", "password", "last_update", "picture"} vs []string{"staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active", "username", "password", "last_update", "picture"}: []string{"staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active", "username", "password", "last_update", "picture"}
2024-02-11 07:45:12 DBG select sum(abs(`staff_id`)) as `staff_id`, sum(length(`first_name`)) as `first_name`, sum(length(`last_name`)) as `last_name`, sum(abs(`address_id`)) as `address_id`, sum(length(`email`)) as `email`, sum(abs(`store_id`)) as `store_id`, sum(`active`) as `active`, sum(length(`username`)) as `username`, sum(length(`password`)) as `password`, sum(cast((UNIX_TIMESTAMP(`last_update`) * 1000000) as UNSIGNED)) as `last_update`, sum(length(`picture`)) as `picture` from `albert`.`staff_tmp`
2024-02-11 07:45:12 DBG Error 1064: Vectorized engine does not support the operator, cast from VARBINARY to VARCHAR failed, maybe use switch function backend [id=10004] [host=127.0.0.1]
2024-02-11 07:45:12 DBG drop table if exists `albert`.`staff`
2024-02-11 07:45:12 DBG table `albert`.`staff` dropped
2024-02-11 07:45:12 INF dropped table `albert`.`staff`
2024-02-11 07:45:12 DBG create table if not exists `albert`.`staff` (`staff_id` integer,
`first_name` string,
`last_name` string,
`address_id` smallint,
`email` string,
`store_id` smallint,
`active` char(5),
`username` string,
`password` string,
`last_update` datetime,
`picture` varbinary) primary key(`staff_id`) distributed by hash(`staff_id`)
2024-02-11 07:45:12 INF created table `albert`.`staff`
2024-02-11 07:45:12 DBG insert into `albert`.`staff` (`staff_id`, `first_name`, `last_name`, `address_id`, `email`, `store_id`, `active`, `username`, `password`, `last_update`, `picture`) select `staff_id`, `first_name`, `last_name`, `address_id`, `email`, `store_id`, `active`, `username`, `password`, `last_update`, `picture` from `albert`.`staff_tmp`
2024-02-11 07:45:12 DBG inserted rows into `albert`.`staff` from temp table `albert`.`staff_tmp`
2024-02-11 07:45:12 INF inserted 2 rows into `albert`.`staff` in 0 secs [3 r/s]
2024-02-11 07:45:12 DBG connection was closed, reconnecting
2024-02-11 07:45:12 DBG drop table if exists `albert`.`staff_tmp`
2024-02-11 07:45:12 DBG table `albert`.`staff_tmp` dropped
2024-02-11 07:45:12 INF execution succeeded
So I tried putting in a bad primary key, and it still let me create a duplicate key table. Maybe the warning in this situation should be a failure, but it's such a minor issue.
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { primary:staff_id , hash:staff_id } }' -d
7:44AM WRN hash:staff_id is not a valid table key type. Valid table key types are: "aggregate","cluster","duplicate","hash","partition","primary","sort","unique","update"
7:44AM WRN primary:staff_id is not a valid table key type. Valid table key types are: "aggregate","cluster","duplicate","hash","partition","primary","sort","unique","update"
2024-02-11 07:44:28 DBG Sling version: dev (darwin arm64)
2024-02-11 07:44:28 DBG type is db-db
2024-02-11 07:44:28 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1,"transforms":["parse_bit"]}
2024-02-11 07:44:28 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-02-11 07:44:28 INF connecting to source database (postgres)
2024-02-11 07:44:28 INF connecting to target database (starrocks)
2024-02-11 07:44:29 INF reading from source database
2024-02-11 07:44:29 DBG select * from "public"."staff"
2024-02-11 07:44:29 INF writing to target database [mode: full-refresh]
2024-02-11 07:44:29 DBG drop table if exists `albert`.`staff_tmp`
2024-02-11 07:44:29 DBG table `albert`.`staff_tmp` dropped
2024-02-11 07:44:29 DBG create table if not exists `albert`.`staff_tmp` (`staff_id` integer,
`first_name` string,
`last_name` string,
`address_id` smallint,
`email` string,
`store_id` smallint,
`active` char(5),
`username` string,
`password` string,
`last_update` datetime,
`picture` varbinary,
`_sling_row_id` varchar(255)) distributed by hash(`_sling_row_id`)
2024-02-11 07:44:29 INF streaming data
2024-02-11 07:44:29 DBG WARN: Using INSERT mode which is meant for small datasets. Please set the `fe_url` for loading large datasets via Stream Load mode. See https://docs.slingdata.io/connections/database-connections/starrocks
2024-02-11 07:44:29 DBG select count(*) cnt from `albert`.`staff_tmp`
2024-02-11 07:44:29 DBG comparing checksums []string{"staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active", "username", "password", "last_update", "picture", "_sling_row_id"} vs []string{"staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active", "username", "password", "last_update", "picture", "_sling_row_id"}: []string{"staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active", "username", "password", "last_update", "picture", "_sling_row_id"}
2024-02-11 07:44:29 DBG select sum(abs(`staff_id`)) as `staff_id`, sum(length(`first_name`)) as `first_name`, sum(length(`last_name`)) as `last_name`, sum(abs(`address_id`)) as `address_id`, sum(length(`email`)) as `email`, sum(abs(`store_id`)) as `store_id`, sum(`active`) as `active`, sum(length(`username`)) as `username`, sum(length(`password`)) as `password`, sum(cast((UNIX_TIMESTAMP(`last_update`) * 1000000) as UNSIGNED)) as `last_update`, sum(length(`picture`)) as `picture`, sum(length(`_sling_row_id`)) as `_sling_row_id` from `albert`.`staff_tmp`
2024-02-11 07:44:29 DBG Error 1064: Vectorized engine does not support the operator, cast from VARBINARY to VARCHAR failed, maybe use switch function backend [id=10004] [host=127.0.0.1]
2024-02-11 07:44:29 DBG drop table if exists `albert`.`staff`
2024-02-11 07:44:29 DBG table `albert`.`staff` dropped
2024-02-11 07:44:29 INF dropped table `albert`.`staff`
2024-02-11 07:44:29 DBG create table if not exists `albert`.`staff` (`staff_id` integer,
`first_name` string,
`last_name` string,
`address_id` smallint,
`email` string,
`store_id` smallint,
`active` char(5),
`username` string,
`password` string,
`last_update` datetime,
`picture` varbinary,
`_sling_row_id` varchar(255)) distributed by hash(`_sling_row_id`)
2024-02-11 07:44:29 INF created table `albert`.`staff`
2024-02-11 07:44:29 DBG insert into `albert`.`staff` (`staff_id`, `first_name`, `last_name`, `address_id`, `email`, `store_id`, `active`, `username`, `password`, `last_update`, `picture`, `_sling_row_id`) select `staff_id`, `first_name`, `last_name`, `address_id`, `email`, `store_id`, `active`, `username`, `password`, `last_update`, `picture`, `_sling_row_id` from `albert`.`staff_tmp`
2024-02-11 07:44:29 DBG inserted rows into `albert`.`staff` from temp table `albert`.`staff_tmp`
2024-02-11 07:44:29 INF inserted 2 rows into `albert`.`staff` in 0 secs [3 r/s]
2024-02-11 07:44:29 DBG connection was closed, reconnecting
2024-02-11 07:44:29 DBG drop table if exists `albert`.`staff_tmp`
2024-02-11 07:44:29 DBG table `albert`.`staff_tmp` dropped
2024-02-11 07:44:29 INF execution succeeded
Good warning messages.
atwong@Albert-CelerData dvdrental % sling run -d --src-conn postgreslocal --src-stream public.staff --tgt-conn starrockslocal --tgt-object 'albert.staff' --tgt-options '{ table_keys: { primary: [ staff_id2 ], hash: [ staff_id2 ] } }'
2024-02-11 07:47:43 DBG Sling version: dev (darwin arm64)
2024-02-11 07:47:43 DBG type is db-db
2024-02-11 07:47:43 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1,"transforms":["parse_bit"]}
2024-02-11 07:47:43 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source","table_keys":{"hash":["staff_id2"],"primary":["staff_id2"]}}
2024-02-11 07:47:43 INF connecting to source database (postgres)
2024-02-11 07:47:43 INF connecting to target database (starrocks)
2024-02-11 07:47:43 INF reading from source database
2024-02-11 07:47:43 DBG select * from "public"."staff"
2024-02-11 07:47:43 INF writing to target database [mode: full-refresh]
2024-02-11 07:47:43 DBG drop table if exists `albert`.`staff_tmp`
2024-02-11 07:47:43 DBG table `albert`.`staff_tmp` dropped
2024-02-11 07:47:43 INF execution failed
fatal:
--- task_run.go:89 func1 ---
~ execution failed
--- task_run.go:503 runDbToDb ---
~ Could not WriteToDb
--- task_run_write.go:177 WriteToDb ---
~ could not create temp table `albert`.`staff_tmp`
--- task_func.go:67 createTableIfNotExists ---
~ Could not generate DDL for `albert`.`staff_tmp`
--- database_starrocks.go:215 GenerateDDL ---
did not provide primary-key, duplicate-key, aggregate-key or hash-key for creating StarRocks table
Everything looks good to me.
I also updated my examples to reflect the new 1.1.4.
Maybe the warning in this situation should be a failure, but it's such a minor issue.
Agreed, I just pushed logic that will error if a column in table_keys is not found.
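Sling's actual code for this isn't shown in the thread. Below is a minimal Go sketch of the two checks discussed above:
- erroring when a table_keys column does not exist;
- refusing to build StarRocks DDL without a primary, duplicate, aggregate or hash key (the message from `database_starrocks.go` in the failed run).

The names `validateTableKeys` and `starrocksKeyKinds` are hypothetical. So is representing table_keys as `map[string][]string`.

```go
package main

import (
	"fmt"
	"sort"
)

// starrocksKeyKinds are the key kinds named in the StarRocks DDL error above;
// at least one must be present. (Hypothetical name, not sling's.)
var starrocksKeyKinds = []string{"primary", "duplicate", "aggregate", "hash"}

// validateTableKeys (hypothetical) returns an error if any column listed in
// tableKeys is missing from columns, or if no StarRocks key kind is given.
func validateTableKeys(columns []string, tableKeys map[string][]string) error {
	known := make(map[string]bool, len(columns))
	for _, c := range columns {
		known[c] = true
	}

	// Check key kinds in sorted order so the reported error is deterministic.
	kinds := make([]string, 0, len(tableKeys))
	for kind := range tableKeys {
		kinds = append(kinds, kind)
	}
	sort.Strings(kinds)

	for _, kind := range kinds {
		for _, col := range tableKeys[kind] {
			if !known[col] {
				return fmt.Errorf("%s key column %q not found in table", kind, col)
			}
		}
	}

	for _, kind := range starrocksKeyKinds {
		if len(tableKeys[kind]) > 0 {
			return nil
		}
	}
	return fmt.Errorf("did not provide primary-key, duplicate-key, aggregate-key or hash-key for creating StarRocks table")
}

func main() {
	cols := []string{"staff_id", "first_name", "last_name", "_sling_row_id"}

	// Mirrors the failing run above: staff_id2 does not exist.
	fmt.Println(validateTableKeys(cols, map[string][]string{
		"primary": {"staff_id2"},
		"hash":    {"staff_id2"},
	})) // → hash key column "staff_id2" not found in table

	// An existing column passes.
	fmt.Println(validateTableKeys(cols, map[string][]string{"primary": {"staff_id"}})) // → <nil>
}
```

Checking for missing columns first means a typo like `staff_id2` surfaces as a clear error. Otherwise it is silently dropped and the run later fails with the generic "no key" message.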
@flarco I really love sling and have made a YouTube video: https://www.youtube.com/watch?time_continue=1&v=hxp5OowecLA
I have also shared sling on LinkedIn.
@soumilshah1995 Thanks a lot, that's amazing.
Merged. Releasing 1.1.4. Thanks for all the testing!
Am I missing any steps here?
sling --version
Version: 1.1.5
(venv) soumilshah@Soumils-MBP Sling %
(venv) soumilshah@Soumils-MBP Sling % sling run \
--src-conn POSTGRES \
--src-stream "SELECT order_id, name, order_value, priority, order_date FROM orders " \
--tgt-conn STARROCKS \
--tgt-object public.orders \
--tgt-options '{"primary-key": "order_id"}' \
--mode full-refresh \
--debug
6:29PM WRN Could not successfully get format values. Blank values for: stream_table
2024-02-13 18:29:53 DBG Sling version: 1.1.5 (darwin arm64)
2024-02-13 18:29:53 DBG type is db-db
2024-02-13 18:29:53 DBG using source options: {"empty_as_null":true,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1,"transforms":["parse_bit"]}
2024-02-13 18:29:53 DBG using target options: {"datetime_format":"auto","max_decimals":-1,"use_bulk":true,"add_new_columns":true,"column_casing":"source"}
2024-02-13 18:29:53 INF connecting to source database (postgres)
2024-02-13 18:29:53 INF connecting to target database (starrocks)
2024-02-13 18:29:53 INF reading from source database
2024-02-13 18:29:53 WRN Could not successfully get format values. Blank values for: stream_table
2024-02-13 18:29:53 DBG SELECT order_id, name, order_value, priority, order_date FROM orders
2024-02-13 18:29:53 INF writing to target database [mode: full-refresh]
2024-02-13 18:29:53 DBG drop table if exists `public`.`orders_tmp`
2024-02-13 18:29:53 DBG table `public`.`orders_tmp` dropped
2024-02-13 18:29:53 DBG create table if not exists `public`.`orders_tmp` (`order_id` string,
`name` string,
`order_value` integer,
`priority` string,
`order_date` datetime,
`_sling_row_id` varchar(255)) distributed by hash(`_sling_row_id`)
2024-02-13 18:29:53 INF execution failed
fatal:
~ execution failed
--- task_run.go:89 func1 ---
~ Could not WriteToDb
--- task_run.go:503 runDbToDb ---
~ could not create temp table `public`.`orders_tmp`
--- task_run_write.go:177 WriteToDb ---
~ Error creating table `public`.`orders_tmp`
--- task_func.go:73 createTableIfNotExists ---
~ Could not execute SQL
--- database.go:1133 ExecMulti ---
--- task_run.go:89 func1 ---
--- task_run.go:503 runDbToDb ---
--- task_run_write.go:177 WriteToDb ---
--- task_func.go:73 createTableIfNotExists ---
--- database.go:1131 ExecMulti ---
~ Error executing query
--- database.go:1176 ExecMultiContext ---
~ Error executing [tx: false] create table if not exists `public`.`orders_tmp` (`order_id` string,
`name` string,
`order_value` integer,
`priority` string,
`order_date` datetime,
`_sling_row_id` varchar(255)) distributed by hash(`_sling_row_id`)
--- database.go:1162 ExecContext ---
Error 1064: Unexpected exception: Table replication num should be less than or equal to the number of available BE nodes. You can change this default by setting the replication_num table properties. Current alive backend is [10004]. , table=orders_tmp, default_replication_num=3
(venv) soumilshah@Soumils-MBP Sling %
Interesting, I'm not sure what the error means.
But you're not actually passing the primary key. To do that, use --primary-key.
Try this:
sling run \
--src-conn POSTGRES \
--src-stream "SELECT order_id, name, order_value, priority, order_date FROM orders " \
--tgt-conn STARROCKS \
--tgt-object public.orders \
--primary-key order_id \
--mode full-refresh \
--debug
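For context on why the --tgt-options attempt did nothing: the logged target options for that run contain no table_keys entry. The "primary-key" key was simply dropped.

The Go sketch below reproduces that failure mode under stated assumptions:
- options are decoded with `encoding/json`, which ignores keys that match no struct field;
- they go into a hypothetical, trimmed-down `TargetOptions` struct.

Sling's real option parsing is not shown in this thread and may differ (the earlier run passed YAML flow syntax). Treat this as an illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TargetOptions is a hypothetical struct mirroring two target option names
// visible in the debug logs; it is not sling's real type.
type TargetOptions struct {
	ColumnCasing string              `json:"column_casing"`
	TableKeys    map[string][]string `json:"table_keys"`
}

// parseTargetOptions decodes an options string. encoding/json silently
// skips keys with no matching field, so an unsupported option just vanishes.
func parseTargetOptions(raw string) (TargetOptions, error) {
	var opts TargetOptions
	err := json.Unmarshal([]byte(raw), &opts)
	return opts, err
}

func main() {
	ignored, _ := parseTargetOptions(`{"primary-key": "order_id"}`)
	fmt.Println(len(ignored.TableKeys)) // → 0 ("primary-key" matches no field)

	used, _ := parseTargetOptions(`{"table_keys": {"primary": ["order_id"]}}`)
	fmt.Println(used.TableKeys["primary"]) // → [order_id]
}
```

Besides --primary-key, the table_keys form used earlier in this thread (`{ table_keys: { primary: [ ... ] } }`) is also picked up, as that run's debug log shows.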
FYI: by default, if no keys are provided, sling generates a 128-bit UUIDv7 in the _sling_row_id column and uses it as the HASH key (StarRocks targets only).
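For readers unfamiliar with UUIDv7: it packs a millisecond Unix timestamp into its first 48 bits, and the remaining bits (apart from the version and variant fields) are random. Values generated in later milliseconds therefore sort after earlier ones.

The Go sketch below follows the RFC 9562 layout using only the standard library. It is a generic illustration, not sling's actual generator; the thread does not show how sling produces _sling_row_id.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"time"
)

// newUUIDv7 builds a version-7 UUID: 48-bit big-endian Unix milliseconds,
// then random bits, with the version and variant fields overwritten.
func newUUIDv7() (string, error) {
	var b [16]byte
	// Fill bytes 6..15 with randomness; bytes 0..5 hold the timestamp.
	if _, err := rand.Read(b[6:]); err != nil {
		return "", err
	}

	var ts [8]byte
	binary.BigEndian.PutUint64(ts[:], uint64(time.Now().UnixMilli()))
	copy(b[0:6], ts[2:8]) // keep the low 48 bits of the millisecond timestamp

	b[6] = (b[6] & 0x0f) | 0x70 // version 7 in the high nibble of byte 6
	b[8] = (b[8] & 0x3f) | 0x80 // RFC variant: top two bits of byte 8 are 10

	// Canonical 8-4-4-4-12 lowercase hex form (36 characters).
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	id, err := newUUIDv7()
	if err != nil {
		panic(err)
	}
	// Random each run; the third group always starts with "7".
	fmt.Println(id)
}
```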
Got it, thanks @flarco!
I think this is more of a StarRocks issue than a sling one; let me ask Albert about this.
version: "3"
services:
  starrocks-fe:
    image: starrocks/fe-ubuntu:3.2-latest
    hostname: starrocks-fe
    user: root
    command: |
      sh /opt/starrocks/fe/bin/start_fe.sh
    ports:
      - 8030:8030
      - 9020:9020
      - 9030:9030
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    healthcheck:
      test: 'mysql -u root -h starrocks-fe -P 9030 -e "SHOW FRONTENDS\G" |grep "Alive: true"'
      interval: 10s
      timeout: 5s
      retries: 3
  starrocks-be:
    image: starrocks/be-ubuntu:3.2-latest
    command:
      - /bin/bash
      - -c
      - |
        echo "# Enable data cache" >> /opt/starrocks/be/conf/be.conf
        echo "block_cache_enable = true" >> /opt/starrocks/be/conf/be.conf
        echo "block_cache_mem_size = 536870912" >> /opt/starrocks/be/conf/be.conf
        echo "block_cache_disk_size = 1073741824" >> /opt/starrocks/be/conf/be.conf
        sleep 15s
        mysql --connect-timeout 2 -h starrocks-fe -P 9030 -u root -e "ALTER SYSTEM ADD BACKEND \"starrocks-be:9050\";"
        /opt/starrocks/be/bin/start_be.sh
    ports:
      - 8040:8040
    hostname: starrocks-be
    user: root
    depends_on:
      - starrocks-fe
    healthcheck:
      test: 'mysql -u root -h starrocks-fe -P 9030 -e "SHOW BACKENDS\G" |grep "Alive: true"'
      interval: 10s
      timeout: 5s
      retries: 3
  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore
  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://datalake/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"
  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]
  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "
volumes:
  hive-metastore-postgresql:
networks:
  default:
    name: hudi
sling run \
--src-conn POSTGRES \
--src-stream "SELECT order_id, name, order_value, priority, order_date FROM orders " \
--tgt-conn STARROCKS \
--tgt-object public.orders \
--primary-key order_id \
--mode full-refresh \
--debug
## Error Message
Error 1064: Unexpected exception: Table replication num should be less than or equal to the number of available BE nodes. You can change this default by setting the replication_num table properties. Current alive backend is [10004]. , table=orders_tmp, default_replication_num=3
@alberttwong any idea why I'm seeing this error?
It's because the default table type in StarRocks is duplicate key; however, we still need a distribution hash key and a sort key.
E.g. the primary key serves as both the StarRocks sort key and the key used for the distribution hash.
https://docs.starrocks.io/docs/table_design/table_types/duplicate_key_table/