avro-kotlin / avro4k

Avro format support for Kotlin
Apache License 2.0

Add alternative data provider to Sequence in AvroObjectContainer #256

Closed: barbulescu closed this issue 2 months ago

barbulescu commented 2 months ago

Is your feature request related to a problem? Please describe.

We have several pipelines that extract data from Elasticsearch and expose it as a coroutine Flow, and we want to serialize the entire data set into files.

More details:

Describe the solution you'd like

I have no solution yet, but I will start looking through the Avro4k code to try to understand it.

Describe alternatives you've considered

Migration to Avro4k v2 is on hold until a solution is available.

Additional context

fun <T> encodeToStream(
        schema: Schema,
        serializer: SerializationStrategy<T>,
        values: Sequence<T>,
        outputStream: OutputStream,
        builder: AvroObjectContainerBuilder.() -> Unit = {},
    )

barbulescu commented 2 months ago

Maybe something like:

    /**
     * Encodes the given data to the given output stream.
     *
     * Note that the output stream is not closed after the operation, which means you need to handle it to avoid resource leaks.
     */
    public fun <T> encodeToStream(
        schema: Schema,
        serializer: SerializationStrategy<T>,
        outputStream: OutputStream,
        builder: AvroObjectContainerBuilder.() -> Unit = {},
        dataProvider: DataWriter<T>.() -> Unit = {}
    ) {
        val datumWriter: DatumWriter<T> = KotlinxSerializationDatumWriter(serializer, avro)
        val dataFileWriter = DataFileWriter(datumWriter)
        try {
            builder(AvroObjectContainerBuilder(dataFileWriter))
            dataFileWriter.create(schema, outputStream)
            val dataWriter = DataWriter<T>(dataFileWriter)
            dataProvider(dataWriter)
        } finally {
            dataFileWriter.flush()
        }
    }

    /**
     * Encodes the given sequence to the given output stream.
     *
     * Note that the output stream is not closed after the operation, which means you need to handle it to avoid resource leaks.
     */
    public fun <T> encodeToStream(
        schema: Schema,
        serializer: SerializationStrategy<T>,
        values: Sequence<T>,
        outputStream: OutputStream,
        builder: AvroObjectContainerBuilder.() -> Unit = {},
    ) {
        encodeToStream(schema, serializer, outputStream, builder) {
            values.forEach {
                write(it)
            }
        }
    }

public class DataWriter<T>(private val delegate: DataFileWriter<T>) {
    public fun write(data: T) {
        delegate.append(data)
    }
}

Chuckame commented 2 months ago

To make sure I understood correctly: you currently have a Flow, while the API takes a Sequence, right?

Serializing a Flow into a single file should not be supported directly, because flows may be multithreaded while sequences are not. Writes to a single file must happen sequentially on a single thread, or you may run into concurrency issues. That is why the entry point only accepts a Sequence (and not a List, to prevent OOMs).

Can you share the code you currently use with v1 to write this Flow to the object container file?

EDIT

I'm really curious about how you can write to this single file using non-sequential multi-threaded flows 🤔

With a Flow, you need to manage backpressure, or suspend the producer while a write to the file is in progress.

With a Sequence (or any Iterator), iteration is blocking by definition: each write blocks until the item is fully written, and only then moves on to the next item.
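
The blocking, one-at-a-time behaviour of a Sequence can be seen in a minimal standard-library sketch. Nothing here is avro4k code: `interleaving` is a hypothetical helper, and a `ByteArrayOutputStream` stands in for the target file. It only records the order in which items are produced and written.

```kotlin
import java.io.ByteArrayOutputStream

// Hypothetical helper: records the order in which items are produced and written.
fun interleaving(): List<String> {
    val log = mutableListOf<String>()
    val out = ByteArrayOutputStream() // stand-in for the target file
    // A lazy sequence: the body only runs when the consumer asks for the next item.
    val values = sequence {
        for (i in 1..3) {
            log += "produce $i"
            yield(i)
        }
    }
    for (value in values) {
        out.write(value) // blocking write; completes before the next item is pulled
        log += "write $value"
    }
    return log
}

fun main() {
    // prints [produce 1, write 1, produce 2, write 2, produce 3, write 3]
    println(interleaving())
}
```

Each item is produced only after the previous write has finished, so the producer can never run ahead of the writer.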

Chuckame commented 2 months ago

public fun <T> encodeToStream(
        schema: Schema,
        serializer: SerializationStrategy<T>,
        outputStream: OutputStream,
        builder: AvroObjectContainerBuilder.() -> Unit = {},
+       dataProvider: DataWriter<T>.() -> Unit = {}
    )

This change won't work if you need to call suspending functions, as the lambda is not suspendable, and making encodeToStream suspendable would pollute the API a bit. Also, having two different lambdas as parameters would make the code hard to read.

Let's do it as before, returning a writer to write values. But now, you will need to close the writer to ensure flushing to the output stream. You'll still have to close the stream yourself.

public inline fun <reified T> AvroObjectContainer.openWriter(
    outputStream: OutputStream,
    noinline builder: AvroObjectContainerBuilder.() -> Unit = {},
): AvroObjectContainerWriter<T>

class AvroObjectContainerWriter<T> : Closeable {
    fun writeValue(value: T)
    fun close()
}

AvroObjectContainer.openWriter(fileStream).use { writer ->
    writer.writeValue(...)
}
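
To illustrate the contract described above (closing the writer flushes to the stream, while the stream itself stays open for the caller to close), here is a rough standard-library sketch. `BufferingWriter`, `produce` and `demo` are hypothetical names made up for this example; this is not the avro4k implementation, and the real writer may buffer and flush differently.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.Closeable
import java.io.OutputStream

// Hypothetical stand-in for the proposed writer; NOT the avro4k class.
class BufferingWriter<T>(
    private val out: OutputStream,
    private val encode: (T) -> ByteArray,
) : Closeable {
    private val buffer = ByteArrayOutputStream()

    // Values are buffered in memory until the writer is closed.
    fun writeValue(value: T) {
        buffer.write(encode(value))
    }

    // Flushes buffered data to the stream, but leaves the stream itself open.
    override fun close() {
        buffer.writeTo(out)
        out.flush()
        buffer.reset()
    }
}

// Hypothetical push-style producer, e.g. a callback or a collected Flow.
fun produce(emit: (String) -> Unit) {
    listOf("a", "b", "c").forEach(emit)
}

// Returns the stream size before the writer is closed and its content afterwards.
fun demo(): Pair<Int, String> {
    val stream = ByteArrayOutputStream()
    var sizeBeforeClose = -1
    BufferingWriter<String>(stream) { it.toByteArray() }.use { writer ->
        produce { writer.writeValue(it) }
        sizeBeforeClose = stream.size() // nothing has reached the stream yet
    }
    return sizeBeforeClose to stream.toString()
}

fun main() {
    println(demo()) // prints (0, abc)
}
```

Because the caller drives `writeValue`, the same writer can be fed from a Sequence, a callback, or a Flow collector, as long as the calls happen sequentially.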

Tell me if it's ok for you

barbulescu commented 2 months ago

Yes, having a writer would work best for us.

Thank you!

Chuckame commented 2 months ago

Release v2.1.0, which includes this change, has been shipped. Let me know if you run into any issues!