TobikoData / sqlmesh

Efficient data transformation and modeling framework that is backwards compatible with dbt.
https://sqlmesh.com
Apache License 2.0

When matched ambiguous name #2934

Closed MikeWallis42 closed 2 weeks ago

MikeWallis42 commented 1 month ago

I tried to use the when_matched configuration property to add a condition but the target placeholder wasn't substituted. If we take the structure of the example you provide then it could look something like this.

when_matched WHEN MATCHED AND source.salary <> target.salary THEN UPDATE SET target.salary = COALESCE(source.salary, target.salary)

which resolves to the below for Trino

WHEN MATCHED AND "__merge_source__"."salary" <> "salary" THEN UPDATE SET ...

I think this may be a special case: each side of the comparison in the WHEN MATCHED condition needs qualifying, but the target column in the UPDATE SET clause does not.

I plan to use this to compare a fingerprint for each row so that I can eliminate false positive updates for end users.

eakmanrq commented 1 month ago

So is the error you get a Trino error saying that salary is ambiguous, and is it referring to this part: <> "salary"?

MikeWallis42 commented 1 month ago

I put together a little MRE just to clarify

CREATE TABLE IF NOT EXISTS iceberg.michael_wallis.mre_merge_target(
    update_datetime TIMESTAMP(6),
    employee_id BIGINT,
    salary DOUBLE
);

INSERT INTO iceberg.michael_wallis.mre_merge_target
VALUES(
    FROM_ISO8601_TIMESTAMP('2024-07-25T23:12:00'),
    1,
    15000
),
(
    FROM_ISO8601_TIMESTAMP('2024-07-25T23:12:00'),
    2,
    25000
);

CREATE TABLE IF NOT EXISTS iceberg.michael_wallis.mre_merge_source(
    update_datetime TIMESTAMP(6),
    employee_id BIGINT,
    salary DOUBLE
);

INSERT INTO iceberg.michael_wallis.mre_merge_source
VALUES(
    FROM_ISO8601_TIMESTAMP('2024-07-26T00:00:00'),
    1,
    17500
),
(
    FROM_ISO8601_TIMESTAMP('2024-07-26T00:00:00'),
    2,
    25000
),
(
    FROM_ISO8601_TIMESTAMP('2024-07-26T00:00:00'),
    3,
    75000
);

MERGE INTO iceberg.michael_wallis.mre_merge_target AS "__merge_target__" USING
(
    SELECT
        "update_datetime",
        "employee_id",
        "salary"
    FROM "iceberg"."michael_wallis"."mre_merge_source"
) AS "__merge_source__"
ON "__merge_target__"."employee_id" = "__merge_source__"."employee_id"
WHEN MATCHED AND "salary" <> "__merge_source__"."salary"
    THEN UPDATE
        SET "update_datetime" = "__merge_source__"."update_datetime", "salary" = "__merge_source__"."salary"
WHEN NOT MATCHED
    THEN INSERT
        ("update_datetime", "employee_id", "salary")
        VALUES("__merge_source__"."update_datetime", "__merge_source__"."employee_id", "__merge_source__"."salary")


erindru commented 1 month ago

Thanks for that @MikeWallis42. Is there any chance you can represent the MRE as a set of SQLMesh model definitions instead?

MikeWallis42 commented 1 month ago

@erindru apologies for the delay in responding, I've been on holiday for the last week. I'm attempting to set these up as SQLMesh models but I'm having some trouble, particularly because changing the contents of a seed model triggers a backfill, which does not follow the same logic flow. Alternatively, categorising the change as forward-only or non-breaking will not run the model and so will not generate the merge statement. I think the best I can do is a hybrid: plan and backfill the seed and model first, run the inserts and updates manually to mimic changes, then run the SQLMesh model again to pick up the changes and attempt to merge them in.

update_datetime,employee_id,salary
'2024-07-25T23:12:00',1,15000
'2024-07-25T23:12:00',2,25000

MODEL (
  name salary,
  kind SEED (
    path '$root/seeds/salary.csv'
  )
)

N.B. I couldn't work out how to typecast the seed columns directly here; I kept getting token errors when specifying the column types.

MODEL (
  name salary_incremental,
  kind INCREMENTAL_BY_UNIQUE_KEY (
    unique_key employee_id,
    when_matched WHEN MATCHED AND target.salary <> source.salary THEN UPDATE SET target.update_datetime = source.update_datetime, target.salary = source.salary
  ),
  cron '@hourly'
);

SELECT
  update_datetime,
  employee_id,
  salary
FROM salary

INSERT INTO salary
VALUES(
  FROM_ISO8601_TIMESTAMP('2024-07-26T00:00:00'),
  3,
  75000
);

UPDATE salary
SET update_datetime = FROM_ISO8601_TIMESTAMP('2024-07-26T00:00:00'), salary = 17500
WHERE employee_id = 1;

UPDATE salary
SET update_datetime = FROM_ISO8601_TIMESTAMP('2024-07-26T00:00:00')
WHERE employee_id = 2;

MikeWallis42 commented 3 weeks ago

@erindru, @eakmanrq, apologies for the nudge but I've been wracking my brains and I'm not sure that I can think of another way of providing an example purely with SQLMesh. Is the latest example I gave enough?

erindru commented 3 weeks ago

@MikeWallis42 thanks for your patience, I've managed to reproduce this based on your example using the following steps:

seeds/salary.csv

update_datetime,employee_id,salary
'2024-07-25T23:12:00',1,15000
'2024-07-25T23:12:00',2,25000

models/salary.sql

MODEL (
  name michael_wallis.salary,
  kind SEED (
    path '$root/seeds/salary.csv'
  ),
  columns (
    update_datetime TIMESTAMP,
    employee_id INTEGER,
    salary INTEGER
  )
)

models/salary_incremental.sql

MODEL (
  name michael_wallis.salary_incremental,
  kind INCREMENTAL_BY_UNIQUE_KEY (
    unique_key employee_id,
    when_matched WHEN MATCHED AND target.salary <> source.salary THEN UPDATE SET target.update_datetime = source.update_datetime, target.salary = source.salary
  ),
  cron '@hourly'
);

SELECT
  update_datetime,
  employee_id,
  salary
FROM michael_wallis.salary

Run sqlmesh:

$ sqlmesh plan
...SNIP
Virtually Updating 'prod' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 0:00:00

The target environment has been updated successfully

Change the source data:

> use iceberg.michael_wallis;
> show create view salary; --to find the underlying table, eg 'sqlmesh__michael_wallis.michael_wallis__salary__703464889'
> UPDATE sqlmesh__michael_wallis.michael_wallis__salary__703464889 SET update_datetime = FROM_ISO8601_TIMESTAMP('2024-07-26T00:00:00'), salary = 17500 WHERE employee_id = 1;

Run sqlmesh run, this time with help from faketime to pretend an hour has elapsed so the next interval gets triggered:

$ faketime '+1 hour' sqlmesh run
...SNIP
trino.exceptions.TrinoUserError: TrinoUserError(type=USER_ERROR, name=AMBIGUOUS_NAME, message="line 1:471: Column 'salary' is ambiguous", 

The problem is that the when_matched expression is not translated correctly. In the model, it's defined as

WHEN MATCHED AND target.salary <> source.salary THEN UPDATE SET target.update_datetime = source.update_datetime, target.salary = source.salary

However, by the time it gets to Trino, it becomes:

WHEN MATCHED AND "salary" <> "__merge_source__"."salary" THEN UPDATE SET "update_datetime" = "__merge_source__"."update_datetime", "salary" = "__merge_source__"."salary"

which means that the target placeholder is not being replaced with __merge_target__ and is just being omitted entirely.

Strangely, it's working fine for source. I'll do some investigation, because we have a unit test for this exact scenario and it's currently passing.
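
The ambiguity itself is easy to reproduce outside Trino. The sketch below is only an illustration: it uses Python's built-in sqlite3 module as a stand-in engine and a plain join in place of MERGE (an assumption made for portability), with hypothetical table names merge_target and merge_source. An unqualified salary cannot be resolved when both sides expose that column, while the fully qualified comparison works.

```python
import sqlite3

# In-memory SQLite database standing in for Trino (assumption for illustration).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE merge_target (employee_id INTEGER, salary REAL);
    CREATE TABLE merge_source (employee_id INTEGER, salary REAL);
    INSERT INTO merge_target VALUES (1, 15000), (2, 25000);
    INSERT INTO merge_source VALUES (1, 17500), (2, 25000), (3, 75000);
""")

# A join plays the role of the MERGE ... ON clause: both tables expose "salary".
JOIN = """
    FROM merge_target AS t
    JOIN merge_source AS s ON t.employee_id = s.employee_id
"""


def changed_rows(condition):
    """Return the employee_ids of matched rows that satisfy the condition."""
    return conn.execute(f"SELECT t.employee_id {JOIN} WHERE {condition}").fetchall()


# Unqualified left-hand side, as in the statement that reached Trino.
try:
    changed_rows("salary <> s.salary")
    error = None
except sqlite3.OperationalError as exc:
    error = str(exc)

# Both sides qualified, as the model's when_matched intended.
qualified = changed_rows("t.salary <> s.salary")

print(error)
print(qualified)
```

Only employee 1 has a different salary in the source, so that is the only row the qualified condition should select.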

erindru commented 3 weeks ago

We've identified this as a bug in sqlglot SQL generation and aim to have a fix up next week. It only affects the Trino dialect; other dialects work as expected.

erindru commented 2 weeks ago

Oh man, this has uncovered a can of worms.

This issue and its subsequent PR are the reason for the current behaviour, because I was having the opposite problem.

And this behaviour of deliberately unaliasing the target table is also set for the Postgres dialect.

I'll need to dig in further

erindru commented 2 weeks ago

OK, it looks like my issue was correct. The target alias does need to be removed from the target column in the THEN UPDATE SET part, and this is also documented for Postgres:

Do not include the table's name in the specification of a target column.

The bug is that it is excluded from the WHEN MATCHED condition as well. It looks like it needs to be included there, and the original issue wasn't using a custom when_matched expression, so this bug didn't get discovered before.

PR: https://github.com/tobymao/sqlglot/pull/3940
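
To make the intended rule concrete, here is a minimal sketch in Python. It is not sqlglot's implementation: render_when_matched and qualify are hypothetical helpers, and the regex only handles simple target.col and source.col references. The assigned column on the left of each SET item is emitted without an alias, while every other target or source reference is qualified with the merge aliases seen in the generated SQL above.

```python
import re

# Aliases as they appear in the generated MERGE statement in this thread.
TARGET_ALIAS = "__merge_target__"
SOURCE_ALIAS = "__merge_source__"


def qualify(expression):
    """Replace target.col / source.col placeholders with quoted merge aliases."""

    def repl(match):
        alias = TARGET_ALIAS if match.group(1) == "target" else SOURCE_ALIAS
        return f'"{alias}"."{match.group(2)}"'

    return re.sub(r"\b(target|source)\.(\w+)", repl, expression)


def render_when_matched(condition, assignments):
    """Render a WHEN MATCHED clause (hypothetical helper, for illustration only).

    assignments is a list of (column, expression) pairs. The assigned column is
    never prefixed with the target alias; the condition and the right-hand
    expressions are fully qualified.
    """
    set_clause = ", ".join(
        f'"{column}" = {qualify(expression)}' for column, expression in assignments
    )
    return f"WHEN MATCHED AND {qualify(condition)} THEN UPDATE SET {set_clause}"


clause = render_when_matched(
    "target.salary <> source.salary",
    [("salary", "COALESCE(source.salary, target.salary)")],
)
print(clause)
```

With the condition and assignment from the original report, both sides of the comparison and both arguments of COALESCE come out qualified, and only the assigned "salary" column is left bare.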

georgesittas commented 2 weeks ago

Fixed by https://github.com/tobymao/sqlglot/pull/3940

MikeWallis42 commented 2 weeks ago

Thank you so much for fixing this @erindru

viplazylmht commented 3 days ago

@erindru I think the target alias can be in THEN UPDATE SET as well.

Reproduce: Trino 431

CREATE TABLE default.EMPLOYEE (
  empId INTEGER,
  name varchar,
  salary INTEGER
);

INSERT INTO default.EMPLOYEE  VALUES (0001, 'Clark', 1000);
INSERT INTO default.EMPLOYEE  VALUES (0002, 'Dave', 1002);

explain (type validate)
MERGE into default.EMPLOYEE target
using (select 0003 as empId, 'John' as name, 1002 as salary) src
on target.name = src.name
when matched and src.salary <> target.salary then update set salary = coalesce(src.salary, target.salary);

Valid|
-----+
true |

The target alias appears on the right-hand side of the SET clause: coalesce(src.salary, target.salary). The original query in this issue also includes this expression.