nvdb-vegdata / nvdb-api-client

BSD-licensed open source Java library for consuming NVDB REST API
BSD 2-Clause "Simplified" License
13 stars 9 forks source link

AsyncResult støtter ikke resubscribe #126

Open daniesso opened 1 year ago

daniesso commented 1 year ago

Når man kaller roadObjectClient.getRoadObjectsAsync(featureTypeId, roadObjectRequest).get() får man tilbake en Flux som er knyttet til en ExecutorService som avsluttes når Flux-en termineres. Det fører til problemer dersom man forsøker å konsumere flux-en på nytt, fordi ExecutorService-en ikke lengre tar i mot nye jobber, og man får en litt kryptisk RejectedExecutionException. Jeg mener at det er et ganske vanlig brukstilfelle å konsumere en Flux på nytt. I vårt tilfelle hadde vi en jobb som brukte flux.retry(3) for å prøve på nytt ved feil fra API-et, men et forsøk på retry vil da gi overnevnte feil (og i vårt tilfelle faktisk svelge hele stacktrace-en 😞).

For meg ser det ut som en mulig løsning å flytte instansieringen av ExecutorService-en inn i AsyncResult.get, sånn her:

  public Flux<T> get() {
    return Flux.create(sink -> {
      ExecutorService executorService = Executors.newSingleThreadExecutor();

      executorService.execute(() -> {
        try {
          PagingIndicator pagingIndicator = doPage(sink, page);
          while (pagingIndicator.hasNext) {
            pagingIndicator = doPage(sink, pagingIndicator.currentPage);
          }
        } catch (Exception e) {
          sink.error(e);
        } finally {
          sink.complete();
          executorService.shutdown();
        }
      });
    });
  }

Da vil man i så fall instansiere en ny ExecutorService hvis man subscriber på flux-en på nytt. Hva tenker dere om det? Jeg lager gjerne en PR.