embulk / embulk

Embulk: Pluggable Bulk Data Loader.
https://www.embulk.org/
Apache License 2.0

Question: best method to detect & record changes to existing data #573

Open bcipolli opened 7 years ago

bcipolli commented 7 years ago

I would like to ask a question of the Embulk community. Is there a mailing list? Here is my question:

I am running a scheduled ETL task from MySQL to Redshift. I re-create the tables each time (no update). During the task, before deleting the old data, I want to detect which cells (row + column) written by the previous ETL run have changed, and record those changes to a new table. (That will help us with our temporal prediction task.)

My question is: are there tools to do this within Embulk? If not, what's the correct place to create such code? It doesn't seem to fit cleanly into either the filter or the output logic. I would also like to avoid reading the input data twice, as it can be quite slow (it's millions of rows).

Any ideas? Thank you in advance.

hiroyuki-sato commented 7 years ago

Hello @bcipolli

I would like to summarize your requirements. My understanding is the following. Is this correct? If not, please tell me more about your scenario (e.g. example data or something similar).

My idea.

I think the following configuration may help you.

P.S.

There is no mailing list or forum for Embulk yet, so currently this is the best place to ask about Embulk.

bcipolli commented 7 years ago

Hi @hiroyuki-sato ,

Thank you for your ideas. That summary is not quite what I was trying to say. Here's the simplest version I can think of:

Input side (MySQL):

Output side (Redshift):

hiroyuki-sato commented 7 years ago

@bcipolli Do you mean like the following?

If so, my idea is below.

2017/3/26 input data

| field_name | value  | created_at |
|------------|--------|------------|
| name1      | val1_1 | 2017/3/26  |
| name2      | val2_1 | 2017/3/26  |

2017/3/27 input data

| field_name | value  | created_at |
|------------|--------|------------|
| name1      | val1_1 | 2017/3/26  |
| name2      | val2_1 | 2017/3/26  |
| name1      | val1_2 | 2017/3/27  |
| name2      | val2_2 | 2017/3/27  |

2017/3/27 output data

| field_name | new_value | old_value | updated_at |
|------------|-----------|-----------|------------|
| name1      | val1_2    | val1_1    | 2017/3/27  |
| name2      | val2_2    | val2_1    | 2017/3/27  |

bcipolli commented 7 years ago

Thank you @hiroyuki-sato for your hard work in understanding this. I will try with a table too!

2017/03/26 input data

| field_name | value  | created_at | updated_at |
|------------|--------|------------|------------|
| name1      | val1_1 | 2017/01/01 | 2017/01/01 |
| name2      | val2_1 | 2017/03/26 | 2017/03/26 |

2017/03/27 input data (first row didn't change; second row updated; third row added)

| field_name | value  | created_at | updated_at |
|------------|--------|------------|------------|
| name1      | val1_1 | 2017/01/01 | 2017/01/01 |
| name2      | val2_2 | 2017/03/26 | 2017/03/27 |
| name3      | val3_2 | 2017/03/27 | 2017/03/27 |

2017/03/27 output data (to append to log)

| field_name | old_value | new_value | updated_at |
|------------|-----------|-----------|------------|
| name2      | val2_1    | val2_2    | 2017/03/27 |
| name3      | NULL      | val3_2    | 2017/03/27 |

hiroyuki-sato commented 7 years ago

It's probably difficult to do this with Embulk alone; you'll need to join the tables somewhere.

My first idea is to execute a join statement on the MySQL side, like the following.

in:
  type: mysql
  query: |
    select 
      today.field_name
      ,yesterday.value as old_value
      ,today.value as new_value
      ,today.updated_at
    from
      table1 today
      ,table1 yesterday
    where
      today.field_name = yesterday.field_name
      and today.updated_at > '2017-03-27'
      and today.updated_at < '2017-03-28'
      and yesterday.updated_at > '2017-03-26'
      and yesterday.updated_at < '2017-03-27'
  # ....
out:
  type: redshift
  mode: replace
  # ...

Maybe you can use Liquid templating for the query parameters.
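For example, a minimal sketch assuming the config is saved as config.yml.liquid (Embulk renders Liquid for files with that extension) and a TARGET_DATE environment variable is exported before the run:

in:
  type: mysql
  query: |
    select
      today.field_name
      , yesterday.value as old_value
      , today.value as new_value
      , today.updated_at
    from table1 today, table1 yesterday
    where today.field_name = yesterday.field_name
      and today.updated_at >= '{{ env.TARGET_DATE }}'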

An alternative idea is to execute a similar join on the Redshift side with an ETL engine.
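Roughly, that would mean loading today's rows into a staging table first and then diffing inside Redshift; a sketch, where staging_table, current_table, and changes_log are assumed names:

-- run on Redshift after loading today's data into staging_table
insert into changes_log
select
  today.field_name,
  yesterday.value as old_value,
  today.value as new_value,
  today.updated_at
from staging_table today
join current_table yesterday
  on today.field_name = yesterday.field_name
where today.value <> yesterday.value;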

I'll let you know if I find a new idea.

bcipolli commented 7 years ago

Thank you @hiroyuki-sato. I do not have yesterday's data on the MySQL side, as our production database does not record changes. I agree with your second idea of doing it in Redshift.

I wonder if this is a custom code change for me to make to the Redshift output plugin, or if it's something to create a PR for. It doesn't seem to fit into the filter plugin logic, so I guess it'd have to stay in the output plugin...

Something like a new set of config params:

out:
  type: redshift
  track_changes:
    table: changes_table_name
    columns:
      - field_name

...

Or perhaps it could be its own output type. Are we able to specify multiple outputs per config YAML file?

hiroyuki-sato commented 7 years ago

Extend output-plugin

If you want to extend embulk-output-redshift, you can discuss it on the embulk-output-jdbc project.

This plugin is organized around mode (insert, replace, merge...). If you create a PR, it will be plan A or B.

But I can't imagine which is better for your needs.

Multiple outputs

Multiple outputs are not implemented yet (#206).

Off topic.

Just my idea. How about this flow?

intermediate_table after the 2017/3/27 load (all loads appended):

| field_name | value   | created_at |
|------------|---------|------------|
| name1      | val1_1  | 2017/3/26  |
| name2      | val2_1  | 2017/3/26  |
| name1      | val1_2  | 2017/3/27  |
| name2      | val2_2  | 2017/3/27  |

final_table built from it (the computed diff):

| field_name | new_value   | old_value | updated_at |
|------------|-------------|-----------|------------|
| name1      | val1_2      | val1_1    | 2017/3/27  |
| name2      | val2_2      | val2_1    | 2017/3/27  |

If this scenario works for you, maybe you can use the following configuration.

out:
  type: redshift
  table: intermediate_table
  mode: insert
  after_load: |
    delete from final_table;
    insert into final_table
    select
      today.field_name
      , today.value as new_value
      , yesterday.value as old_value
      , today.created_at as updated_at
    from
      intermediate_table yesterday
      , intermediate_table today
    where
      -- the yesterday/today join condition goes here, e.g.:
      today.field_name = yesterday.field_name
      and today.created_at > yesterday.created_at;

I'm not sure after_load works well with insert mode.

If you can't use after_load, you can use an ETL engine like Digdag.
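For instance, a minimal Digdag workflow sketch that splits the load and the diff into two steps (file and database names are assumptions):

# load_and_diff.dig (hypothetical)
+load:
  embulk>: config.yml          # run the Embulk load into the intermediate table

+diff:
  redshift>: queries/diff.sql  # run the yesterday/today join on Redshift
  database: db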

bcipolli commented 7 years ago

@hiroyuki-sato Thank you for your suggestions. I hope to try these out over the weekend! I will let you know how it goes :)

bcipolli commented 7 years ago

OK, I finally have some time to work on this this weekend :) Will update here when I've made some progress!

bcipolli commented 7 years ago

Yes, this works great! The only issue is writing the JOIN statement with all of the relevant columns. I can do it manually (list out the columns whose values we're watching for changes), but I'd prefer to do it programmatically. I will look into the Redshift docs... I have some ideas.
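One possible starting point: Redshift supports the standard information_schema.columns view, so a wrapper script could pull the column list and generate the JOIN from it. A sketch, with the schema and table names as placeholders:

-- list a table's columns, to feed a script that generates the per-field comparisons
select column_name, data_type
from information_schema.columns
where table_schema = 'schema'
  and table_name = 'table'
order by ordinal_position;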

bcipolli commented 7 years ago

OK @hiroyuki-sato, I have this working. It is not pretty, but I have some wrapper scripts to help me generate the YAML.


in:
  type: mysql
  host: 127.0.0.1
  port: 8442
  user: xx
  password: "yy"
  database: zz
  fetch_rows: 100000
  query: |
    SELECT
      columns
    FROM table
    WHERE conditions

out:
  type: redshift
  host: 127.0.0.1
  port: 8655
  user: other_user
  password: "other_password"
  database: db
  schema: schema
  table: table___new__
  access_key_id: id
  secret_access_key: secret
  iam_user_name: user
  s3_bucket: bucket
  s3_key_prefix: temp/redshift
  mode: replace

  after_load: |
    INSERT INTO schema.field_changes(schema_name, table_name, row_id, field_name, old_value, new_value)

      SELECT
        'schema',
        'table',
        ol.id,
        'field',
        CAST(ol.field AS TEXT),
        CAST(nw.field AS TEXT)
      FROM schema.table ol
      JOIN schema.table___new__ nw
        ON ol.id = nw.id
      WHERE ol.field != nw.field
        OR (ol.field IS NULL AND nw.field IS NOT NULL)
        OR (nw.field IS NULL AND ol.field IS NOT NULL)

    UNION

    ... (more select fields)
    ;
    DROP TABLE schema.table;
    CREATE TABLE schema.table AS
        SELECT * FROM schema.table___new__;
    DROP TABLE schema.table___new__;

Let me know what you think...

bcipolli commented 7 years ago

Two issues with the above: Redshift won't CAST a boolean column to TEXT for the comparison, and a column newly added to change tracking doesn't yet exist in the old table, which breaks the first run.

Both have simple workarounds (cast the boolean to long instead; make sure the ETL runs twice before adding a column for tracking changes), but I thought it was worth mentioning.
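For the boolean case, one possible shape for the workaround in the generated SQL (the flag column is hypothetical):

-- compare a boolean column via CASE instead of CAST(... AS TEXT)
WHERE (CASE WHEN ol.flag THEN 1 ELSE 0 END)
   != (CASE WHEN nw.flag THEN 1 ELSE 0 END)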

hiroyuki-sato commented 7 years ago

That's great!!

I thought it would be better to use ETL software (e.g. Digdag) instead of your after_load part. I'd never thought of using after_load for that case, but it may be OK.