
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

flink: FlinkSink support dynamically changed schema #4190

Open naisongwen opened 2 years ago

naisongwen commented 2 years ago

Currently, FlinkSink requires developers to pass a schema parameter when building the DataStream, which means that once the schema is given, the TableSchema is fixed and can never change. In practical scenarios, though, some data formats (JSON, for example) have an unfixed transformed schema, with fields being added, deleted, or renamed, so we want to be able to change the mapped TableSchema while the DataStream is running.
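
For context, a sink is currently wired up with a schema that is fixed when the job graph is built, roughly like the sketch below (table location and field names are illustrative):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.iceberg.flink.FlinkSink;
import org.apache.iceberg.flink.TableLoader;

public class FixedSchemaSinkExample {

  // The TableSchema passed to forRow() is captured when the job graph is built
  // and cannot change while the DataStream is running.
  public static void appendToIceberg(DataStream<Row> parsedJson) {
    TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.BIGINT())
        .field("payload", DataTypes.STRING())
        .build();

    TableLoader tableLoader =
        TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/events");

    FlinkSink.forRow(parsedJson, schema)
        .tableLoader(tableLoader)
        .append();
  }
}
```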

Shane-Yu commented 2 years ago

I also ran into this problem in CDC scenarios.

hililiwei commented 2 years ago

To summarize, your expectation is to dynamically update Iceberg table schema based on the schema of the data flow when the schema of the data flow does not match the Iceberg table?

stevenzwu commented 2 years ago

I am not sure this would be the universally desired behavior. If the data stream contains an incompatible schema change (like removing a required field), it will break downstream consumers.

There is value in automatically syncing the input data schema to the Iceberg table schema (for compatible schema evolution). Personally, I would keep it in the control plane, which is more natural if there is a schema registry tracking input data schema changes. The control plane can then update the Iceberg table schema and restart the Flink job so the write path picks up the new table schema.

Supporting automatic schema sync in the data plane is tricky. There can be many parallel Iceberg writers (hundreds, say) for a single sink table, and coordinating a metadata change such as a schema update across them is hard.
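
For the control-plane route, the Iceberg table update itself is ordinary schema evolution; a minimal sketch, assuming a Hadoop catalog and an added optional string column (catalog type, warehouse path, and names are illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class ControlPlaneSchemaSync {

  // Apply a compatible schema change (add an optional column) from the control plane;
  // the Flink job is then restarted so the write path picks up the new table schema.
  public static void addColumn() {
    HadoopCatalog catalog =
        new HadoopCatalog(new Configuration(), "hdfs://namenode:8020/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

    table.updateSchema()
        .addColumn("new_field", Types.StringType.get())
        .commit();
  }
}
```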

naisongwen commented 2 years ago

right

lintingbin commented 2 years ago

We also hit this problem in our scenarios. We modify the schema of the sink Iceberg table frequently. Right now I try to get the table schema through the Iceberg table refresh API and dynamically construct a DataStream that includes the new columns. Adding the new columns is possible, but writing them through the Iceberg sink does not take effect, because the sink does not support dynamic schema updates. If the Iceberg Flink sink writer added support for refreshing the table, it would be much more convenient to use; as it is, restarting Flink takes a minute or two. @stevenzwu
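
For reference, reloading the latest table schema is straightforward with the core API; the missing piece this thread is about is getting the already-running sink to use it. A minimal sketch:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

public class TableSchemaRefresh {

  // Reload table metadata and read the current schema; this part works today.
  // The open question is how the running FlinkSink writers pick it up.
  public static Schema latestSchema(TableLoader tableLoader) {
    tableLoader.open();
    Table table = tableLoader.loadTable();
    table.refresh();        // pull the newest metadata, including schema changes
    return table.schema();  // e.g. inspect schema.columns() for newly added fields
  }
}
```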

hililiwei commented 2 years ago

I think we should take this feature seriously. In Flink CDC pipelines, Hudi already supports dynamic table schema changes without restarting tasks: it captures schema changes and applies them to the table in real time.

When synchronizing data with Flink CDC, having to restart a task whenever the table schema changes is unacceptable.

lintingbin commented 1 year ago

We have internally implemented modifying columns, adding columns after the last column, and deleting the last column without restarting the Flink program. Our processing logic is: DataStream<Map<String, String>> -> map -> DataStream<RowData> -> FlinkSink. In the map implementation, we refresh the table schema after each checkpoint completes and generate RowData against the latest schema. We also modified FlinkSink so that every time newAppender is called, it refreshes the table schema and uses the latest table schema to write the data file. Is anyone interested in this feature? I can contribute our modifications to FlinkSink if needed.
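
A rough sketch of what such a map step could look like, assuming loosely typed input and string-valued fields; the class name and wiring below are illustrative, not the actual internal patch:

```java
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.types.Types;

/**
 * Converts loosely typed records to RowData against the current table schema and
 * re-reads the schema after every completed checkpoint, so columns added to the
 * table start being populated without a restart. Values are written as strings
 * to keep the sketch short; a real implementation would convert by field type.
 */
public class SchemaRefreshingMapper extends RichMapFunction<Map<String, String>, RowData>
    implements CheckpointListener {

  private final TableLoader tableLoader;
  private transient Table table;
  private transient volatile Schema currentSchema;

  public SchemaRefreshingMapper(TableLoader tableLoader) {
    this.tableLoader = tableLoader;
  }

  @Override
  public void open(Configuration parameters) {
    tableLoader.open();
    table = tableLoader.loadTable();
    currentSchema = table.schema();
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // Pick up column changes committed to the Iceberg table since the last checkpoint.
    table.refresh();
    currentSchema = table.schema();
  }

  @Override
  public RowData map(Map<String, String> record) {
    Schema schema = currentSchema;
    GenericRowData row = new GenericRowData(schema.columns().size());
    int pos = 0;
    for (Types.NestedField field : schema.columns()) {
      String value = record.get(field.name());
      row.setField(pos++, value == null ? null : StringData.fromString(value));
    }
    return row;
  }
}
```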

hililiwei commented 1 year ago

I personally look forward to seeing your PR.

stevenzwu commented 1 year ago

@lintingbin2009 it might be helpful to describe the solution as a high-level design in this issue or in a doc.

> Every time newAppender is called, we will refresh the table schema and use the latest table schema to write the data file

This sounds expensive and may not work well at scale. If every writer task needs to poll the table for every file, it can put a lot of load on the Iceberg metadata system. Ideally, the table schema polling and change should be done by the operator coordinator.

lintingbin commented 1 year ago

@hililiwei @stevenzwu https://github.com/apache/iceberg/pull/5425 is my PR; I hope to get some suggestions. We are now testing in an environment with a parallelism of about 40, and the checkpoint time is the same as before the dynamic schema refresh was added.

leichangqing commented 1 year ago

> We also hit this problem in our scenarios. We modify the schema of the sink Iceberg table frequently. Right now I try to get the table schema through the Iceberg table refresh API and dynamically construct a DataStream that includes the new columns. Adding the new columns is possible, but writing them through the Iceberg sink does not take effect, because the sink does not support dynamic schema updates. If the Iceberg Flink sink writer added support for refreshing the table, it would be much more convenient to use; as it is, restarting Flink takes a minute or two. @stevenzwu

I'm interested in this. How can I contact you about the dynamic schema changes?

lintingbin commented 1 year ago

@leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have been running this code in our own production environment for half a year, and so far there have been no problems.

[architecture diagram: (1) broadcast node -> (2) data processing node -> (3) Iceberg writer node]

Corresponding to 1, 2, 3 in the picture above, you need to prepare (a rough sketch of nodes (1) and (2) follows the list):

  1. A broadcast node that can subscribe to your schema changes.
  2. A data processing node that generates RowData according to the latest schema.
  3. The Iceberg writer node, modified as described above, so that it can receive the latest schema and apply it.
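
For illustration, a minimal sketch of nodes (1) and (2) using Flink broadcast state, assuming schema changes arrive as a comma-separated column list and all values are written as strings (names and encoding are illustrative):

```java
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Collector;

public class SchemaBroadcastExample {

  // Broadcast state holding the latest column list under a single key.
  private static final MapStateDescriptor<String, String> SCHEMA_STATE =
      new MapStateDescriptor<>("schema-columns", Types.STRING, Types.STRING);

  /**
   * Node (1) broadcasts schema changes; node (2) converts each record to RowData
   * using the newest column list it has seen.
   */
  public static DataStream<RowData> withDynamicSchema(
      DataStream<Map<String, String>> records, DataStream<String> schemaChanges) {

    BroadcastStream<String> schemaBroadcast = schemaChanges.broadcast(SCHEMA_STATE);

    return records
        .connect(schemaBroadcast)
        .process(new BroadcastProcessFunction<Map<String, String>, String, RowData>() {

          @Override
          public void processBroadcastElement(String columnList, Context ctx, Collector<RowData> out)
              throws Exception {
            // The broadcast element reaches every parallel subtask of this operator.
            ctx.getBroadcastState(SCHEMA_STATE).put("columns", columnList);
          }

          @Override
          public void processElement(Map<String, String> record, ReadOnlyContext ctx, Collector<RowData> out)
              throws Exception {
            String columnList = ctx.getBroadcastState(SCHEMA_STATE).get("columns");
            if (columnList == null) {
              return; // no schema seen yet; a real job would buffer or fall back to an initial schema
            }
            String[] columns = columnList.split(",");
            GenericRowData row = new GenericRowData(columns.length);
            for (int i = 0; i < columns.length; i++) {
              String value = record.get(columns[i]);
              row.setField(i, value == null ? null : StringData.fromString(value));
            }
            out.collect(row);
          }
        });
  }
}
```
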
stevenzwu commented 1 year ago

It would be a good starting point if someone would like to create a design doc on how to solve this problem in a general and scalable way.

lintingbin commented 1 year ago

@stevenzwu This is the doc I wrote; please give your opinions and I will revise it.

FranMorilloAWS commented 4 months ago

Is there any news on this?

Ruees commented 3 months ago

> @leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have been running this code in our own production environment for half a year, and so far there have been no problems. Corresponding to 1, 2, 3 in the picture above, you need to prepare:
>
>   1. A broadcast node that can subscribe to your schema changes.
>   2. A data processing node that generates RowData according to the latest schema.
>   3. The Iceberg writer node, modified as described above, so that it can receive the latest schema and apply it.

Could you provide sample code demonstrating how to use it?

Ruees commented 3 months ago

> @leichangqing You can refer to the last two commits of my branch https://github.com/lintingbin2009/iceberg/tree/flink-sink-dynamically-change. We have been running this code in our own production environment for half a year, and so far there have been no problems. Corresponding to 1, 2, 3 in the picture above, you need to prepare:
>
>   1. A broadcast node that can subscribe to your schema changes.
>   2. A data processing node that generates RowData according to the latest schema.
>   3. The Iceberg writer node, modified as described above, so that it can receive the latest schema and apply it.

I tried pulling the FlinkSink-related modifications from the first commit and adding a column at the end with the Java API in the map operator, but it did not work: even after the data was inserted successfully, the new last column was still empty.

lkokhreidze commented 3 months ago

Just commenting for visibility: this feature would be extremely useful for our use case too. It's similar to the CDC use case, but driven instead by the services emitting events. I'd also be happy to lend a hand, but at the moment it's not clear what the state is. Is the proposed design agreed upon, or does it need another iteration?

pvary commented 3 months ago

I think it is not trivial to implement this feature, as the schema of the RowData objects that are the input of the Sink is finalized when the job graph is created. To change the schema, one needs to regenerate the job graph, essentially restarting the job (calling the main method). There might be ways to work around this, by changing the input to records that embed the schema (at a performance cost) or by getting the schema from an outside source (an additional external dependency), but this would need deeper changes in the Sink. Care must also be taken with how to synchronize the table schema refresh across the tasks when changes are detected.

As a workaround, we created our own schema check before converting the input to RowData, and we throw a SuppressRestartsException when changes are detected. We use the Flink Kubernetes Operator to restart the job from the failed state, via kubernetes.operator.job.restart.failed. The main method refreshes the table, and the new job instance starts with the new schema.
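
A minimal sketch of that check, assuming only field names are compared (a real check would also compare types and nullability); the class and method names are illustrative:

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

public class SchemaDriftCheck {

  /**
   * Compare the expected field names (captured in main() when the job graph was built)
   * with the live table schema, and fail the job without restarts if they differ.
   * The Flink Kubernetes Operator (kubernetes.operator.job.restart.failed) then
   * resubmits the job, and main() rebuilds the graph with the new schema.
   */
  public static void failOnSchemaChange(Table table, List<String> expectedFieldNames) {
    table.refresh();
    List<String> currentFieldNames = table.schema().columns().stream()
        .map(Types.NestedField::name)
        .collect(Collectors.toList());

    if (!currentFieldNames.equals(expectedFieldNames)) {
      throw new SuppressRestartsException(
          new IllegalStateException(
              "Table schema changed from " + expectedFieldNames + " to " + currentFieldNames));
    }
  }
}
```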