Open tychoish opened 9 months ago
It is a little frustrating to have to iterate through a stream to create record batch iterators to pass to lance, which then get converted back into streams. It's not a big deal, and thankfully it's easy to do, and probably doesn't matter too much, but it is a bit awkward for anyone who's using DataFusion. I think an interesting alternative would be to have an alias for Stream<RecordBatch> (which wouldn't have Sendable's schema() method), and so you could use a Sendable, and lance could also provide a conversion from RecordBatch iterators. I think, however, that while the ergonomics and extra code are a little annoying, there is a workaround.
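For context, the workaround being described looks roughly like this (a minimal sketch, assuming arrow-rs's RecordBatchIterator and futures' StreamExt; stream_to_reader is a made-up helper name, anyhow is used only for brevity, and exactly how the resulting reader is handed to lance depends on the version):

use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

// Drain the DataFusion stream into memory, then re-wrap the batches as a
// RecordBatchReader (an iterator) of the kind lance's write path accepts.
async fn stream_to_reader(
    mut stream: SendableRecordBatchStream,
) -> anyhow::Result<impl RecordBatchReader> {
    let schema = stream.schema();
    let mut batches: Vec<RecordBatch> = Vec::new();
    while let Some(batch) = stream.next().await {
        batches.push(batch?);
    }
    // RecordBatchIterator implements RecordBatchReader.
    Ok(RecordBatchIterator::new(batches.into_iter().map(Ok), schema))
}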
Yeah, I think we might want to switch to an API that looks more like:
fn write(...) -> DatasetWriter { ... }

struct DatasetWriter { ... }

impl DatasetWriter {
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> { ... }
    async fn write_stream(&mut self, stream: SendableRecordBatchStream) -> Result<()> { ... }
    async fn write_reader(&mut self, reader: impl RecordBatchReader) -> Result<()> { ... }
    async fn close(self) -> Result<()> { ... }
}
That would make it much more flexible IMO.
Because Dataset::write() is a constructor and cannot use an object_store constructed outside of the write method, and the append() method cannot handle the case when the table doesn't exist, if you have an object_store configured but don't have any data, there's no way to write data.
I know the object_store part is kind of a mess. We define our own ObjectStore that sometimes wraps and sometimes bypasses object_store::ObjectStore, but I don't think we've yet bothered to come up with a coherent API for constructing our ObjectStore. Open to more ideas here. If I have time I hope to refactor that one day.
fn write(...) -> DatasetWriter { ... }

struct DatasetWriter { ... }

impl DatasetWriter {
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> { ... }
    async fn write_stream(&mut self, stream: SendableRecordBatchStream) -> Result<()> { ... }
    async fn write_reader(&mut self, reader: impl RecordBatchReader) -> Result<()> { ... }
    async fn close(self) -> Result<()> { ... }
}
I might make the following modifications to this:
- I think write-methods should indicate the number of rows written, returned as a u64 or a usize, unless this is onerous.
- I would forego a top-level write method in favor of Dataset::writer(self) -> DatasetWriter or DatasetWriter::new() (or a builder option?).

I'm open to working on this if there's a design that people are agreed upon.
I don't think we've yet bothered to come up with a coherent API for constructing our ObjectStore. Open to more ideas here.
I put together a quick sketch that adds to WriteParams the ability to pass in an object store: https://github.com/lancedb/lance/compare/main...tychoish:lance:optional-object-store I don't love it, but it definitely makes things easier.
I think making these functions both async and having a close method (and having the close method be async) ...
At the very least, the write_batch() method might be called multiple times. The intention is that it could be used for whatever kind of stream you wanted to write, like:
let mut writer = write_dataset(...)?;
while let Some(batch) = my_stream.next().await.transpose()? {
    writer.write_batch(&batch).await?;
}
writer.close().await?;
So close() is there to say "I'm done". But I think you could make the methods that take iterators or streams just close out as part of their operation. That's fine with me.
conversely we could implement the destructor such that any cleanup is done at this point
I would, but destructors are completely sync so they aren't appropriate here where we need to do some IO (write the file footer(s)) when we close the writer. In the future, when Rust supports async destructors we might be able to do this.
streams and iterators give us the ability to control batch size
I'd caution against coupling the in-memory batch size to the batch size on disk. The desired batch size is almost always different, and I think the writer should buffer and control the batch size it is writing in all cases.
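To make that concrete, a buffering writer could re-chunk whatever the caller hands it to a fixed target size. This is purely an illustrative sketch (BatchBuffer and target_rows are made-up names, using arrow's concat_batches), not lance's implementation:

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef};
use arrow_select::concat::concat_batches;

// Hypothetical internal buffer: accumulates caller-provided batches and
// re-chunks them to a fixed on-disk batch size.
struct BatchBuffer {
    schema: SchemaRef,
    pending: Vec<RecordBatch>,
    pending_rows: usize,
    target_rows: usize,
}

impl BatchBuffer {
    // Buffer an incoming batch; return any chunks that have reached the
    // target size and are ready to be flushed to the underlying file writer.
    fn push(&mut self, batch: RecordBatch) -> Result<Vec<RecordBatch>, ArrowError> {
        self.pending_rows += batch.num_rows();
        self.pending.push(batch);
        let mut ready = Vec::new();
        while self.pending_rows >= self.target_rows {
            // Concatenate what is buffered, carve off one target-sized chunk,
            // and keep the remainder for the next call.
            let all = concat_batches(&self.schema, &self.pending)?;
            ready.push(all.slice(0, self.target_rows));
            let rest = all.slice(self.target_rows, all.num_rows() - self.target_rows);
            self.pending_rows = rest.num_rows();
            self.pending = vec![rest];
        }
        Ok(ready)
    }
}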
I think I would make a method like close be sync? I'm new enough to this async scene that I might be weird, but you sort of want things that handle closing/blocking/flushing to like actually execute when you say.
In general, my rule is: if a function does any IO, then it should be async. As far as it "executes when you say", it will execute when you do .await, so I don't think there's any ambiguity there.
I think write-methods should indicate number of rows written returned as a u64 or a usize, unless this is onerous.
Yeah, I would even go as far as to have it return some WriteMetrics struct:

struct WriteMetrics {
    rows_written: usize,
    bytes_written: usize,
    files_written: usize,
}
But might be overkill.
I would forego a top-level write method in favor of Dataset::writer(self) -> DatasetWriter or DatasetWriter::new() ...
Yeah associating as a static method also makes sense to me.
I'm open to working on this if there's a design that people are agreed upon.
That would be nice :) I likely won't have time to work on this for a while.
At the very least, the write_batch() method might be called multiple times. The intention is that it could be used for whatever kind of stream you wanted to write ...
So close() is there to say "I'm done". But I think you could make the methods that take iterators or streams just close out as part of their operation. That's fine with me.
Are there requests for this? Right now you can just do RecordBatchIterator::new() (or something similar, I played around with this a bit ago) for the cases when you actually need to send one, but in every other case you're fine. You could also have the not-iterator, not-stream version be write_batches(&[RecordBatch]), and treat it similarly. I think the case of "I only want to add a single record batch but I want to call it many times before closing the writer" is unlikely to be a thing that users are going to get correct or have consistent results with.
I think it would be reasonable to have it not use the SendableRecordBatchStream type and rather just use a Stream<Item = RecordBatch> (or impl Into<Stream<...>> etc.) for the same, and then you cover the "what about random stream types" case without any loss.
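A hedged sketch of what that shape could look like. The DatasetWriter here is a stub with placeholder bodies, ArrowError is just a stand-in error type, and having write_batches/write_stream consume self is one way to fold close() into those methods, per the earlier discussion:

use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use futures::{Stream, StreamExt};

// Stub standing in for the proposed writer; write_batch/close are placeholders.
struct DatasetWriter;

impl DatasetWriter {
    async fn write_batch(&mut self, _batch: &RecordBatch) -> Result<(), ArrowError> {
        Ok(()) // placeholder
    }

    async fn close(self) -> Result<(), ArrowError> {
        Ok(()) // placeholder: footer writes / commit would happen here
    }

    // "Plain" entry point: write a slice of batches and close in one call.
    async fn write_batches(mut self, batches: &[RecordBatch]) -> Result<usize, ArrowError> {
        let mut rows = 0;
        for batch in batches {
            rows += batch.num_rows();
            self.write_batch(batch).await?;
        }
        self.close().await?;
        Ok(rows)
    }

    // Accept any stream of batches rather than requiring a specific wrapper type.
    async fn write_stream<S>(mut self, mut stream: S) -> Result<usize, ArrowError>
    where
        S: Stream<Item = Result<RecordBatch, ArrowError>> + Send + Unpin,
    {
        let mut rows = 0;
        while let Some(batch) = stream.next().await {
            let batch = batch?;
            rows += batch.num_rows();
            self.write_batch(&batch).await?;
        }
        self.close().await?;
        Ok(rows)
    }
}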
My objection is mostly that it's hard to remember to consistently get the close operation right (you could forget to call it, or the program could panic for unrelated reasons between the write returning and the close completing), and in that window you (potentially) open yourself up to all manner of dirty reads or dropped writes. If we can avoid that in some way, I think it's worth it.
I take your point about the async/sync stuff and the close method.
Yeah, I would even go as far as to have it return some WriteMetrics struct:
This feels a bit like overkill to me, but I'm definitely open to it, and we can layer it in later (start with records, add the other stuff later).
Are there requests for this? Right now you can just do RecordBatchIterator::new() (or something similar, I played around with this a bit ago) for the cases when you actually need to send one, but in every other case you're fine. You could also have the not-iterator, not-stream version be write_batches(&[RecordBatch]), and treat it similarly. I think the case of "I only want to add a single record batch but I want to call it many times before closing the writer" is unlikely to be a thing that users are going to get correct or have consistent results with.
+1. I would defer this kind of API until requested. If needed I think we could treat it like a "multipart write". It could look like this...
fn write(...) -> DatasetWriter { ... }

struct IncrementalWrite { ... }

impl IncrementalWrite {
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> { ... }
}

struct DatasetWriter { ... }

impl DatasetWriter {
    async fn start_incremental_write(&self) -> Result<IncrementalWrite> { ... }
    async fn commit_incremental_write(&self, write: IncrementalWrite) -> Result<()> { ... }
    async fn write_stream(&mut self, stream: SendableRecordBatchStream) -> Result<()> { ... }
    async fn write_reader(&mut self, reader: impl RecordBatchReader) -> Result<()> { ... }
}
I like the idea of leaving the incremental write out of the original implementation until someone asks for it.
As an API, I would expect something more like:
struct IncrementalWriter { ... }

impl IncrementalWriter {
    async fn write(&mut self, batch: &RecordBatch) -> Result<()> { ... }
    async fn close(self) -> Result<usize> { ... }
}

struct DatasetWriter { ... }

impl DatasetWriter {
    async fn open_incremental_writer(&self) -> Result<IncrementalWriter> { ... }
    async fn write_stream(&mut self, stream: SendableRecordBatchStream) -> Result<usize> { ... }
    async fn write_reader(&mut self, reader: impl RecordBatchReader) -> Result<usize> { ... }
}
I imagine we'd implement the incremental writer as a wrapper around a deque, but if you had in mind to do something with a channel and a background task or similar that might also be interesting.
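A very rough sketch of the deque-backed version, matching the write/close shape above; all names, the flush threshold, and the commented placeholder steps are assumptions, not lance's implementation:

use std::collections::VecDeque;
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;

struct IncrementalWriter {
    queue: VecDeque<RecordBatch>,
    buffered_rows: usize,
    rows_written: usize,
    flush_threshold: usize,
}

impl IncrementalWriter {
    async fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.buffered_rows += batch.num_rows();
        // RecordBatch clones are cheap (columns are Arc'd).
        self.queue.push_back(batch.clone());
        if self.buffered_rows >= self.flush_threshold {
            self.flush().await?;
        }
        Ok(())
    }

    async fn close(mut self) -> Result<usize, ArrowError> {
        self.flush().await?; // write whatever is still buffered
        // A real implementation would write footers / commit the version here.
        Ok(self.rows_written)
    }

    async fn flush(&mut self) -> Result<(), ArrowError> {
        while let Some(batch) = self.queue.pop_front() {
            self.rows_written += batch.num_rows();
            // A real implementation would hand `batch` to the underlying file
            // writer here; this sketch only counts rows.
        }
        self.buffered_rows = 0;
        Ok(())
    }
}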
I've been playing around with the implementation, and a point of confusion for me (and I have more context than many, perhaps) is the difference between the upstream types and the lance types (given similar naming; I'm thinking mostly about Schema, though the object store type has similar effects), which I think gets harder and weirder when we put DF/arrow types in public interfaces. Thoughts:
- write_reader (potentially) takes an IntoIterator<RecordBatch> or similar rather than a record batch reader?
- write_stream takes a narrower Into<Stream<Item = Result<RecordBatch>> + Send> or similar, because the stuff that DF wraps in SendableStream/RecordBatchStream is sort of redundant, and therefore likely to confuse; from a user perspective this would all live on Dataset but be exposed using other avenues (a rough sketch of the adapter I have in mind follows below).
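To illustrate the narrower-bound idea: anything that can be viewed as a stream of Result<RecordBatch, _> is accepted, and the DataFusion wrapper type never has to appear in a public signature. The adapt_df_stream name is made up, and mapping into ArrowError is just one possible choice of error type:

use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, TryStreamExt};

// Adapt a DataFusion stream into a plain stream of Result<RecordBatch, ArrowError>.
fn adapt_df_stream(
    stream: SendableRecordBatchStream,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + Unpin {
    stream.map_err(|e| ArrowError::ExternalError(Box::new(e)))
}

The result could be passed straight to a write_stream with a generic stream bound (like the earlier sketch), and non-DataFusion callers never have to construct a SendableRecordBatchStream at all.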
I am trying to write some tools that involve writing data (given some SendableRecordBatch streams) and I've run into a number of rough spots with the API, and I wanted to lay them out.

It is a little frustrating to have to iterate through a stream to create record batch iterators to pass to lance, which then get converted back into streams. It's not a big deal, and thankfully it's easy to do, and probably doesn't matter too much, but it is a bit awkward for anyone who's using DataFusion. I think an interesting alternative would be to have an alias for Stream<RecordBatch> (which wouldn't have Sendable's schema() method), and so you could use a Sendable, and lance could also provide a conversion from RecordBatch iterators. I think, however, that while the ergonomics and extra code are a little annoying, there is a workaround.

Because Dataset::write() is a constructor and cannot use an object_store constructed outside of the write method, and the append() method cannot handle the case when the table doesn't exist, if you have an object_store configured but don't have any data, there's no way to write data.

This is a more minor quibble, but with_storage_options on the builder takes a URI, yet you have to start with from_uri to get a builder, and indeed these are paths that are used for somewhat different purposes; but build_object_store() on the builder uses the URI from the root of the builder rather than taking a URI as an argument. It also is a bit weird that it's a url::Url when it just gets cast to a string internally and handled as a path.

I've been tinkering around with various ideas:
- write construction moves into the Builder as a finalizer, and we add a write() method that requires a constructed Dataset but has similar semantics.
- an insert method that does what write() does but on a constructed dataset.
- have .append() respect the append/create option rather than overriding it.
- an optional object store on WriteParams (which would error on any method on a constructed dataset, but be used preferentially on the write method).

The final option is very surgical; I think I'm personally most partial to the insert option, but a lot of this is preference, so I wanted to lay it out and see.
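For what it's worth, a hypothetical shape of the insert option might look like the following. The stand-in structs, names, and signature are illustrative only, not lance's actual API:

use arrow_array::RecordBatchReader;
use arrow_schema::ArrowError;

struct Dataset;     // stand-in for lance's Dataset
struct WriteParams; // stand-in for lance's write options

impl Dataset {
    // Same behavior as the write constructor, but on an already-constructed
    // Dataset, so it can reuse the object store and URI the dataset was opened
    // with, and create the table if it does not exist yet.
    async fn insert(
        &mut self,
        reader: impl RecordBatchReader + Send + 'static,
        params: Option<WriteParams>,
    ) -> Result<usize, ArrowError> {
        // A real implementation would write new fragments from `reader` and
        // commit a new dataset version; this sketch only shows the signature.
        let _ = (reader, params);
        Ok(0) // placeholder row count
    }
}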