slingdata-io / sling-cli

Sling is a CLI tool that extracts data from a source storage/database and loads it in a target storage/database.
https://docs.slingdata.io
GNU General Public License v3.0
446 stars 34 forks source link

Enum is not being properly cast during ingestion #434

Open PiotrSierkin-Ki opened 2 weeks ago

PiotrSierkin-Ki commented 2 weeks ago

Issue Description

# Replication config used to reproduce the issue (presumably the file passed to `sling run -r`).
# Connection names are kept exactly as configured; they appear with the same spelling in the log output.
source: LOCAL_DEV_SOUECE
target: LOCAL_DEV_DESTINATION

# Settings applied to every stream unless the stream overrides them.
defaults:
  # Name template filled in from each stream's schema and table.
  object: '{stream_schema}.{stream_table}'
  mode: incremental
  primary_key: id

streams:
  public.users:
    # Repeats the mode and primary key from defaults (the key is given as a list here).
    mode: incremental
    primary_key: [id]
    # Custom source query; "role" is explicitly cast to the enum type userroleenum.
    sql: select "id", "username", "email", "role"::userroleenum, "created_at", "last_login" from "public"."users"

SOURCE DB

-- Source database setup for the reproduction: an enum type, a table that uses it,
-- and three sample rows (one per enum value).

-- Step 1: Create a new enum type for user roles
CREATE TYPE userroleenum AS ENUM ('admin', 'editor', 'viewer');

-- Step 2: Create a new table for users that uses this enum type
-- The "role" column is the enum-typed column this report is about.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) NOT NULL UNIQUE,
    email VARCHAR(255) NOT NULL UNIQUE,
    role userroleenum NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_login TIMESTAMP
);

-- Step 3: Insert sample rows to populate the users table
-- id and created_at are omitted from the column list, so their column defaults apply.
INSERT INTO users (username, email, role, last_login)
VALUES 
('admin_user', 'admin@example.com', 'admin', '2023-01-01 12:00:00'),
('editor_user', 'editor@example.com', 'editor', '2023-02-01 12:00:00'),
('viewer_user', 'viewer@example.com', 'viewer', '2023-03-01 12:00:00');

DESTINATION DB

-- Destination database setup for the reproduction: the enum type and the target table only;
-- no rows are inserted here.

-- Step 1: Create a new enum type for user roles
CREATE TYPE userroleenum AS ENUM ('admin', 'editor', 'viewer');

