awslabs / aws-sdk-kotlin

Multiplatform AWS SDK for Kotlin
Apache License 2.0
396 stars 48 forks source link

ReactiveStreams API Adapter for S3 ByteStream #612

Closed hantsy closed 12 months ago

hantsy commented 2 years ago

Describe the feature

Currently, the S3Client upload and get support ByteStream, when using it in a Reactive application, it is not so easy to convert a reactive streams type to ByteStream.

Is your Feature Request related to a problem?

No

Proposed Solution

I would like use the ReactiveStreams compatible Publisher<ByteBuffer>, esp, in Spring WebFlux/Reactor, use the specific Flux and DataBuffer aka Flux<DataBuffer>, also consider RxJava3/SmallRye Munity

Describe alternative solutions or features you've considered

Acknowledge

AWS Kotlin SDK version used

0.15.2-beta

Platform (JVM/JS/Native)

JVM

Operating System and version

Windows 10

aajtodd commented 2 years ago

@hantsy Thanks for the feature request.

NOTE: There is a rather large limitation from S3 though that requires a known Content-Length. This would require that you know the total size that will be produced by a Flow/Publisher at the time of making the request. This may or may not be an issue for you depending on your use case.

See https://github.com/aws/aws-sdk-net/issues/1095 for relevant discussion around this limitation.


Indeed today you would need to create your own adapter from Flow/Publisher -> ByteStream. I've produced a working sample using Flow<ByteArray> you can hopefully extrapolate from below that does this (assuming the limitation called out above is workable for your use case).

The key to marrying the two together is to collect the producer in a coroutine and write the individual buffers to an SdkByteChannel and use that channel as a ByteStream.OneShotStream.

/**
 * Dummy producer
 */
fun producer(content: ByteArray, cnt: Int): Flow<ByteArray> = flow {
    repeat(cnt) {
        println("emitting buffer $it")
        emit(content)
        delay(1.seconds)
    }
}

/**
 * Collect the [producer] flow and emit it as a [ByteStream.OneShotStream]
 *
 * @param producer the publisher
 * @param contentLength the total content length that producer will emit
 * @return a [ByteStream] that can be used for S3 operations
 */
fun CoroutineScope.byteStreamFlow(
    producer: Flow<ByteArray>,
    contentLength: Long
): ByteStream {
    val chan = SdkByteChannel()
    launch {
        producer
            .onCompletion { cause -> chan.close(cause)    }
            .collect {
                println("writing buffer")
                chan.writeFully(it)
            }
    }

    return object : ByteStream.OneShotStream() {
        override val contentLength: Long? = contentLength
        override fun readFrom(): SdkByteReadChannel = chan
    }
}

fun main(): Unit = runBlocking {
    val bucketName = "<YOUR-BUCKET-NAME>"
    val keyName = "<YOUR-KEY-NAME>"

    val dummyContent = "Reactive ByteStream example".encodeToByteArray()
    val cnt = 10
    val contentLength = (dummyContent.size * cnt).toLong()

    S3Client.fromEnvironment{
        sdkLogMode = SdkLogMode.LogRequest + SdkLogMode.LogResponse
    }.use { s3 ->

        // S3 requires Content-Length header for PutObject or UploadPart
        val bodyStream = byteStreamFlow(producer(dummyContent, cnt), contentLength)

        val resp = s3.putObject {
            bucket = bucketName
            key = keyName
            body = bodyStream
        }

        println(resp)
    }
}
hantsy commented 2 years ago

Currently, I am using Spring WebFlux multipart feature to upload a file to S3, the uploaded file FilePart include a content method (return a Flux<DataBuffer>) to retrieve the file content.

I convert Flux<DataBuffer> into ByteArray, and use S3Client ByteStream.readBytes to read it it. On my testing environment (LocalStack docker and a text file), it works well.

suspend fun mergeDataBuffers(dataBufferFlux: Flux<DataBuffer>): ByteArray {
        return DataBufferUtils.join(dataBufferFlux) // merge all databuffer(`Flux`) to a single dataBuffer(`Mono`).
            .map { dataBuffer ->
                val bytes = ByteArray(dataBuffer.readableByteCount())
                dataBuffer.read(bytes)
                DataBufferUtils.release(dataBuffer)
                bytes
            }
            .awaitSingle()
    }

