aws / aws-sdk-java-v2

The official AWS SDK for Java - Version 2
Apache License 2.0

S3 upload with BlockingOutputStream leads to data corruption on writes using a shared byte array #4272

Open jonathanswenson opened 1 year ago

jonathanswenson commented 1 year ago

Describe the bug

Using BlockingOutputStreamAsyncRequestBody (via AsyncRequestBody.forBlockingOutputStream(...)) and reusing the same byte array across successive writes to the output stream leads to data corruption when uploading a stream to S3 with the async Java SDK.

At a high level, the write pattern is as follows (full code snippet below):

BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(320L);
// ... make a request to S3 with this body

Random random = new Random(3470);
OutputStream outputStream = body.outputStream();
// single buffer used for all writes
byte[] buffer = new byte[1024];
int bytesToWrite = 32;
for (int i = 0; i < 10; i++) {
  // refill this buffer between writes.
  random.nextBytes(buffer);
  outputStream.write(buffer, 0, bytesToWrite);
}

Expected Behavior

I expect that reusing a byte array between writes to an OutputStream does not lead to corrupt data.

Current Behavior

The data written to the output stream does not match the data that is written to s3.

Reproduction Steps

Gradle dependencies:

    implementation(platform("software.amazon.awssdk:bom:2.20.118"))
    implementation("software.amazon.awssdk:s3-transfer-manager")
    implementation("software.amazon.awssdk:s3")
    implementation("software.amazon.awssdk:sso")

    // https://mvnrepository.com/artifact/commons-codec/commons-codec
    implementation("commons-codec:commons-codec:1.16.0")

import org.apache.commons.codec.binary.Hex;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.BlockingOutputStreamAsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDownload;
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;

import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class BlockingOutputStreamTest {
  // not necessary for repro, just how I connect to AWS.
  String profileName = "<some profile name>";
  String bucket = "<some bucket name>";
  String algorithm = "SHA-256";

  @Test
  public void reUsedBufferTest() throws NoSuchAlgorithmException {
    S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
      .credentialsProvider(ProfileCredentialsProvider.create(profileName))
      .region(Region.US_WEST_2)
      .build();

    try (S3TransferManager transferManager = S3TransferManager.builder()
      .s3Client(s3AsyncClient)
      .build()) {

      // not 100% required, but it IS a race, and doesn't always happen on the first try.
      // this reproduction (10 chunks of 32 bytes) on my machine seems to always trigger it the first time.
      for (int iteration = 0; iteration < 30; iteration++) {
        String key = UUID.randomUUID().toString();
        System.out.println("key is: "+ key);

        BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(320L);

        UploadRequest uploadRequest = UploadRequest.builder()
          .putObjectRequest(PutObjectRequest.builder()
            .bucket(bucket)
            .key(key)
            // attempting to manually set the digest values here causes the requests to fail.
            .build())
          .requestBody(body)
          .build();

        Upload uploadResponse = transferManager.upload(uploadRequest);
        MessageDigest uploadMD = MessageDigest.getInstance(algorithm);
        // with a seed of 3470, we expect a crc32 digest of:
        // or sha256: "0e317c890dedbf007e6b4c25bbf347563f645d83e29b8eed85a1b293ea0d31ba"
        // or md5: 7052d097616126ae82c211a9834220f3

        Random random = new Random(3470);

        try (OutputStream outputStream = body.outputStream()) {
          byte[] buffer = new byte[1024];
          for (int i = 0; i < 10; i++) {
            // want new random bytes every time.
            random.nextBytes(buffer);
            int bytesToWrite = 32;

            uploadMD.update(buffer, 0, bytesToWrite);
            // My best guess here is that the BlockingOutputStreamAsyncRequestBody is not appropriately
            // copying the bytes. Instead, I believe that it is using the same buffer for each write
            // however, between each iteration we're overwriting that buffer.
            // then I believe that there's a race to consume those bytes before we get around to overwriting them
            outputStream.write(buffer, 0, bytesToWrite);

            // if you uncomment this line, it seems to work every time.
            // Thread.sleep(10);
          }
        } catch (Exception e) {
          // this typically throws if the upload fails due to bad credentials. It also takes some time (10s) to time out.
          // Don't throw, just let the below lines run, one of them will also throw letting us see the reason why
          // we failed to connect.
          e.printStackTrace();
        }

        // wait for the upload to finish.
        uploadResponse.completionFuture().join();

        String uploadDigest = Hex.encodeHexString(uploadMD.digest());
        System.out.println("ulDigest: " + uploadDigest);

        DownloadRequest<ResponseInputStream<GetObjectResponse>> downloadRequest = DownloadRequest.builder()
          .getObjectRequest(
            GetObjectRequest.builder()
              .bucket(bucket)
              .key(key)
              .build()
          ).responseTransformer(AsyncResponseTransformer.toBlockingInputStream())
          .build();

        CompletedDownload<ResponseInputStream<GetObjectResponse>> dl = transferManager.download(downloadRequest)
          .completionFuture()
          .join();

        MessageDigest responseDigest = MessageDigest.getInstance(algorithm);
        try (InputStream inStream = dl.result()) {
          byte[] allBytes = inStream.readAllBytes();
          responseDigest.update(allBytes);
        } catch (Exception e) {
          e.printStackTrace();
        }

        String downloadDigest = Hex.encodeHexString(responseDigest.digest());
        System.out.println("dlDigest: " + downloadDigest);

        assertEquals(uploadDigest, downloadDigest);
      }
    }
  }
}

Possible Solution

I believe this is happening because the bytes passed to the output stream are wrapped but not copied.

In https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/async/OutputStreamPublisher.java#L71 the byte buffer is wrapped for compatibility with async nio / publisher APIs. However, due to a lack of immutability and an expectation of blocking behavior from the OutputStream API, this leads to the wrapped data being mutated before it is successfully passed to the CRT library.

  @Test
  public void wrapTest() {
    byte[] buffer = new byte[10];
    // I think this is unnecessary...
    Arrays.fill(buffer, (byte) 0);

    ByteBuffer wrapped = ByteBuffer.wrap(buffer, 0, 10);

    assertEquals(0, wrapped.get(0));
    // mutate the original buffer.
    buffer[0] = (byte) 1;

    // wrap() shares the backing array, so the mutation is visible through the ByteBuffer.
    assertEquals(1, wrapped.get(0));
  }

Likely what needs to happen here is the data needs to be copied before the write call returns.
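
A minimal sketch of the difference, runnable without S3. `QueueingSink` is a hypothetical stand-in for a publisher-backed stream and is not the SDK's OutputStreamPublisher; it only assumes that `write` returns before the queued bytes are consumed, which is the behavior described above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

/** Hypothetical stand-in for a publisher-backed stream; not the SDK's implementation. */
final class QueueingSink {
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();
    private final boolean copyOnWrite;

    QueueingSink(boolean copyOnWrite) {
        this.copyOnWrite = copyOnWrite;
    }

    /** Returns immediately; the bytes are only read later, in drain(). */
    void write(byte[] b, int off, int len) {
        pending.add(copyOnWrite
                ? ByteBuffer.wrap(Arrays.copyOfRange(b, off, off + len)) // isolated snapshot
                : ByteBuffer.wrap(b, off, len));                         // shares the caller's array
    }

    /** Simulates the downstream consumer reading after the caller has moved on. */
    byte[] drain() {
        int total = pending.stream().mapToInt(ByteBuffer::remaining).sum();
        ByteBuffer all = ByteBuffer.allocate(total);
        while (!pending.isEmpty()) {
            all.put(pending.remove());
        }
        return all.array();
    }
}
```

If one buffer is refilled and written three times before `drain()` runs, the non-copying sink yields the last contents three times, while the copying sink yields the three distinct chunks in order.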

Additional Information/Context

Originally I filed https://github.com/awslabs/aws-crt-java/issues/658 with the aws-crt-java library.

However, I then found a similar but slightly different problem. With the CRT library, the upload of corrupted data is reported as a success, while the standard (non-CRT) SDK throws an error:

Suppressed: software.amazon.awssdk.core.exception.SdkClientException: Request attempt 1 failure: Data read has a different checksum than expected. Was 0x7052d097616126ae82c211a9834220f3, but expected 0xb0812d3843229373f0a691bd624025ee. This commonly means that the data was corrupted between the client and service. Note: Despite this error, the upload still completed and was persisted in S3.

AWS Java SDK version used

2.20.118

JDK version used

openjdk version "17.0.2" 2022-01-18 LTS OpenJDK Runtime Environment Zulu17.32+13-CA (build 17.0.2+8-LTS) OpenJDK 64-Bit Server VM Zulu17.32+13-CA (build 17.0.2+8-LTS, mixed mode, sharing)

Operating System and version

Mac OSX 13.4.1 (M1)

debora-ito commented 1 year ago

@jonathanswenson thank you for the detailed explanation. I believe it's the same issue reported here: https://github.com/aws/aws-sdk-java-v2/issues/4083.

If so, we have a task to investigate it further.

Sampo123 commented 8 months ago

I ran into this same issue recently.

Your deduction is correct: wrapping without copying is the problem. One cannot reasonably expect OutputStream#write() to return before it has either written the buffer or copied it.

For now this renders the whole BlockingOutputStreamAsyncRequestBody pretty much pointless, as you either need to trust pure luck by adding sleeps and waits, or buffer your whole input into memory, and at that point you can use other, more reliable ways to upload.
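
If the retained array reference really is the only problem, copying each chunk on the caller side before handing it to the stream might be enough. The sketch below is untested against the SDK; `CopyOnWriteOutputStream` is a hypothetical wrapper name, not an SDK class.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

/** Hypothetical wrapper: passes a fresh array to the delegate on every write. */
final class CopyOnWriteOutputStream extends FilterOutputStream {

    CopyOnWriteOutputStream(OutputStream delegate) {
        super(delegate);
    }

    @Override
    public void write(int b) throws IOException {
        // Route single bytes through the array overload of the delegate.
        out.write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // The delegate may keep a reference to whatever array it receives,
        // so give it a private copy that the caller never touches again.
        byte[] copy = Arrays.copyOfRange(b, off, off + len);
        out.write(copy, 0, copy.length);
    }
}
```

Usage would be along the lines of `new CopyOnWriteOutputStream(body.outputStream())`, at the cost of one allocation per write.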

Sampo123 commented 8 months ago

@jonathanswenson an easy workaround is to use

final AsyncRequestBody body = AsyncRequestBody.fromPublisher(publisher);
final CompletableFuture<UploadPartResponse> resp = asyncClient.uploadPart(request, body);

And then something like this:

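import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class QueuePublisher extends OutputStream implements Publisher<ByteBuffer> {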

    private final BlockingQueue<QueueBuffer> queue = new LinkedBlockingQueue<>(10);
    private final long contentLength;
    private volatile long pos = 0;

    record QueueBuffer(byte[] buffer, int length) {}

    private Subscriber<? super ByteBuffer> subscriber;

    public QueuePublisher(long contentLength) {
        this.contentLength = contentLength;
    }

    @Override
    public void write(int b) throws IOException {
        // delegate so that single-byte writes are not silently dropped
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] buffer, int off, int len) throws IOException {
        final byte[] internalBuffer = new byte[len];
        // copy from the requested offset, not from the start of the caller's array
        System.arraycopy(buffer, off, internalBuffer, 0, len);
        try {
            queue.put(new QueueBuffer(internalBuffer, len));
        } catch (InterruptedException e) {
            subscriber.onError(e);
            throw new IOException(e);
        }
    }

    @Override
    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        this.subscriber = subscriber;
        this.subscriber.onSubscribe(new QueueSubscriber());

    }

    class QueueSubscriber implements Subscription {

        private final AtomicBoolean done = new AtomicBoolean(false);

        @Override
        public void request(long n) {
            if (done.get()) return;
            for (int i = 0; i < n; i++) {
                if (done.get()) break;
                send();
            }
        }

        private synchronized void send() {
            try {
                QueueBuffer qb = queue.take();
                subscriber.onNext(ByteBuffer.wrap(qb.buffer()));
                pos += qb.length;
                if (pos == contentLength) {
                    done.set(true);
                    subscriber.onComplete();
                }
            } catch (InterruptedException e) {
                subscriber.onError(e);
                throw new RuntimeException(e);
            }
        }

        @Override
        public void cancel() {
            // TODO implement
        }
    }
}

It will buffer a little bit more but in the end that should be no more than 10 x your write buffer size.

I'm moving potentially large files (several hundred MiB) before doing multipart, and even with multipart I have quite big parts so I cannot use anything other than streaming.

I haven't had the time to test this more. There is of course a chance that this solution will send too much downstream, in which case I would need to throttle it according to the demand, but that remains to be seen.
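
One way to tie emission strictly to demand is the usual drain-loop pattern: count requested items and emit only while that count is positive. The sketch below is an assumption-laden illustration, not tested against the SDK. It uses `java.util.concurrent.Flow` as a self-contained stand-in for the `org.reactivestreams` interfaces, `DemandDrain` is a hypothetical name, and completion and error signalling for the upload are left out.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical subscription that emits queued chunks only while demand is outstanding. */
final class DemandDrain implements Flow.Subscription {
    private final BlockingQueue<ByteBuffer> chunks;
    private final AtomicLong demand = new AtomicLong();
    private final AtomicInteger wip = new AtomicInteger(); // ensures a single drainer at a time
    private final Flow.Subscriber<? super ByteBuffer> subscriber;
    private volatile boolean cancelled;

    DemandDrain(Flow.Subscriber<? super ByteBuffer> subscriber, int capacity) {
        this.subscriber = subscriber;
        this.chunks = new LinkedBlockingQueue<>(capacity);
    }

    /** Writer side: blocks while the queue is full, then tries to emit. */
    void put(ByteBuffer alreadyCopiedChunk) throws InterruptedException {
        chunks.put(alreadyCopiedChunk);
        drain();
    }

    @Override
    public void request(long n) {
        if (n <= 0) {
            subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
            return;
        }
        // Accumulate demand, saturating at Long.MAX_VALUE on overflow.
        demand.getAndUpdate(d -> (d + n < 0) ? Long.MAX_VALUE : d + n);
        drain();
    }

    @Override
    public void cancel() {
        cancelled = true;
        drain(); // lets the drainer discard queued chunks so the writer is not left blocked
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another caller is draining and will loop again for us
        }
        do {
            if (cancelled) {
                chunks.clear();
            } else {
                while (!cancelled && demand.get() > 0) {
                    ByteBuffer next = chunks.poll();
                    if (next == null) {
                        break; // nothing queued; the next put() resumes emission
                    }
                    demand.decrementAndGet();
                    subscriber.onNext(next);
                }
            }
        } while (wip.decrementAndGet() != 0);
    }
}
```

Unlike a `request` that blocks on `queue.take()`, this never parks the subscriber's thread: if no chunk is queued, `request` returns and the next `put` picks up the outstanding demand.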

apulbere commented 2 months ago

I also encountered the same corruption of uploaded files via BlockingOutputStreamAsyncRequestBody. No fix yet? This issue is almost 1 year old.