SpectraLogic / ds3_java_sdk

Apache License 2.0
7 stars 15 forks source link

PUT bulk object potential issue #594

Open alexgb1 opened 2 years ago

alexgb1 commented 2 years ago

Hello,

We've been having some issues transfering an object from S3 to Blackpearl using this library, version 5.4.0

We are transfering using the following ObjectChannelBuilder

// Scala code
// inputStream souce of bytes

new Ds3ClientHelpers.ObjectChannelBuilder() {
          @throws[IOException]
          override final def buildChannel(key: String): SeekableByteChannel = {
            val readChannel: ReadableByteChannel = Channels.newChannel(inputStream)
           new  ReadOnlySeekableByteChannel(readChannel)
          }
        }

It usually happens when we are transfering files bigger than 64 GB.

I configured the max upload size

WriteJobOptions
          .create()
          .withMaxUploadSize(/*  Let's say 512 GB  */)

but didn't change the behaviour. (I assume this might not affect the blob size)

Please see stack trace below, any feedback is welcome.

org.apache.http.client.ClientProtocolException: null
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:187)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:72)
    at com.spectralogic.ds3client.networking.NetworkClientImpl$RequestExecutor.execute(NetworkClientImpl.java:239)
    at com.spectralogic.ds3client.networking.NetworkClientImpl.getResponse(NetworkClientImpl.java:177)
    at com.spectralogic.ds3client.Ds3ClientImpl.putObject(Ds3ClientImpl.java:70)
    at com.spectralogic.ds3client.helpers.strategy.transferstrategy.PutJobTransferMethod.transferJobPart(PutJobTransferMethod.java:78)
    at com.spectralogic.ds3client.helpers.strategy.transferstrategy.MaxNumObjectTransferAttemptsDecorator.transferJobPart(MaxNumObjectTransferAttemptsDecorator.java:59)
    at com.spectralogic.ds3client.helpers.strategy.transferstrategy.AbstractTransferStrategy.lambda$transferJobParts$2(AbstractTransferStrategy.java:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at delay @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$Slf4jLogger.$anonfun$info$4(Slf4jLoggerInternal.scala:91)
    at delay @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$Slf4jLogger.isInfoEnabled(Slf4jLoggerInternal.scala:66)
    at ifM$extension @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$Slf4jLogger.info(Slf4jLoggerInternal.scala:91)
    at >>$extension @ fs2.Pull$.fs2$Pull$$go$1(Pull.scala:1189)
Caused by: org.apache.http.client.NonRepeatableRequestException: Cannot retry request with a non-repeatable request entity
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:108)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:72)
    at com.spectralogic.ds3client.networking.NetworkClientImpl$RequestExecutor.execute(NetworkClientImpl.java:239)
    at com.spectralogic.ds3client.networking.NetworkClientImpl.getResponse(NetworkClientImpl.java:177)
    at com.spectralogic.ds3client.Ds3ClientImpl.putObject(Ds3ClientImpl.java:70)
    at com.spectralogic.ds3client.helpers.strategy.transferstrategy.PutJobTransferMethod.transferJobPart(PutJobTransferMethod.java:78)
    at com.spectralogic.ds3client.helpers.strategy.transferstrategy.MaxNumObjectTransferAttemptsDecorator.transferJobPart(MaxNumObjectTransferAttemptsDecorator.java:59)
    at com.spectralogic.ds3client.helpers.strategy.transferstrategy.AbstractTransferStrategy.lambda$transferJobParts$2(AbstractTransferStrategy.java:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.spectralogic.ds3client.exceptions.ContentLengthNotMatchException: The Content length for /_Common_Test_Bucket/big_file.mxf (68719476736) does not match the number of bytes read (0)
    at com.spectralogic.ds3client.Ds3InputStreamEntity.writeTo(Ds3InputStreamEntity.java:49)
    at org.apache.http.impl.execchain.RequestEntityProxy.writeTo(RequestEntityProxy.java:121)
    at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156)
    at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:152)
    at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    ... 12 common frames omitted
RachelTucker commented 2 years ago

How are you transferring the object from S3 to BP? Are you downloading the object from S3 locally and then sending it to BP? Or are you piping the stream directly from the S3 download into the BP’s put call?

The error Cannot retry request with a non-repeatable request entity suggests that some network interruption happened and the data had to be re-sent. The re-sending of the data failed because we could not seek back to the beginning of the input stream.

In your code snippet for your implementation of the buildChannel call, what is inputStream? Is it seekable?

Also, setting maxUploadSize does affect the blob size assuming that blobbing is enabled.

alexgb1 commented 2 years ago

How are you transferring the object from S3 to BP? Are you downloading the object from S3 locally and then sending it to BP? Or are you piping the stream directly from the S3 download into the BP’s put call?

The stream of bytes comes directly from S3.

In your code snippet for your implementation of the buildChannel call, what is inputStream? Is it seekable?

The inputStream is not seekable. It's a pull-based stream written into an OutputStream piped to an InputStream.

Also, setting maxUploadSize does affect the blob size assuming that blobbing is enabled.

Regarding the max upload size, is setting the maxUploadSize through com.spectralogic.ds3client.helpers.Ds3ClientHelpers (as in the example above) different than setting it through the com.spectralogic.ds3client.Ds3Client e.g.

ds3Client.putBulkJobSpectraS3(
                            new PutBulkJobSpectraS3Request(
                              "bucket",
                              List(new Ds3Object(ds3Object.key, ds3Object.size))
                            ).withMaxUploadSize(ds3Object.size)
                          )

I'm asking because the behaviour is different than expected. The maxUploadSize on the Ds3Client works as expected and the one on the Ds3ClientHelpers it doesn't.

We managed to PUT objects bigger than 64 GB, using the putBulkJobSpectraS3 method on the Ds3Client by setting the max upload size in the PutBulkJobSpectraS3Request.

RachelTucker commented 2 years ago

Streaming Strategy

Since you have non-seekable input streams, you need to run the job with StreamingTransferBehavior as opposed to the default RandomAccessTransferBehavior. You can do this by starting the job with Job.startWriteJobUsingStreamedBehavior instead of Job.startWriteJob(). You can see the javadoc here.

Max Upload Size

Setting the MAX_UPLOAD_SIZE on the helpers just adds that value to the PutBulkJobSpectraS3Request, which you can see here. Note that if the value cannot exceed 1 TB, and 1 TB will be used if a larger value is specified.

alexgb1 commented 2 years ago

We set the max upload size in the options, along with other properties:

val options = WriteJobOptions
          .create()
          .withChecksumType(ChecksumType.Type.MD5)
          .withWriteOptimization(WriteOptimization.PERFORMANCE)
          .withMaxUploadSize(dpaConfig.maxUploadSize)

and then used the startWriteJobUsingStreamedBehavior

  val job = helpers.startWriteJobUsingStreamedBehavior(ds3Object.location.bucket, List(content).asJava, options)

We did everything as in the javadoc and then the write would fail at the default of 64GB.

We are not using the helper currently and it works as expected.

Is there any test for this scenario using the helper class?