Closed metesynnada closed 1 year ago
👍 I would very much like a feature like this too.
The use of `SELECT` is also compelling to me because you can then filter and order, for example.
Some ideal features (doesn't have to be in the first implementation, but it would be nice if the API supported this):
@andygrove and @Dandandan do you have thoughts on what this feature might look like?
BTW the sqllogictest driver has some version of INSERT INTO AS SELECT support (not as full featured as this proposal): https://github.com/apache/arrow-datafusion/blob/bd645279874df8e70f1cd891ecea89577733b788/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs#L30
We will start working on this feature quite soon, so any help would be welcomed. Our team can provide initial design as well.
> We will start working on this feature quite soon, so any help would be welcomed. Our team can provide initial design as well.
I am certainly interested in this feature and will standby to review designs and code
In terms of the API to support, I would recommend we follow some other implementation's API rather than inventing our own. For example, perhaps we can use the `COPY orders TO 'orders' (FORMAT PARQUET, PARTITION_BY (year, month));` style command described by DuckDB: https://duckdb.org/2023/02/13/announcing-duckdb-070.html
Using `CREATE EXTERNAL TABLE` seems like it may make implementations more complicated for a few reasons:
- handling multiple `INSERT INTO` commands in a row
- for a format like CSV or NDJSON appending new data might be straightforward, but Parquet doesn't really support append well
Thanks a lot for your help! I believe we can consolidate the usages into a Google doc soon, which will also include a literature survey on DDLs & DMLs. Once we have everything in one place, we can review it together and decide on the best options for the use cases.
FYI I wrote up some thoughts related to catalog manipulations here: https://github.com/apache/arrow-datafusion/issues/5291
Hi @alamb, I added a design document. cc @Dandandan & @mingmwang if you are interested as well. https://docs.google.com/document/d/19IhOklwkhIm09-3XPpZHf75frtxoQtMa0g-1QG8Da1c/edit?usp=sharing
Thanks @metesynnada -- https://docs.google.com/document/d/19IhOklwkhIm09-3XPpZHf75frtxoQtMa0g-1QG8Da1c/edit?usp=sharing looks great
I can't wait to help with this feature and use DataFusion to write re-sorted Parquet files.
My major feedback is:
- extending `TableProvider` to support writing

cc @avantgardnerio who has also been thinking about ways to support writing to tables. Not sure who else might be interested in this proposal too, but it might be worth a note to the mailing list dev@arrow.apache.org and the ASF slack channel pointing at this ticket and asking for feedback.
Hello @alamb, thanks for all your work on this project. I've noticed that the current TableProvider API does not seem to support getting a mutable reference to a downcasted table via the as_any method. Is this expected, and if not do we expect the API to change to support mutability when this issue is complete?
> Is this expected, and if not do we expect the API to change to support mutability when this issue is complete?
TLDR is I don't expect this to change. The `TableProvider`s are wrapped in `Arc`s and potentially shared across multiple `SessionContext`s.

To support mutability, the `TableProvider` implementation would need to implement "interior mutability" (for example, have some sort of `Mutex` or `RwLock` to control access).
> To support mutability the `TableProvider` implementation would need to implement "interior mutability"
I think this touches on a key point: there needs to be some sort of consistency/atomicity story here. Most users would likely expect that `INSERT INTO` is atomic, i.e. a query sees all of the inserted data or none of it. They may additionally have expectations with respect to transaction isolation / serializability.
Blindly appending to a CSV / JSON file without any external coordination will result in queries seeing partial or potentially corrupted data
One common approach is for new data to always be written to a new file, thus ensuring atomicity.
This basic approach can then be optionally extended with things like:
I think adding some pieces of functionality for this to DataFusion would be amazing, and may even be of interest to the delta-rs folks (FYI @roeap), but may benefit from having a more fleshed out catalog story first (#5291)
I agree about atomicity; aborted queries might result in corrupted files.

In which cases do we use `abort_multipart` for `put_multipart`? Do you have a use case in another library for these APIs? Maybe I can iterate on a design for such functionality in DataFusion.
> In which cases do we use `abort_multipart` for `put_multipart`?
In an ideal world, every time you wish to abandon a multipart upload... In practice Azure and GCP automatically cleanup incomplete multipart uploads after a given time, and S3 can be configured to have similar behaviour.
> Do you have a use case in another library for these APIs?
I believe delta-rs is making use of them. FWIW in the case of object stores the minimum PUT payload is 5MB, so if uploading files smaller than this there is no benefit over a regular PUT request.
> Aborted queries might result in corrupted files.
Aborted queries definitely could, but the append operation itself is also likely not atomic, as it may take multiple calls to the underlying `write` primitive to completely flush the payload. In the intervening time between these writes, any reader may see incomplete data, and any parallel writer would interleave its writes, resulting in data corruption.
Let's talk about all of this in the sync meeting. Some of the concerns I see here are definitely valid/important (e.g. atomicity of appends), but I also think some higher-level concerns are bleeding into the discussion of a lower-level mechanism. It is great to think ahead, but we should also be mindful of separation of concerns/complexity.
Here is what I heard at our meeting (documenting here in case others are following along). Please correct me if I am wrong
- DataFusion should have some sort of trait (separate from `TableProvider`) for "writing to a table / sink". This trait will allow other systems to implement whatever semantics they want.
- The trait should support both "streaming" (incremental writing and appending) as well as writing complete files.
- The `INSERT INTO <...>` and `COPY ...` functionality will be implemented in terms of the trait.
I missed the call, but this would be great for our use-case! Thanks guys for thinking of us.
> Here is what I heard at our meeting (documenting here in case others are following along). Please correct me if I am wrong
>
> - DataFusion should have some sort of trait (separate from `TableProvider`) for "writing to a table / sink". This trait will allow other systems to implement whatever semantics they want.
> - The trait should support both "streaming" (incremental writing and appending) as well as writing complete files.
> - There will be some sort of implementation in DataFusion that allows writing to the existing file formats (e.g. Parquet, CSV, etc.) that may or may not support appending (or may support appending when there is some external coordination), but this implementation will remain simple.
> - The `INSERT INTO <...>` and `COPY ...` functionality will be implemented in terms of the trait.
I think this summarizes the meeting well. I will make a POC of the trait implementation ASAP.
Related discussion: https://github.com/apache/arrow-rs/pull/3791
> I think adding some pieces of functionality for this to DataFusion would be amazing, and may even be of interest to the delta-rs folks (FYI @roeap)
This would be amazing indeed. In fact, I am currently working on a full implementation of Delta's optimistic commit protocol, and we'd be more than happy to adopt these new APIs as well as contribute to their implementation here. We also have users asking us to support more catalogs, and adopting DataFusion traits for this is always a great option.
> Do you have a use case in another library for these APIs?
>
> I believe delta-rs is making use of them.
Indeed we do use the multipart APIs.
Fantastic! We will have an initial implementation ready sometime next week.
I believe this is now complete, given @devinjdangelo's recent work and the earlier work from @metesynnada and @ozankabak. Additional work is tracked in https://github.com/apache/arrow-datafusion/issues/6569
**Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
I want to create a table from a CSV (JSON or Avro) file, execute a query, and write the results into a CSV file.

**Describe the solution you'd like**
If we support this at the logical plan level, this might be executed. It may also require enhancing catalog management.

**Describe alternatives you've considered**
NA

**Additional context**
NA