dhatim / fastexcel

Generate and read big Excel files quickly

Support Reactive Programming #134

Open · ghahramani opened this issue 3 years ago

ghahramani commented 3 years ago

To use fastexcel in a reactive application, you currently need a nasty trick for the output stream: you create your own implementation like the one below (my example returns each ByteArray to the HTTP call as soon as it is ready, so the Excel data is streamed instead of written to a file), attach it to your upstream, and schedule it on a parallel thread. It would be very nice to support reactive programming with either RxJava or the Reactor project. For instance, the workbook could return a Flux or a Flowable, depending on the project, instead of relying on an output stream, which is a concept from blocking applications.

This is the output stream I use as a workaround to make it work with reactive programming:

import java.io.OutputStream
import java.time.Duration
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DefaultDataBufferFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks

class PublisherOutputStream(
    private val maxSize: Int = 1 * 1024 * 1024,
    private val flushTimeout: Duration = Duration.ofMillis(50)
) : OutputStream() {

    // Bridges the blocking OutputStream API to a reactive stream of bytes
    private val publisher = Sinks.many().unicast().onBackpressureBuffer<Byte>()

    fun configurePublisher(): Flux<DataBuffer> = publisher
        .asFlux()
        // emit a chunk once maxSize bytes are buffered or flushTimeout has elapsed
        .bufferTimeout(maxSize, flushTimeout)
        .map { bytes ->
            val buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer(bytes.size)
            buffer.write(bytes.toByteArray())
            buffer
        }
        .cast(DataBuffer::class.java)

    override fun write(b: Int) {
        publisher.tryEmitNext(b.toByte())
    }

    override fun close() {
        publisher.tryEmitComplete() // completes the Flux, which should trigger the cleanup of the whole thing
    }

}

And then I have to use it like this to make it work in my application:

Flux.merge(
    // stream the generated XLSX bytes to the HTTP response
    outputStream
        .configurePublisher()
        .subscribeOn(Schedulers.newParallel("ReaderThread")),
    // write rows into the workbook from the reactive repository
    repository
        .findAll()
        .limitRequest(MAX_ROWS.minus(1).toLong())
        .doFirst {
            // write the header row first
            val headerRowIndex = rowId.getAndIncrement()
            columns.forEachIndexed { index, name ->
                sheet.value(headerRowIndex, index, name)
            }
            sheet.flush()
        }
        .map { item -> mapper.convertValue<Map<String, String>>(item) }
        .doOnNext { item ->
            // one worksheet row per repository entity
            val currentRow = rowId.getAndIncrement()
            columns.forEachIndexed { index, name -> sheet.value(currentRow, index, item[name]) }
            sheet.flush()
        }
        .subscribeOn(Schedulers.boundedElastic())
        .then(Mono.empty())
)
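
For context, the snippet above assumes a setup roughly like this (the Workbook constructor, newWorksheet and finish are the normal fastexcel calls; the other names are just from my application):

import java.util.concurrent.atomic.AtomicInteger
import org.dhatim.fastexcel.Workbook

val outputStream = PublisherOutputStream()
val workbook = Workbook(outputStream, "MyApplication", "1.0")  // standard fastexcel constructor
val sheet = workbook.newWorksheet("Export")
val rowId = AtomicInteger(0)                                   // shared row counter used above
val columns = listOf("id", "name", "email")                    // example column names
// when the row pipeline completes, workbook.finish() writes the closing parts and closes
// the stream, which triggers PublisherOutputStream.close() and completes the DataBuffer Flux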

As you can see, this is too much work, it is very nasty, and it does not really follow reactive principles. But if the workbook returned a Flux or Flowable, it would be very easy to add it to your pipeline and make it work.
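
Just to illustrate, with a hypothetical reactive handle on the workbook (asFlux() below does not exist in fastexcel, and writeRow() is just the cell-writing logic from above pulled into a function), the whole thing could shrink to something like:

// Purely illustrative sketch: Workbook.asFlux() is hypothetical, not an existing fastexcel API.
fun export(): Flux<DataBuffer> {
    repository.findAll()
        .doOnNext { item -> writeRow(sheet, item) }   // same cell writing as above
        .doOnComplete { workbook.finish() }           // close the workbook when the data ends
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe()
    // the workbook itself is the publisher, so no custom OutputStream,
    // no Sinks and no manual Flux.merge are needed
    return workbook.asFlux()
}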

rzymek commented 3 years ago

Could you propose what a reactive fastexcel API could look like?

ghahramani commented 3 years ago

So basically,

Reader: The API needs to expose a publisher (something like Flux<DataBuffer>). Whoever wants to read the file subscribes to it and receives DataBuffer instances (or something similar), where each signal is either a row or just a chunk of the raw XLSX binary, without the library determining rows or columns.
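
A rough sketch of the reader shape I mean (this interface is hypothetical, nothing here exists in fastexcel today; Row refers to the existing fastexcel-reader row type):

import org.dhatim.fastexcel.reader.Row
import org.springframework.core.io.buffer.DataBuffer
import reactor.core.publisher.Flux

// Hypothetical reactive reader, only to illustrate the proposal.
interface ReactiveWorkbookReader {
    // one signal per decoded row of the given sheet
    fun rows(sheetName: String): Flux<Row>
    // or, at a lower level, chunks of the raw XLSX bytes without row/column decoding
    fun content(): Flux<DataBuffer>
}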

Writer: It can be very similar to what I have in my example above. The API needs to expose a sink of some type, e.g. Sinks.many().unicast().onBackpressureBuffer<DataBuffer>(), and push the generated bytes into it via publisher.tryEmitNext(b.toByte()) instead of writing them to an OutputStream.
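
And a sketch of the writer shape, mirroring the workaround above (ReactiveWorkbook, content() and emit() are hypothetical names):

import org.springframework.core.io.buffer.DataBuffer
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks

// Hypothetical reactive writer: the workbook pushes its generated bytes into a sink
// instead of an OutputStream and exposes the Flux side to consumers.
class ReactiveWorkbook {
    private val sink = Sinks.many().unicast().onBackpressureBuffer<DataBuffer>()

    // consumers (e.g. an HTTP response body) subscribe to this
    fun content(): Flux<DataBuffer> = sink.asFlux()

    internal fun emit(chunk: DataBuffer) {
        sink.tryEmitNext(chunk)      // called as parts of the XLSX are generated
    }

    fun finish() {
        sink.tryEmitComplete()       // completes the stream once the workbook is done
    }
}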

This is obviously just a proposal and needs to be thought through in more detail, but it could be very similar to what the Spring framework (Spring Boot) provides when you want to upload a file to a reactive server.
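
For reference, this is roughly the shape Spring WebFlux already uses for reactive file uploads, where the uploaded content is exposed as a Flux<DataBuffer> (standard Spring API, shown only as the analogy):

import org.springframework.core.io.buffer.DataBufferUtils
import org.springframework.http.codec.multipart.FilePart
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestPart
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Mono

@RestController
class UploadController {

    @PostMapping("/upload")
    fun upload(@RequestPart("file") file: Mono<FilePart>): Mono<Void> =
        file.flatMapMany { it.content() }                            // Flux<DataBuffer> of the uploaded bytes
            .doOnNext { buffer -> DataBufferUtils.release(buffer) }  // process each chunk, then release it
            .then()
}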