googleapis / java-spanner

Non-blocking API throughput is low #1751

Closed takaaki7 closed 1 year ago

takaaki7 commented 2 years ago

Environment details

  1. API: Spanner:readRows

  2. OS type and version: macOS 11.2.3, 16 CPUs

  3. Java version: openjdk 17.0.1 2021-10-19

  4. version(s): 6.20.0

Overview

Non-blocking APIs such as readAsync() and runAsync() seem to have low throughput. Using the blocking read API with my own threads is much faster than using the non-blocking API.

In particular, the concurrency of readAsync().toListAsync() seems to be limited to 8: the maximum value of num_sessions_in_pool (num_in_use_sessions) is only 8, while num_sessions_in_pool (num_read_sessions) is about 400.

What is the best way to use the non-blocking methods with good performance?

Benchmark code

package jp.co.plaid.karte;

import com.google.cloud.spanner.*;
import jp.co.plaid.karte.util.FutureUtils;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class SpannerReadBenchmark {

  String projectId = "xxx";
  String instance = "xxx";
  String database = "xxx";
  String tableName = "xxx";

  DatabaseClient spannerClient = getClient();
  KeySet keySet = KeySet.newBuilder().addKey(Key.of("mock", "mock", "mock")).build();
  String column = "xxx";

  Executor executor1 = Executors.newFixedThreadPool(400);
  Executor executor2 = Executors.newFixedThreadPool(400);

  public DatabaseClient getClient() {
    SpannerOptions.Builder builder = SpannerOptions.newBuilder();
    SpannerOptions spannerOptions = builder.setProjectId(projectId).build();
    DatabaseId databaseId = DatabaseId.of(projectId, instance, database);

    return spannerOptions.getService().getDatabaseClient(databaseId);
  }

  @Test
  void test() throws Exception {
    benchmark("toListAsync", this::toListAsync);
    benchmark("runAsync_toListAsync", this::runAsyncAndToListAsync);
    benchmark("runAsync_setCallback", this::runAsyncAndSetCallback);
    benchmark("blockingRead", this::blockingRead);
  }

  void benchmark(String name, Callable<CompletableFuture<?>> callable) throws Exception {
    int repeatCount = 10;
    List<Long> results = new ArrayList<>();
    for (int i = 0; i < repeatCount; i++) {
      List<CompletableFuture<?>> futures = new ArrayList<>();
      for (int j = 0; j < 800; j++) {
        futures.add(callable.call());
      }

      long startTime = System.currentTimeMillis();
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
      long duration = System.currentTimeMillis() - startTime;
      results.add(duration);
      Thread.sleep(200);
    }
    System.out.println("----" + name + "----");
    System.out.println("avg:" + results.stream().mapToInt(Long::intValue).average().getAsDouble());
    System.out.println("all:" + results);
  }

  public CompletableFuture<List<String>> toListAsync() {
    try (AsyncResultSet resultSet =
           spannerClient
             .singleUse()
             .readAsync(tableName, keySet, List.of(column))) {
      CompletableFuture<List<String>> future =
        FutureUtils.toCompletableFuture(
          resultSet.toListAsync(
            input -> input.getString(column),
            executor1));
      return future;
    }
  }

  public CompletableFuture<List<String>> runAsyncAndToListAsync() {
    AsyncRunner asyncRunner = spannerClient.runAsync();
    return FutureUtils.toCompletableFuture(asyncRunner.runAsync(txn -> {
      AsyncResultSet resultSet = txn.readAsync(tableName, keySet, List.of(column));
      return resultSet.toListAsync(input -> input.getString(column), executor1);
    }, executor2));
  }

  public CompletableFuture<List<String>> runAsyncAndSetCallback() {
    CompletableFuture<List<String>> future = new CompletableFuture<List<String>>();
    spannerClient.runAsync().runAsync(txn -> {
      AsyncResultSet resultSet = txn.readAsync(tableName, keySet, List.of(column));
      List<String> ret = new ArrayList<>();
      return resultSet.setCallback(
        executor1,
        resultSet1 -> {
          try {
            while (true) {
              switch (resultSet1.tryNext()) {
                case OK:
                  ret.add(resultSet1.getString(column));
                  break;

                case DONE:
                  future.complete(ret);
                  return AsyncResultSet.CallbackResponse.DONE;

                case NOT_READY:
                  return AsyncResultSet.CallbackResponse.CONTINUE;

                default:
                  throw new IllegalStateException();
              }
            }
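          } catch (SpannerException e) {
            // Propagate the failure; otherwise the returned future never completes.
            future.completeExceptionally(e);
            return AsyncResultSet.CallbackResponse.DONE;
          }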
        });
    }, executor2);
    return future;
  }

  public CompletableFuture<List<String>> blockingRead() {
    CompletableFuture<List<String>> future = new CompletableFuture<List<String>>();
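    executor1.execute(() -> {
      // Close the ResultSet when done and surface failures through the future.
      try (ResultSet resultSet = spannerClient.singleUse().read(tableName, keySet, List.of(column))) {
        List<String> ret = new ArrayList<>();
        while (resultSet.next()) {
          ret.add(resultSet.getCurrentRowAsStruct().getString(column));
        }
        future.complete(ret);
      } catch (RuntimeException e) {
        future.completeExceptionally(e);
      }
    });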
    return future;
  }
}

Result

----toListAsync----
avg:5228.1
all:[5727, 4605, 4655, 4591, 4461, 4567, 4510, 7277, 4612, 7276]
----runAsync_toListAsync----
avg:4914.0
all:[4761, 4625, 4707, 4709, 4722, 4581, 4607, 4681, 7184, 4563]
----runAsync_setCallback----
avg:4854.6
all:[6057, 5630, 4618, 4531, 4681, 4610, 4660, 4679, 4541, 4539]
----read(blocking)----
avg:337.3
all:[254, 156, 607, 315, 403, 292, 415, 597, 174, 160]
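
To make the gap easier to compare, the averages above can be converted to reads per second. This is only a rough sketch: it assumes each iteration issues 800 reads (the inner loop in benchmark()), and the class and method names are hypothetical. Since the timer in benchmark() starts after the calls have been submitted, the figures are approximations.

```java
public class ThroughputFromAverages {
    // Each benchmark iteration issues 800 reads (the inner loop in benchmark()).
    static final int READS_PER_ITERATION = 800;

    /** Converts an average batch duration in milliseconds to reads per second. */
    static double readsPerSecond(double avgMillis) {
        return READS_PER_ITERATION * 1000.0 / avgMillis;
    }

    public static void main(String[] args) {
        double asyncAvgMs = 5228.1;   // toListAsync average reported above
        double blockingAvgMs = 337.3; // blocking read average reported above
        System.out.printf("toListAsync: %.0f reads/s%n", readsPerSecond(asyncAvgMs));
        System.out.printf("blocking:    %.0f reads/s%n", readsPerSecond(blockingAvgMs));
        System.out.printf("ratio:       %.1fx%n", asyncAvgMs / blockingAvgMs);
    }
}
```

With these inputs the async path works out to roughly 153 reads per second against roughly 2,372 for the blocking path, a gap of about 15.5x.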

takaaki7 commented 2 years ago

I forgot to include it: here is the FutureUtils.toCompletableFuture code.

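import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.CompletableFuture;

public class FutureUtils {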

  public static <T> CompletableFuture<T> toCompletableFuture(ApiFuture<T> apiFuture) {
    CompletableFuture<T> completableFuture =
        new CompletableFuture<T>() {
          @Override
          public boolean cancel(boolean mayInterruptIfRunning) {
            boolean result = apiFuture.cancel(mayInterruptIfRunning);
            super.cancel(mayInterruptIfRunning);
            return result;
          }
        };

    ApiFutureCallback<T> callback =
        new ApiFutureCallback<T>() {
          @Override
          public void onFailure(Throwable throwable) {
            completableFuture.completeExceptionally(throwable);
          }

          @Override
          public void onSuccess(T t) {
            completableFuture.complete(t);
          }
        };
    ApiFutures.addCallback(apiFuture, callback, MoreExecutors.directExecutor());

    return completableFuture;
  }
}

takaaki7 commented 2 years ago

Is this a bug, or is my benchmark code wrong?

rajatbhatta commented 2 years ago

Looking into this.

rajatbhatta commented 2 years ago

You can provide your own ExecutorProvider for the async API via SpannerOptions.Builder.setAsyncExecutorProvider(executorProvider).

rajatbhatta commented 2 years ago

The one you're using is the default that is automatically created, which has the following default settings:

Try passing in your own ExecutorProvider and benchmark again.

takaaki7 commented 2 years ago

I think that, in general, non-blocking API throughput should not be limited by thread pool size. Non-blocking APIs exist to achieve high concurrency with only a few threads.
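
The expectation described here can be sketched with JDK classes only. The example below is a simulation, not the Spanner client: fakeAsyncRead is a hypothetical stand-in for a non-blocking call whose future is completed by a timer, so no thread is parked while a request is in flight. Under that assumption, a single thread can keep hundreds of requests outstanding at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FewThreadsManyRequests {
    /**
     * Simulates a non-blocking call: the returned future is completed by a timer
     * after delayMillis, so no thread waits while the "request" is in flight.
     */
    static CompletableFuture<Integer> fakeAsyncRead(
            ScheduledExecutorService timer, int id, long delayMillis) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        timer.schedule(() -> future.complete(id), delayMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Starts the given number of simulated reads on one thread; returns how many completed. */
    static int runAll(int requests, long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                futures.add(fakeAsyncRead(timer, i, delayMillis));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return (int) futures.stream().filter(CompletableFuture::isDone).count();
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int completed = runAll(800, 50);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(completed + " simulated reads completed in " + elapsedMs + " ms on one thread");
    }
}
```

Because all 800 timers run concurrently, the total time should stay close to one 50 ms delay rather than growing with the number of requests. This is the behavior the comment argues a non-blocking API should provide.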

jonathan-ostrander commented 2 years ago

I'll be opening a Google Support Case through my company's (Hopper) account, but just for visibility we're running into the same issue using the async methods in Scala.

Here are 2 simple scripts that evaluate the time taken to execute 10k reads by primary key:

Sync:

package com.hopper.random

import com.google.cloud.spanner.DatabaseId
import com.google.cloud.spanner.Key
import com.google.cloud.spanner.SessionPoolOptions
import com.google.cloud.spanner.SpannerOptions
import com.google.cloud.spanner.Statement
import java.time.Instant
import scala.collection.compat.immutable.LazyList
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
import scala.util.Random
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

object SpannerScript extends App {
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(
      Executors.newFixedThreadPool(100)
    )

  val databaseId = DatabaseId.of(
    "test-project",
    "test-instance",
    "test-table"
  )

  val options = SpannerOptions.newBuilder()
    .setNumChannels(100)
    .setAsyncExecutorProvider(
      SpannerOptions.FixedCloseableExecutorProvider.create(
        SpannerOptions.createAsyncExecutorProvider(100, 60L, TimeUnit.SECONDS).getExecutor
      )
    )
    .setSessionPoolOption(
      SessionPoolOptions.newBuilder()
        .setMinSessions(10000)
        .setMaxSessions(10000)
        .build()
    )
    .build()
  val client = options.getService().getDatabaseClient(databaseId)

  val idSet = new Array[String](10000)
  val rs = client.singleUse().executeQuery(
    Statement.of("SELECT id FROM foo LIMIT 10000")
  )

  var i = 0
  while (rs.next()) {
    idSet.update(i, rs.getString("id"))
    i += 1
  }
  rs.close()

  def readRow(): Future[Unit] = Future {
    client.singleUse().readRow(
      "foo",
      Key.newBuilder().append(idSet(Random.nextInt(10000))).build(),
      List("id", "metadata", "state", "updated_at").asJava
    )
  }

  val start = Instant.now.toEpochMilli()
  Await.result(
    Future.traverse(LazyList.continually(()).take(10000).toList)(_ => readRow()),
    atMost = Duration.Inf
  )
  val total = Instant.now.toEpochMilli() - start
  println(s"Finished 10000 queries in ${total} ms at rate ${10000.0 * 1000.0 / total} reads/second")
}

Sync output:

Finished 10000 queries in 5779 ms at rate 1730.4031839418585 reads/second

Async:

import com.google.cloud.spanner.DatabaseId
import com.google.cloud.spanner.Key
import com.google.cloud.spanner.SessionPoolOptions
import com.google.cloud.spanner.SpannerOptions
import com.google.cloud.spanner.Statement
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.time.Instant
import scala.collection.compat.immutable.LazyList
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
import scala.util.Random

object SpannerScript extends App {
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(
      Executors.newFixedThreadPool(100)
    )

  val databaseId = DatabaseId.of(
    "test-project",
    "test-instance",
    "test-table"
  )

  val options = SpannerOptions.newBuilder()
    .setNumChannels(100)
    .setAsyncExecutorProvider(
      SpannerOptions.FixedCloseableExecutorProvider.create(
        SpannerOptions.createAsyncExecutorProvider(100, 60L, TimeUnit.SECONDS).getExecutor
      )
    )
    .setSessionPoolOption(
      SessionPoolOptions.newBuilder()
        .setMinSessions(10000)
        .setMaxSessions(10000)
        .build()
    )
    .build()
  val client = options.getService().getDatabaseClient(databaseId)

  val idSet = new Array[String](10000)
  val rs = client.singleUse().executeQuery(
    Statement.of("SELECT id FROM foo LIMIT 10000")
  )

  var i = 0
  while (rs.next()) {
    idSet.update(i, rs.getString("id"))
    i += 1
  }
  rs.close()

  def readRow(): Future[Unit] = {
    val apiFuture = client.singleUse().readRowAsync(
      "foo",
      Key.newBuilder().append(idSet(Random.nextInt(10000))).build(),
      List("id", "metadata", "state", "updated_at").asJava
    )
    Future {
      apiFuture.get()
    }
  }

  val start = Instant.now.toEpochMilli()
  Await.result(
    Future.traverse(LazyList.continually(()).take(10000).toList)(_ => readRow()),
    atMost = Duration.Inf
  )
  val total = Instant.now.toEpochMilli() - start
  println(s"Finished 10000 queries in ${total} ms at rate ${10000.0 * 1000.0 / total} reads/second")
}

Async output:

Finished 10000 queries in 59673 ms at rate 167.579977544283 reads/second

rajatbhatta commented 1 year ago

> I think that, in general, non-blocking API throughput should not be limited by thread pool size. Non-blocking APIs exist to achieve high concurrency with only a few threads.

This is how concurrency is currently implemented in the Java Spanner client (limited to 8 threads by default). Feel free to use your own implementation for any custom handling.
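
The effect of such a limit can be illustrated with a small JDK-only simulation. It does not reproduce the Spanner client's internals: the class and method names are hypothetical, and Thread.sleep stands in for any work that occupies a pool thread for the duration of a request. Under that assumption, the number of requests in progress at once can never exceed the pool size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSizeCapsConcurrency {
    /**
     * Runs the given number of tasks, each blocking for workMillis, on a fixed
     * pool of poolSize threads and returns the highest number running at once.
     */
    static int peakConcurrency(int poolSize, int tasks, long workMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(workMillis); // stands in for work that occupies the thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                    }
                }, pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak with  8 threads: " + peakConcurrency(8, 200, 20));
        System.out.println("peak with 64 threads: " + peakConcurrency(64, 200, 20));
    }
}
```

In this simulation the peak for the 8-thread pool never exceeds 8, which is consistent with the in-use session count reported at the top of the issue. A larger pool raises the ceiling accordingly.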

zobar commented 1 year ago

On closer examination, #2698 is not related to this ticket. I've confirmed that the default AsyncExecutorProvider has poor performance compared to the synchronous API, but by providing a larger thread pool, it is possible to achieve better performance than the synchronous API.