ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

feat(flink): support converting Flink job outputs into in-memory dataframes (e.g. pandas, pyarrow) #6963

Open chloeh13q opened 1 year ago

chloeh13q commented 1 year ago

Is your feature request related to a problem?

Currently, the to_pandas() method in the Flink backend calls pyflink's to_pandas() method under the hood. pyflink's to_pandas() blocks until the job completes, so it does not work for streaming jobs on unbounded sources. As a result, we have nothing that works out of the box for continuous jobs. This is also confirmed by this thread.

A few notes related to this feature:

  1. It seems to me that pyflink's to_pandas() hangs even with limit. We should talk to the pyflink maintainers and confirm whether this behavior is expected.
  2. Can we support some kind of periodic fetching of results, e.g. fetching based on time intervals? pyflink does not support this out of the box, so we'd have to implement it in Ibis.
  3. Can we return, e.g., a pyarrow RecordBatchReader immediately, without blocking? An iterator is pull-based, so it won't entirely solve the problem of infinite, unbounded data sources. We can get some laziness by calling __next__ and pulling data as it arrives, but unbounded data sources will require additional work.
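To make notes 2 and 3 concrete, here is a minimal sketch of time-interval fetching on top of a pull-based iterator. It uses only the Python standard library and is not pyflink or Ibis API: `iter_time_windows` and `slow_rows` are hypothetical names, and `slow_rows` merely stands in for a job that emits rows over time. A background thread does the blocking pulls so that the consumer can hand back whatever arrived within each interval.

```python
import queue
import threading
import time
from typing import Any, Iterable, Iterator, List


def iter_time_windows(source: Iterable[Any], interval: float) -> Iterator[List[Any]]:
    """Yield one list of rows per `interval` seconds from a possibly unbounded source.

    Hypothetical helper: a background thread performs the blocking pulls,
    so the consumer never waits longer than `interval` for a window.
    """
    buf: "queue.Queue[Any]" = queue.Queue()
    done = object()  # sentinel marking the end of a bounded source

    def pump() -> None:
        try:
            for row in source:
                buf.put(row)
        finally:
            buf.put(done)  # always signal completion, even if the source fails

    threading.Thread(target=pump, daemon=True).start()

    finished = False
    while not finished:
        deadline = time.monotonic() + interval
        window: List[Any] = []
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break  # interval elapsed: hand back what we have so far
            try:
                item = buf.get(timeout=remaining)
            except queue.Empty:
                break  # nothing arrived before the deadline
            if item is done:
                finished = True
                break
            window.append(item)
        yield window


def slow_rows(n: int, delay: float) -> Iterator[int]:
    """Stand-in for a streaming job: emits one row every `delay` seconds."""
    for i in range(n):
        time.sleep(delay)
        yield i


windows = list(iter_time_windows(slow_rows(6, 0.05), interval=0.12))
rows = [row for window in windows for row in window]
```

With an unbounded source the generator never finishes on its own; the caller stops iterating when it has seen enough. In this sketch the background thread keeps pulling after that point and the queue is unbounded, which is part of the "additional work" a real implementation would have to address.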

Describe the solution you'd like

Support converting Flink job outputs into in-memory dataframes (e.g. pandas, pyarrow)

What version of ibis are you running?

6.1.0

What backend(s) are you using, if any?

Flink

mfatihaktas commented 8 months ago

It seems to me that pyflink's to_pandas() hangs even with limit. We should talk to the pyflink maintainers and confirm whether this behavior is expected.

Did we ever reproduce this behavior? If so, how? Possibly with the DataGen connector?

Can we support some kind of periodic fetching of results, fetching based on time intervals, etc? pyflink does not support these ootb, so we'd have to implement support for it in Ibis.

What would be the use case for periodically fetching the table state (results)? Wouldn't this be the same as creating a view of the table and then periodically running a select query on it?

Can we return, e.g., a pyarrow RecordBatchReader, immediately without blocking? An iterator is pull-based; it won't solve the infinite unbounded data source problem entirely. We can get some kind of laziness and call __next__ and pull as it arrives, but it will require additional work for unbounded data sources.

We added to_pyarrow_batches() to the Flink backend after this issue was created. Does that do what is suggested here? If so, we would need to test it with a continuous query and see whether it allows pulling the table data "lazily".
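As a rough sketch of what such a "lazy pull" test could check, the snippet below takes a fixed number of batches from a stream that never ends. It assumes that the object returned by to_pyarrow_batches() can be iterated batch by batch, as a pyarrow RecordBatchReader can. Whether the Flink backend blocks internally before yielding the first batch is exactly what would need testing. `take_batches` and `fake_continuous_batches` are hypothetical names, and the fake stream is a plain Python generator rather than a real Flink job.

```python
from itertools import count, islice
from typing import Any, Dict, Iterable, Iterator, List


def take_batches(reader: Iterable[Any], n: int) -> List[Any]:
    """Pull at most `n` batches from `reader` without exhausting it."""
    return list(islice(reader, n))


def fake_continuous_batches(chunk_size: int) -> Iterator[List[Dict[str, int]]]:
    """Stand-in for an unbounded result stream: yields batches forever."""
    ids = count()
    while True:
        yield [{"id": next(ids)} for _ in range(chunk_size)]


reader = fake_continuous_batches(chunk_size=3)
first = take_batches(reader, 2)   # returns promptly although the stream is endless
second = take_batches(reader, 1)  # continues where the previous pull stopped
```

If the real reader behaves the same way on a continuous query, the first call should return as soon as two batches are available instead of waiting for the job to finish.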