spring-cloud / spring-cloud-stream-binder-aws-kinesis

Spring Cloud Stream binder for AWS Kinesis
Apache License 2.0

Missing graceful shutdown option when `kpl-kcl-enabled: true` #222

Open jakub-rogowski opened 3 weeks ago

jakub-rogowski commented 3 weeks ago

In what version(s) of Spring Cloud Stream Binder for AWS Kinesis are you seeing this issue?

4.0.3

Describe the issue

We are attempting to gracefully shut down KCL Schedulers. However, this cannot be accomplished through the binder configuration because the KclMessageDrivenChannelAdapter has a hardcoded shutdown process:

@Override
protected void doStop() {
    super.doStop();
    this.scheduler.shutdown();
}

Additionally, the KinesisMessageChannelBinder has a hardcoded adapter creation method, which complicates the use of the Scheduler's method: CompletableFuture<Boolean> startGracefulShutdown().

KclMessageDrivenChannelAdapter adapter =
        new KclMessageDrivenChannelAdapter(this.amazonKinesis, this.cloudWatchClient, this.dynamoDBClient,
                streams);

Expected behavior

Is it possible to configure this behavior easily using the binder, or are we missing something related to the graceful shutdown process?

artembilan commented 3 weeks ago

According to the Javadocs, I don't see much of a difference, or why we would use one over the other:

    /**
     * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor
     * services were passed to the worker by the user, worker will not attempt to shutdown those resources.
     *
     * <h2>Shutdown Process</h2> When called this will start shutdown of the record processor, and eventually shutdown
     * the worker itself.
     * <ol>
     * <li>Call to start shutdown invoked</li>
     * <li>Lease coordinator told to stop taking leases, and to drop existing leases.</li>
     * <li>Worker discovers record processors that no longer have leases.</li>
     * <li>Worker triggers shutdown with state {@link ShutdownReason#LEASE_LOST}.</li>
     * <li>Once all record processors are shutdown, worker terminates owned resources.</li>
     * <li>Shutdown complete.</li>
     * </ol>
     */
    public void shutdown() {

    /**
     * Requests a graceful shutdown of the worker, notifying record processors, that implement
     * {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
     * checkpoint.
     *
     * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
     * previous future.
     *
     * <b>It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is
     * lost after requesting shutdown, but before the notification is dispatched.</b>
     *
     * <h2>Requested Shutdown Process</h2> When a shutdown process is requested it operates slightly differently to
     * allow the record processors a chance to checkpoint a final time.
     * <ol>
     * <li>Call to request shutdown invoked.</li>
     * <li>Worker stops attempting to acquire new leases</li>
     * <li>Record Processor Shutdown Begins
     * <ol>
     * <li>Record processor is notified of the impending shutdown, and given a final chance to checkpoint</li>
     * <li>The lease for the record processor is then dropped.</li>
     * <li>The record processor enters into an idle state waiting for the worker to complete final termination</li>
     * <li>The worker will detect a record processor that has lost it's lease, and will terminate the record processor
     * with {@link ShutdownReason#LEASE_LOST}</li>
     * </ol>
     * </li>
     * <li>The worker will shutdown all record processors.</li>
     * <li>Once all record processors have been terminated, the worker will terminate all owned resources.</li>
     * <li>Once the worker shutdown is complete, the returned future is completed.</li>
     * </ol>
     *
     * @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown
     *         completed successfully. A false value indicates that a non-exception case caused the shutdown process to
     *         terminate early.
     */
    public CompletableFuture<Boolean> startGracefulShutdown() {

The most important part is "Once all record processors are shutdown". That's what we would like to have when we process data. Why exactly do you need startGracefulShutdown()? If you have a good argument, I might just change that doStop() to call this one instead of the regular shutdown(). Thanks

ashwinw commented 3 weeks ago

     * <h2>Requested Shutdown Process</h2> When a shutdown process is requested it operates slightly differently to
     * allow the record processors a chance to checkpoint a final time.
     * <ol>

This part is of interest to us. We are seeing double processing of messages during an abrupt shutdown, since the checkpointing does not complete.
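
For reference, this is roughly what the KCL 2.x ShardRecordProcessor contract looks like (the binder supplies its own processor internally, so the class name and method bodies below are purely illustrative): shutdownRequested(...) is the final-checkpoint callback, and it is only invoked on the graceful path, which is why an abrupt shutdown() can leave the last processed batch un-checkpointed and lead to re-processing after a restart.

    import software.amazon.kinesis.exceptions.InvalidStateException;
    import software.amazon.kinesis.exceptions.ShutdownException;
    import software.amazon.kinesis.lifecycle.events.InitializationInput;
    import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
    import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
    import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
    import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
    import software.amazon.kinesis.processor.ShardRecordProcessor;

    public class CheckpointingProcessor implements ShardRecordProcessor {

        @Override
        public void initialize(InitializationInput input) {
        }

        @Override
        public void processRecords(ProcessRecordsInput input) {
            // Process the batch; periodic checkpoints would normally happen here.
        }

        @Override
        public void leaseLost(LeaseLostInput input) {
            // The lease is already gone: no checkpoint is possible on this path.
        }

        @Override
        public void shardEnded(ShardEndedInput input) {
            try {
                input.checkpointer().checkpoint();
            }
            catch (ShutdownException | InvalidStateException ex) {
                // log and give up
            }
        }

        @Override
        public void shutdownRequested(ShutdownRequestedInput input) {
            // Invoked only when startGracefulShutdown() is used: the final chance to
            // checkpoint, so a restarted worker does not re-deliver records already handled.
            try {
                input.checkpointer().checkpoint();
            }
            catch (ShutdownException | InvalidStateException ex) {
                // log and give up
            }
        }
    }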

ashwinw commented 3 weeks ago

If you have a good argument, I might just change that doStop() to call this one instead of regular shutdown().

Yes, agree, this seems like a reasonable change.

jakub-rogowski commented 3 weeks ago

We also see a few errors during shutdown, e.g.:

Exception thrown while fetching records from Kinesis 
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: executor not accepting a task

Unknown exception while publishing 20 datums to CloudWatch 
java.lang.RuntimeException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: event executor terminated

I overrode doStop() just to test the behavior with startGracefulShutdown():

    @Override
    protected void doStop() {
        super.doStop();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for  graceful shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now gracefully.");
    }

Since then, I have seen no errors at all during shutdown, even while processing data.

artembilan commented 3 weeks ago

Sounds good.

Any chance you can move this issue to https://github.com/spring-projects/spring-integration-aws and provide such a contribution over there? Unfortunately, there might be nothing to do in this project.

Although I have a question: why do we need that gracefulShutdownFuture.get(20, TimeUnit.SECONDS)? Why can't we just let it go and continue with the normal application shutdown? I believe in-flight processing is allowed to finish by the normal Spring ApplicationContext.close().

If you think that we still need that gracefulShutdownPeriod, then we can indeed introduce such an option and expose it here for the Kinesis Binder. That way we can make it conditional: if gracefulShutdownPeriod is provided, then startGracefulShutdown(); otherwise, the regular shutdown().
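
A rough sketch of what that conditional doStop() could look like (gracefulShutdownPeriod is the hypothetical new Duration option, not an existing property; scheduler is the adapter's existing field):

    @Override
    protected void doStop() {
        super.doStop();
        // Assumes a new java.time.Duration 'gracefulShutdownPeriod' option on the adapter.
        if (this.gracefulShutdownPeriod == null) {
            // Current behavior: abrupt shutdown.
            this.scheduler.shutdown();
            return;
        }
        // Graceful path: let record processors checkpoint one last time.
        CompletableFuture<Boolean> shutdownFuture = this.scheduler.startGracefulShutdown();
        try {
            if (!Boolean.TRUE.equals(
                    shutdownFuture.get(this.gracefulShutdownPeriod.toMillis(), TimeUnit.MILLISECONDS))) {
                // The graceful shutdown ended early for a non-exception reason.
                this.scheduler.shutdown();
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            this.scheduler.shutdown();
        }
        catch (ExecutionException | TimeoutException ex) {
            // Fall back to the abrupt shutdown if the graceful one did not finish in time.
            this.scheduler.shutdown();
        }
    }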

I also wonder what that executor not accepting a task error is about, and why that executor is closed before we are done with record processing. Isn't that already a KCL concern, i.e. a race condition in their shutdown()?

jakub-rogowski commented 2 weeks ago

Any chance you can move this issue to https://github.com/spring-projects/spring-integration-aws and provide such a contribution over there?

Sure, I'll move it there.

Although I have a question: why do we need that gracefulShutdownFuture.get(20, TimeUnit.SECONDS)?

I just followed this guide: https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html and used the code provided there. But it does look quite useful.

Why can't we just let it go and continue with the normal application shutdown? I believe in-flight processing is allowed to finish by the normal Spring ApplicationContext.close().

This I need to check.

I also wonder what that executor not accepting a task error is about, and why that executor is closed before we are done with record processing. Isn't that already a KCL concern, i.e. a race condition in their shutdown()?

Yes, probably you're right. I'm thinking about opening an issue there.