The bad is it will read all bytes in memory. Ideally, use a Flow to emit data(bytearray, bytebuffer) to S3 dynamically.

I will try your solution to use Flow in the whole progress, thanks.

hantsy commented 2 years ago

Currently using my solution, text file works, the image(binary file) download failed.

hantsy commented 2 years ago

Unlike Spring traditional MultipartFile from which we can read the file size directly. The new Spring reactive Part/FilePart(Upload files) does not include a method/field to get the content length.

ianbotsf commented 2 years ago

@hantsy I'm sorry to hear that downloading failed. Can you please provide your complete download code and the exception & stack trace you encountered?

hantsy commented 2 years ago

Currently I am using the following codes to get files from s3 service and write to http response in a Spring Webflux/Kotlin Coroutines appalication.

val request = GetObjectRequest {
    bucket = bucketName
    key = objectKey
}

// read the object into byte array.
s3client.getObject(request) { it.body?.toByteArray() 

...

// in spring controller.
val buffer = DataBufferUtils.read(
    ByteArrayResource(bytes), // the byte array retrieved from S3
    DefaultDataBufferFactory(),
    1024
).log()

// write to HTTP response
return exchange.response.writeWith(buffer).awaitSingleOrNull()
ianbotsf commented 2 years ago

I assume you're storing the result of getting the byte array into a value called bytes?

val bytes = s3client.getObject(request) { it.body?.toByteArray() }

If so, that looks like the correct way to read the entire object into memory. What exception or problem are you seeing?

hantsy commented 2 years ago

This works with a LocalStack Docker and I tested it with a text file. But on the AWS, it is deployed by the CI and Devops, I have no enough permission to view all aws logs.

For my experience, maybe I have to set content type to my HttpResponse, I will verify it.

hantsy commented 2 years ago

I know reading the object into a byte array is a bad idea. But there is no simple way to convert between Flux<DataBuffer> and S3 ByteStream.

hantsy commented 2 years ago

Aws SDK for Java includes some async APIs(based on Java 8 CompletableFuture), I hope this Aws SDK for Kotlin will include built-in Kotlin Coroutines/Flow(or ReactiveStreams API) for the async support.

ianbotsf commented 2 years ago

It should be possible to adapt a ByteStream into a Flow. For example:

const val bufferSize = 4096

suspend fun main() {
    val s3 = S3Client.fromEnvironment { }

    val req = GetObjectRequest {
        bucket = "some-bucket"
        key = "some-key"
    }

    s3.getObject(req) { resp ->
        val stream = resp.body as ByteStream.OneShotStream
        val flow = produceFlow(stream.readFrom())
        consumeFlow(flow)
    }

    println("Complete!")
}

fun produceFlow(reader: SdkByteReadChannel): Flow<ByteArray> {
    val buffer = ByteArray(bufferSize)
    return flow {
        var bytes = reader.readAvailable(buffer)
        while (bytes != -1) {
            if (bytes > 0) {
                emit(buffer.copyOfRange(0, bytes))
            }
            bytes = reader.readAvailable(buffer)
        }
    }
}

suspend fun consumeFlow(chunks: Flow<ByteArray>) {
    var total = 0
    RandomAccessFile("/tmp/chunk-download.zip", "rw").use { file ->
        chunks.collect { chunk ->
            println("Received a ${chunk.size} byte chunk, writing to file (written $total bytes so far)")
            file.write(chunk)
            total += chunk.size
        }
        println("Finished writing file, wrote $total bytes")
    }
}

This code sample emits individually-allocated buffer chunks into a Flow in produceFlow. It then writes those chunks to a RandomAccessFile in consumeFlow. Note that the consuming code needs to execute in the context of the getObject response lambda (otherwise, the stream will be freed and the bytes will no longer be available).

github-actions[bot] commented 12 months ago

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see. If you need more assistance, please either tag a team member or open a new issue that references this one. If you wish to keep having a conversation with other community members under this issue feel free to do so.

hantsy commented 9 months ago

I tried to apply the Flow<ByteArray>.toByteStream, and ByteStream.toFlow(), failed, check the branch of the example project: https://github.com/hantsy/aws-sdk-kotlin-spring-example/pull/6,

See related discussions: https://github.com/awslabs/aws-sdk-kotlin/discussions/1127