locationtech / geotrellis

GeoTrellis is a geographic data processing engine for high performance applications.
http://geotrellis.io

Non-blocking S3AsyncClient usage #3467

Open pomadchin opened 2 years ago

pomadchin commented 2 years ago

This task is to replace the blocking S3Client with its non-blocking counterpart, S3AsyncClient, wherever that makes sense, i.e. in the {RDD | Collection}{Readers | Writers}.

To keep the migration smooth, we may want to keep both clients (sync and async) available. For example, it makes more sense for AttributeStore to keep using the blocking S3Client for now.
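For context on what the non-blocking path could look like on the reader side: S3AsyncClient methods return a `CompletableFuture`, so a reader would need to bridge those to Scala `Future`s and combine them without blocking a thread per request. The following is only a minimal sketch using the standard library. `AsyncReadSketch`, `toScala`, `readAll` and the `fetch` parameter are hypothetical names, and `fetch` stands in for an actual call such as `client.getObject(request, AsyncResponseTransformer.toBytes())`.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{ExecutionContext, Future, Promise}

object AsyncReadSketch {
  /** Bridge a Java CompletableFuture to a Scala Future without blocking a thread. */
  def toScala[A](cf: CompletableFuture[A]): Future[A] = {
    val promise = Promise[A]()
    cf.whenComplete(new BiConsumer[A, Throwable] {
      // Exactly one of (value, error) is meaningful: error is null on success.
      override def accept(value: A, error: Throwable): Unit =
        if (error != null) promise.failure(error) else promise.success(value)
    })
    promise.future
  }

  /**
   * Issue one asynchronous request per key and collect the results.
   * `fetch` is a hypothetical stand-in for the S3AsyncClient call.
   */
  def readAll(keys: Seq[String])(fetch: String => CompletableFuture[Array[Byte]])(
    implicit ec: ExecutionContext
  ): Future[Seq[(String, Array[Byte])]] =
    Future.traverse(keys)(key => toScala(fetch(key)).map(bytes => key -> bytes))
}
```

Note that `Future.traverse` starts all requests at once, so a real reader would probably also want to bound the number of requests in flight.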

Below is an S3ClientProducer that supports this idea.

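import software.amazon.awssdk.awscore.retry.conditions.RetryOnErrorCodeCondition
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy
import software.amazon.awssdk.core.retry.conditions.{OrRetryCondition, RetryCondition}
import software.amazon.awssdk.services.s3.{S3AsyncClient, S3Client}

import java.time.Duration

object S3ClientProducer {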
  @transient private lazy val overrideConfig: ClientOverrideConfiguration = {
    val retryCondition =
      OrRetryCondition.create(
        RetryCondition.defaultRetryCondition(),
        RetryOnErrorCodeCondition.create("RequestTimeout")
      )
    val backoffStrategy =
      FullJitterBackoffStrategy.builder()
        .baseDelay(Duration.ofMillis(50))
        // NOTE: this cap is below baseDelay, so every retry waits at most 15 ms.
        .maxBackoffTime(Duration.ofMillis(15))
        .build()
    val retryPolicy =
      RetryPolicy.defaultRetryPolicy()
        .toBuilder
        .retryCondition(retryCondition)
        .backoffStrategy(backoffStrategy)
        .build()
    ClientOverrideConfiguration.builder()
      .retryPolicy(retryPolicy)
      .build()
  }

  @transient private lazy val client: S3Client =
    S3Client.builder()
      .overrideConfiguration(overrideConfig)
      .build()

  @transient private lazy val asyncClient: S3AsyncClient =
    S3AsyncClient.builder()
      .overrideConfiguration(overrideConfig)
      .build()

  private var summonClient: () => S3Client = () => client

  private var summonAsyncClient: () => S3AsyncClient = () => asyncClient

  /** Set an alternative default function for summoning S3Clients */
  def set(getClient: () => S3Client): Unit = summonClient = getClient

  /** Set an alternative default function for summoning S3AsyncClients */
  def setAsync(getClient: () => S3AsyncClient): Unit = summonAsyncClient = getClient

  /** Get the current function registered as default for summoning S3Clients */
  def get: () => S3Client = summonClient

  /** Get the current function registered as default for summoning S3AsyncClients */
  def getAsync: () => S3AsyncClient = summonAsyncClient
}
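
Since the sync and async halves of the producer above have the same shape (a lazily created default plus a replaceable factory function), one possible way to avoid duplicating the set/get pair is a small generic holder. This is only a sketch of that idea and not part of the proposal itself: `ClientProducer` is a hypothetical name, the `@volatile` annotation is an added assumption about wanting overrides to be visible across threads, and plain strings stand in for S3Client and S3AsyncClient so the example runs without the AWS SDK.

```scala
/** Holds a lazily created default client and lets callers swap the factory. */
final class ClientProducer[C](default: => C) {
  // Built on first use only, mirroring the lazy vals in S3ClientProducer.
  private lazy val defaultClient: C = default

  // @volatile is an assumption: it makes an override visible to other threads.
  @volatile private var summon: () => C = () => defaultClient

  /** Set an alternative function for summoning clients. */
  def set(getClient: () => C): Unit = summon = getClient

  /** Get the function currently registered for summoning clients. */
  def get: () => C = summon
}

object ProducerSketch {
  // Counts how many times the default "client" is constructed.
  var created: Int = 0

  // Strings stand in for S3Client and S3AsyncClient here.
  val sync  = new ClientProducer[String]({ created += 1; "default-sync-client" })
  val async = new ClientProducer[String]("default-async-client")
}
```

With this shape, `ProducerSketch.async.set(() => customClient)` would play the role of `S3ClientProducer.setAsync`, and code that only needs the blocking client (such as AttributeStore) would keep calling `sync.get`.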

Connects https://github.com/locationtech/geotrellis/issues/2923, https://github.com/locationtech/geotrellis/issues/2306