datastax / dsbulk

DataStax Bulk Loader (DSBulk) is an open-source, Apache-licensed, unified tool for loading into and unloading from Apache Cassandra(R), DataStax Astra and DataStax Enterprise (DSE)
Apache License 2.0

Implement application-level retries #448

Open adutra opened 2 years ago

adutra commented 2 years ago

The move from driver 3.x to driver 4.x in DSBulk 1.5.0 had one unexpected consequence: client-side timeouts now apply to the whole statement execution. The driver docs say:

Unlike 3.x, the request timeout now spans the entire request. In other words, it's the maximum amount of time that session.execute will take, including any retry, speculative execution, etc.

What the docs don't say is that, because the timeout is global to the session.execute call, a request that hits the client-side timeout is not retried anymore. The solution to this problem is now to use speculative executions.

Users of DSBulk can use speculative executions if they wish (they are disabled by default). But because speculative executions are hard to tune, and also because they don't work with rate limiters (see #447), I think it would be nice to implement a form of application-level retry when the statement execution fails.

Thanks to the Reactor framework, such a feature could certainly be implemented very easily, using the retryWhen operator, e.g. in LoadWorkflow:

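// Needs: java.time.Duration, org.reactivestreams.Publisher,
// reactor.core.publisher.Flux, reactor.core.publisher.Mono, reactor.util.retry.Retry,
// com.datastax.oss.driver.api.core.DriverTimeoutException
private Flux<WriteResult> executeStatements(Flux<? extends Statement<?>> stmts) {
  Retry spec = new Retry() {

    // Pause before each retry; never assigned in this sketch, so retries happen immediately.
    private Duration delay;
    private long maxRetries = 3;

    @Override
    public Publisher<Boolean> generateCompanion(Flux<RetrySignal> retrySignals) {
      return retrySignals.flatMap(
          signal -> {
            Mono<Boolean> retryDecision;
            // Retry only client-side timeouts, and only while retries remain.
            if (signal.totalRetries() < maxRetries
                && signal.failure() instanceof DriverTimeoutException) {
              retryDecision = Mono.just(true);
              if (delay != null) {
                retryDecision = retryDecision.delayElement(delay);
              }
            } else {
              // Any other failure, or an exhausted budget, propagates the original error.
              retryDecision = Mono.error(signal.failure());
            }
            return retryDecision;
          });
    }
  };
  return dryRun
      ? stmts.map(EmptyWriteResult::new)
      : stmts.flatMap(
          statement -> Flux.from(executor.writeReactive(statement)).retryWhen(spec),
          writeConcurrency);
}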
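For readers who want to check the retry decision in isolation, here is a minimal blocking sketch of the same rule using only the JDK, with no Reactor or driver dependency. It is an illustration under assumptions, not DSBulk code: the class TimeoutRetry and the method callWithRetries are hypothetical names, and java.util.concurrent.TimeoutException stands in for DriverTimeoutException.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only; not part of DSBulk or the driver.
final class TimeoutRetry {

  private TimeoutRetry() {}

  /**
   * Runs the task, retrying up to maxRetries times when it fails with a
   * TimeoutException (a stand-in for DriverTimeoutException). Any other
   * failure, or a timeout once the retries are used up, is rethrown unchanged.
   */
  static <T> T callWithRetries(Callable<T> task, int maxRetries, Duration delay)
      throws Exception {
    int retries = 0;
    while (true) {
      try {
        return task.call();
      } catch (TimeoutException e) {
        if (retries >= maxRetries) {
          throw e; // retry budget exhausted: surface the original timeout
        }
        retries++;
        if (delay != null && !delay.isZero()) {
          Thread.sleep(delay.toMillis()); // optional pause before the next attempt
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger attempts = new AtomicInteger();
    // Simulated write: times out twice, then succeeds on the third attempt.
    String result =
        callWithRetries(
            () -> {
              if (attempts.incrementAndGet() < 3) {
                throw new TimeoutException("simulated client-side timeout");
              }
              return "written";
            },
            3,
            Duration.ofMillis(10));
    System.out.println(result + " after " + attempts.get() + " attempts");
    // prints "written after 3 attempts"
  }
}
```

As in the Reactor version, maxRetries counts retries rather than attempts, so a statement is executed at most maxRetries + 1 times, and non-timeout failures are never retried.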


adutra commented 2 years ago

Duplicate of #443, sorry for that.