-- Step 2: Create a new table for users that uses this enum type
-- The target "role" column is enum-typed. The debug log reports "inserting role [text] into
-- role [userroleenum]", and the upsert fails with
-- 'column "role" is of type userroleenum but expression is of type text'.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) NOT NULL UNIQUE,
    email VARCHAR(255) NOT NULL UNIQUE,
    role userroleenum NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_login TIMESTAMP
);
❯ sling run -r bug_sling.yaml --debug
2024-11-06 14:22:02 INF Sling Replication | LOCAL_DEV_SOUECE -> LOCAL_DEV_DESTINATION | public.users
2024-11-06 14:22:02 DBG Sling version: 1.2.22 (darwin arm64)
2024-11-06 14:22:02 DBG type is db-db
2024-11-06 14:22:02 DBG using: {"columns":null,"mode":"incremental","transforms":null}
2024-11-06 14:22:02 DBG using source options: {"empty_as_null":false,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1}
2024-11-06 14:22:02 DBG using target options: {"datetime_format":"auto","file_max_rows":0,"max_decimals":-1,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-11-06 14:22:02 DBG opened "postgres" connection (conn-postgres-ugQ)
2024-11-06 14:22:02 DBG opened "postgres" connection (conn-postgres-ohX)
2024-11-06 14:22:02 INF connecting to source database (postgres)
2024-11-06 14:22:02 INF connecting to target database (postgres)
2024-11-06 14:22:02 DBG using text since type 'userroleenum' not mapped for col 'role'
2024-11-06 14:22:02 INF reading from source database
2024-11-06 14:22:02 DBG select "id", "username", "email", "role"::userroleenum, "created_at", "last_login" from "public"."users"
2024-11-06 14:22:02 INF writing to target database [mode: incremental]
2024-11-06 14:22:02 DBG drop table if exists "public"."users_tmp"
2024-11-06 14:22:02 DBG table "public"."users_tmp" dropped
2024-11-06 14:22:02 DBG create table if not exists "public"."users_tmp" ("id" integer,
"username" varchar(65500),
"email" varchar(65500),
"role" text,
"created_at" timestamp,
"last_login" timestamp)
2024-11-06 14:22:02 INF created table "public"."users_tmp"
2024-11-06 14:22:02 INF streaming data
2024-11-06 14:22:02 DBG select count(*) cnt from "public"."users_tmp"
2024-11-06 14:22:02 DBG using text since type 'userroleenum' not mapped for col 'role'
2024-11-06 14:22:02 DBG Performing upsert from temporary table "public"."users_tmp" to target table "public"."users" with primary keys [id]
2024-11-06 14:22:02 DBG using text since type 'userroleenum' not mapped for col 'role'
2024-11-06 14:22:02 DBG inserting username [character varying(65500)] into username [character varying(255)]
2024-11-06 14:22:02 DBG inserting email [character varying(65500)] into email [character varying(255)]
2024-11-06 14:22:02 DBG inserting role [text] into role [userroleenum]
2024-11-06 14:22:02 DBG create temporary table temp3ttRN as
        with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        , updates as (
                update "public"."users" tgt
                set "username" = src."username", "email" = src."email", "role" = src."role", "created_at" = src."created_at", "last_login" = src."last_login"
                from src_table src
                where src."id" = tgt."id"
                returning tgt.*
        )
        select * from updates
2024-11-06 14:22:02 DBG create unique index if not exists temp3ttRN_idx on temp3ttRN ("id")
2024-11-06 14:22:02 DBG with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        insert into "public"."users"
        ("id", "username", "email", "role", "created_at", "last_login")
        select "id", "username", "email", "role", "created_at", "last_login" from src_table src
        where not exists (
                select 1
                from temp3ttRN upd
                where src."id" = upd."id"
        )
2024-11-06 14:22:02 DBG drop table if exists "public"."users_tmp"
2024-11-06 14:22:02 DBG table "public"."users_tmp" dropped
2024-11-06 14:22:02 DBG closed "postgres" connection (conn-postgres-ohX)
2024-11-06 14:22:02 INF execution failed

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: create temporary table temp3ttRN as
        with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        , updates as (
                update "public"."users" tgt
                set "username" = src."username", "email" = src."email", "role" = src."role", "created_at" = src."created_at", "last_login" = src."last_login"
                from src_table src
                where src."id" = tgt."id"
                returning tgt.*
        )
        select * from updates
--- transaction.go:135 ExecContext ---
pq: column "role" is of type userroleenum but expression is of type text

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: create unique index if not exists temp3ttRN_idx on temp3ttRN ("id")
--- transaction.go:135 ExecContext ---
pq: current transaction is aborted, commands ignored until end of transaction block

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        insert into "public"."users"
        ("id", "username", "email", "role", "created_at", "last_login")
        select "id", "username", "email", "role", "created_at", "last_login" from src_table src
        where not exists (
                select 1
                from temp3ttRN upd
                where src."id" = upd."id"
        )
--- transaction.go:135 ExecContext ---
pq: current transaction is aborted, commands ignored until end of transaction block

2024-11-06 14:22:03 INF Sling Replication Completed in 0s | LOCAL_DEV_SOUECE -> LOCAL_DEV_DESTINATION | 0 Successes | 1 Failures

fatal:
--- proc.go:271 main ---
--- sling_cli.go:458 main ---
--- sling_cli.go:494 cliInit ---
--- cli.go:286 CliProcess ---
~ failure running replication (see docs @ https://docs.slingdata.io/sling-cli)
--- sling_run.go:209 processRun ---

--------------------------- public.users ---------------------------
--- task_run.go:116 func2 ---
~ Could not WriteToDb
--- task_run.go:559 runDbToDb ---
~ Error transferring data from temp to final table
--- task_run_write.go:325 WriteToDb ---
~ Could not perform upsert from temp
--- task_run_write.go:759 transferData ---
~ Could not perform upsert from temp
--- task_run_write.go:803 performUpsert ---
~ could not upsert
--- database.go:2153 Upsert ---
~ Could not upsert
--- transaction.go:479 Upsert ---

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: create temporary table temp3ttRN as
        with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        , updates as (
                update "public"."users" tgt
                set "username" = src."username", "email" = src."email", "role" = src."role", "created_at" = src."created_at", "last_login" = src."last_login"
                from src_table src
                where src."id" = tgt."id"
                returning tgt.*
        )
        select * from updates
--- transaction.go:135 ExecContext ---
pq: column "role" is of type userroleenum but expression is of type text

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: create unique index if not exists temp3ttRN_idx on temp3ttRN ("id")
--- transaction.go:135 ExecContext ---
pq: current transaction is aborted, commands ignored until end of transaction block

--- task_run.go:116 func2 ---
--- task_run.go:559 runDbToDb ---
--- task_run_write.go:325 WriteToDb ---
--- task_run_write.go:759 transferData ---
--- task_run_write.go:803 performUpsert ---
--- database.go:2153 Upsert ---
--- transaction.go:474 Upsert ---
~ Error executing query
--- transaction.go:149 ExecMultiContext ---
~ Error executing: with src_table as (
                select "id", "username", "email", "role", "created_at", "last_login" from "public"."users_tmp"
        )
        insert into "public"."users"
        ("id", "username", "email", "role", "created_at", "last_login")
        select "id", "username", "email", "role", "created_at", "last_login" from src_table src
        where not exists (
                select 1
                from temp3ttRN upd
                where src."id" = upd."id"
        )
--- transaction.go:135 ExecContext ---
pq: current transaction is aborted, commands ignored until end of transaction block
PiotrSierkin-Ki commented 2 weeks ago

Additionally, using full-refresh mode works, but the enum column is cast to the text type.

source: LOCAL_DEV_SOUECE
target: LOCAL_DEV_DESTINATION

defaults:
  object: '{stream_schema}.{stream_table}'
  mode: incremental
  primary_key: id

streams:
  public.users:
    mode: full-refresh