aws / aws-sdk-java-v2

The official AWS SDK for Java - Version 2
Apache License 2.0

Line-by-line publisher of S3 object content #1253

Open martin-tarjanyi opened 5 years ago

martin-tarjanyi commented 5 years ago

Does the current version of the SDK support returning the content of an S3 object reactively in a Publisher, line by line, similar to, for example, Athena pagination?

zoewangg commented 5 years ago

You can use S3AsyncClient and provide your own AsyncResponseTransformer.

Example: https://github.com/aws/aws-sdk-java-v2/blob/31a17674db790f13173c66278e909c1775f4a5c4/services/s3/src/it/java/software/amazon/awssdk/services/s3/GetObjectAsyncIntegrationTest.java#L104
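
For reference, a custom transformer plugs into the async client roughly like this (a minimal sketch; the bucket, key, and MyTransformer names are placeholders, not SDK types):

S3AsyncClient s3 = S3AsyncClient.create();

// MyTransformer stands in for your own
// AsyncResponseTransformer<GetObjectResponse, MyResult> implementation.
CompletableFuture<MyResult> result = s3.getObject(
        GetObjectRequest.builder().bucket("my-bucket").key("my-key").build(),
        new MyTransformer());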

martin-tarjanyi commented 5 years ago

I see. And do you have plans to add an async response transformer which exposes the ByteBuffer publisher?

I'm thinking about something like this:

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;

// PublisherWrapper is a hypothetical holder pairing the response object
// with its content publisher.
public class ByteBufferSdkPublisherAsyncResponseTransformer<T extends SdkResponse>
        implements AsyncResponseTransformer<T, PublisherWrapper<T, ByteBuffer>>
{
    private volatile CompletableFuture<SdkPublisher<ByteBuffer>> cf;
    private volatile T response;

    @Override
    public CompletableFuture<PublisherWrapper<T, ByteBuffer>> prepare()
    {
        cf = new CompletableFuture<>();

        // Completed once onStream() hands over the payload publisher.
        return cf.thenApply(publisher -> new PublisherWrapper<>(publisher, response));
    }

    @Override
    public void onResponse(T response)
    {
        this.response = response;
    }

    @Override
    public void onStream(SdkPublisher<ByteBuffer> publisher)
    {
        cf.complete(publisher);
    }

    @Override
    public void exceptionOccurred(Throwable error)
    {
        cf.completeExceptionally(error);
    }
}

Although, having the publisher wrapped inside a CompletableFuture is not very nice, to say the least. Instead, I would imagine a method (getObjectByteBufferStream) in the S3AsyncClient class which could return SdkPublisher<ByteBuffer> directly.

What do you think?
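
For illustration, the proposed transformer would be wired in like this (PublisherWrapper and the transformer are the hypothetical types from the snippet above; bucket and key are placeholders):

CompletableFuture<PublisherWrapper<GetObjectResponse, ByteBuffer>> wrapped =
        s3AsyncClient.getObject(
                GetObjectRequest.builder().bucket("my-bucket").key("my-key").build(),
                new ByteBufferSdkPublisherAsyncResponseTransformer<>());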

zoewangg commented 5 years ago

Hmm, can you tell us a bit more about your use case? We might consider adding it if it's a common one.

martin-tarjanyi commented 5 years ago

Sure. I have a text file in S3 which contains an individual piece of data on each line, like this:

useful information 1
useful information 2
useful information 3
...

The file is quite big, so I would rather not load the whole thing into memory. Instead, I would like to process it reactively, line by line, with backpressure.

iconara commented 5 years ago

I'm struggling with this too. I have code that does some S3 operations that I want to be able to do asynchronously, but for some of them the result needs to be handed off to, for example, Commons CSV, which expects a Reader. I have written an AsyncResponseTransformer implementation that uses a LinkedBlockingQueue internally to present an InputStream interface, but I'm struggling with how to use the API correctly.

Specifically, when subscribing and requesting more, what is the unit of the Subscription#request call? Is it bytes? Is it calls to #onNext? Is there any way to know or control the size of the chunks passed to #onNext? I haven't been able to figure this out.

For me, it would obviously be much easier if there was an SDK implementation that would give me an InputStream or Reader from an asynchronous call, or, like @martin-tarjanyi is requesting, a Publisher that manages buffering and produces lines or custom-sized chunks.
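
To answer the request question: per the Reactive Streams specification, the unit of Subscription#request(n) is the number of onNext signals, i.e. ByteBuffer chunks, not bytes; the chunk size is determined by the SDK's HTTP client internals and is not directly controllable through this API. Below is a minimal, illustrative sketch of the queue-backed InputStream bridge described above; the class name is made up, and error propagation and bulk read(byte[]) are omitted.

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class ByteBufferBackedInputStream extends InputStream implements Subscriber<ByteBuffer> {
    private static final ByteBuffer EOF = ByteBuffer.allocate(0); // end-of-stream sentinel
    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
    private Subscription subscription;
    private ByteBuffer current;

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(1); // one chunk at a time: backpressure is paced by read()
    }

    @Override
    public void onNext(ByteBuffer buffer) {
        queue.add(buffer);
    }

    @Override
    public void onError(Throwable t) {
        queue.add(EOF); // a real implementation would surface the error to the reader
    }

    @Override
    public void onComplete() {
        queue.add(EOF);
    }

    @Override
    public int read() throws IOException {
        while (current != EOF && (current == null || !current.hasRemaining())) {
            try {
                current = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
            if (current != EOF) {
                subscription.request(1); // ask for the next chunk
            }
        }
        return current == EOF ? -1 : (current.get() & 0xFF);
    }
}

You would subscribe an instance of this to the SdkPublisher<ByteBuffer> inside your transformer's onStream, then hand the InputStream to an InputStreamReader.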

deepankerk commented 5 years ago

Does the current version of the SDK support returning the content of an S3 object reactively in a Publisher, line by line, similar to, for example, Athena pagination?

I was able to achieve line-by-line streaming using the code below:

ResponseInputStream<GetObjectResponse> responseInputStream = client.getObject(
        GetObjectRequest.builder()
                .bucket(s3Entity.getBucket().getName())
                .key(s3Entity.getObject().getKey())
                .build(),
        ResponseTransformer.toInputStream());

int linesRead = 0;
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(responseInputStream, StandardCharsets.UTF_8)))
{
    while (reader.readLine() != null)
    {
        linesRead++;
    }
}
catch (IOException e)
{
    logger.log("IO Exception: " + e);
}
logger.log("Lines read: " + linesRead);

This is not async though. I stumbled onto this issue looking for a better way to parse S3 files. More details here: https://github.com/aws/aws-sdk-java-v2/issues/1358

JosefGold commented 4 years ago

We have this use case as well and would be happy to have an implementation in the SDK.

koziolk commented 4 years ago

@zoewangg Any update on when the feature described by @martin-tarjanyi will be added?

millems commented 4 years ago

Not at this time, but it's in our backlog. A PR is much easier to prioritize, if someone wanted to submit one for this feature.

Bennett-Lynch commented 2 years ago

Hi all,

We just released support for a new AsyncResponseTransformer: toPublisher(). This transformer makes it easier to directly consume a streaming-response payload (e.g., S3 GetObject) with async clients.

It returns a ResponsePublisher, which contains a reference to the SdkResponse returned by the service and a SdkPublisher<ByteBuffer>, which allows you to directly consume the streaming content.

This transformer is available starting in version 2.17.122. For more details and example usage, please see the Javadoc:

https://github.com/aws/aws-sdk-java-v2/blob/0e00a99bba0cc305a85969f156d1617ea7b708b6/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java#L147-L176

The ByteBuffer-stream could then be transformed into strings as follows:

SdkPublisher<ByteBuffer> bytePublisher = ...
SdkPublisher<String> strPublisher = bytePublisher.map(buf -> StandardCharsets.UTF_8.decode(buf).toString());

However, this does not buffer and split into lines. You would need to implement that transformation on your own.

Please let us know if you find a SdkPublisher<String> sufficient for this feature request or if you would still like to see an included line-by-line publisher. I think this would be a reasonable feature to include, as it would allow reading an S3 object similar to how the JDK allows reading a local file with the Files#lines(Path) API.
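
For reference, a minimal usage sketch of the new transformer (bucket and key are placeholders):

S3AsyncClient s3 = S3AsyncClient.create();

CompletableFuture<ResponsePublisher<GetObjectResponse>> future = s3.getObject(
        GetObjectRequest.builder().bucket("my-bucket").key("my-key").build(),
        AsyncResponseTransformer.toPublisher());

future.thenAccept(responsePublisher -> {
    // ResponsePublisher implements SdkPublisher<ByteBuffer>.
    SdkPublisher<String> strings =
            responsePublisher.map(buf -> StandardCharsets.UTF_8.decode(buf).toString());
    strings.subscribe(System.out::println); // prints decoded chunks, not lines
});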

aniket-buku commented 2 years ago

Has anyone succeeded in building the line-by-line publisher based on https://github.com/aws/aws-sdk-java-v2/issues/1253#issuecomment-1029500164? I am not sure how to convert these buffers into single lines, because, as I understand it, with

SdkPublisher<String> strPublisher = bytePublisher.map(buf -> StandardCharsets.UTF_8.decode(buf).toString());

strPublisher might emit any number of lines at a time, some of them incomplete.
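
One way to handle this is a small stateful splitter that scans for '\n' at the byte level and carries the partial tail between chunks; splitting on bytes before decoding also avoids cutting a multi-byte UTF-8 character in half at a chunk boundary. A sketch, with its limitations noted in the comments:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.core.async.SdkPublisher;

// Splits a ByteBuffer stream into lines. Keeping state in 'carry' is safe
// for a single subscription because Reactive Streams delivers onNext
// signals serially. Caveats: a final line without a trailing '\n' is never
// emitted here (a complete implementation would flush 'carry' on stream
// completion), and CRLF input leaves a trailing '\r' on each line.
static SdkPublisher<String> lines(SdkPublisher<ByteBuffer> bytes) {
    ByteArrayOutputStream carry = new ByteArrayOutputStream();
    return bytes.flatMapIterable(buf -> {
        List<String> out = new ArrayList<>();
        while (buf.hasRemaining()) {
            byte b = buf.get();
            if (b == '\n') {
                out.add(new String(carry.toByteArray(), StandardCharsets.UTF_8));
                carry.reset();
            } else {
                carry.write(b);
            }
        }
        return out;
    });
}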

ronytesler commented 1 year ago

any update?

arammkrtchyan commented 10 months ago

@Bennett-Lynch, if we go with the approach you suggested, transforming the byte buffers into lines, may we end up with incomplete lines?

loehnertz commented 7 months ago

In case you are using this with the Spring Framework, it has the StringDecoder class in org.springframework.core.codec (part of spring-core), which does exactly what is missing here: splitting an incoming reactive stream of bytes on delimiters (line-break characters by default). It takes care of buffering automatically.

You can use it like this together with the S3 SDK:

// The example uses Reactor/WebFlux, but this works the same way with other Rx frameworks.
import java.nio.ByteBuffer;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;

// Initialize these once, e.g. as final fields: both are stateless and reusable.
private static final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
private static final StringDecoder stringDecoder = StringDecoder.allMimeTypes();

private static Flux<String> splitByLine(Flux<ByteBuffer> buffers) {
    Flux<DataBuffer> dataBuffers = buffers.map(dataBufferFactory::wrap);
    return stringDecoder.decode(dataBuffers, ResolvableType.forClass(String.class), null, null);
}
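
A hedged end-to-end sketch combining this helper with the SDK's toPublisher() transformer (s3AsyncClient, bucket, and key are placeholders; Mono and Flux are from reactor.core.publisher):

CompletableFuture<ResponsePublisher<GetObjectResponse>> future = s3AsyncClient.getObject(
        GetObjectRequest.builder().bucket("my-bucket").key("my-key").build(),
        AsyncResponseTransformer.toPublisher());

// ResponsePublisher implements SdkPublisher<ByteBuffer>, which is a Reactive
// Streams Publisher, so Flux.from(...) can adapt it directly.
Flux<String> lines = Mono.fromFuture(future)
        .flatMapMany(responsePublisher -> splitByLine(Flux.from(responsePublisher)));

lines.subscribe(System.out::println);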