aws / aws-swf-flow-library

AWS Simple Workflow Flow framework library
Apache License 2.0

[SWF] shutting down an activity worker is a nightmare of exceptions and lost tasks #2

Open manikandanrs opened 8 years ago

manikandanrs commented 8 years ago

From @danwashusen on February 25, 2016 5:2

We use SWF to coordinate long-running task processing in combination with EC2 auto-scaling, and we've noticed a bunch of issues with SpringActivityWorker and the JVM shutdown process.

  1. As far as I can tell, with the default functionality (disableServiceShutdownOnStop=false) tasks fail to complete because the 'service' (the SWF client) is shut down before the 'pollExecutor' and 'poller'; as a result, tasks can't communicate their results.
  2. Setting disableServiceShutdownOnStop=true seems to fix the task-results issue mentioned in 1, but causes other activity tasks to 'timeout' because the poller could still be long-polling. If a task arrives during the shutdown process, the JVM locks up trying to submit the task to an executor service that is shutting down.
  3. The 'stop' method on SpringActivityWorker gives no indication (logs, etc.) that tasks are being abandoned if they don't complete within terminationTimeoutSeconds (it could check the result of 'awaitTermination').

We've managed to work around issue 3 by overriding the 'stop' method and looping on 'awaitTermination' until it returns true (see below).

Issue 2 seems to be the real kicker: during JVM shutdown Spring calls 'stop' on SpringActivityWorker, which starts the shutdown process, skipping the 'service' shutdown (as configured) and stopping the 'pollExecutor' and 'poller'. However (as far as I can tell), because the 'service' hasn't been shut down (which would send an abort to any open requests), it's possible for an existing long-polling 'pollForActivityTask' request to still be running and fetching new tasks. These tasks end up failing with a 'start to close' timeout because they are associated with an instance that was in the process of shutting down.

Maybe I'm missing something obvious because I can't find anyone else complaining about this. We've managed to work around all these issues with the following extension of 'SpringActivityWorker', but honestly the whole shebang makes me anxious.

```java
import java.util.concurrent.TimeUnit;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;

import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.flow.spring.SpringActivityWorker;

public class GracefulShutdownSpringActivityWorker extends SpringActivityWorker implements ApplicationListener<ApplicationEvent> {
    private final Logger logger = LoggerFactory.getLogger(GracefulShutdownSpringActivityWorker.class);

    public GracefulShutdownSpringActivityWorker() {
    }

    public GracefulShutdownSpringActivityWorker(AmazonSimpleWorkflow service, String domain, String taskListToPoll) {
        super(service, domain, taskListToPoll);

        // this value must be true to avoid delayed shutdown issues;
        // activities will not be able to report their completed/failed state if the SWF client is shut down...
        setDisableServiceShutdownOnStop(true);
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event.getClass().equals(ContextClosedEvent.class)) {
            // tell the poller to not fetch any more tasks (this locks up the poller threads)
            logger.info("Suspend polling for new activity tasks...");
            super.suspendPolling();
        }
    }

    /* the default impl. of this method leaves dangling activities that eventually 'time out', this shutdown process ensures that activities complete and report back */
    @Override
    public void stop() {
        if (!isDisableServiceShutdownOnStop()) {
            logger.warn("disableServiceShutdownOnStop is set to false, activities will not be able to report their completed/failed state!");
        }

        // it's possible that despite suspending polling we still receive a task (a poll request may already be in flight);
        // sleeping for 90 seconds ensures that any tasks accepted before polling was suspended get processed
        sleep();

        // request a shutdown (all running activities should complete)
        logger.info("Stopping the worker...");
        super.stop();

        // release the suspended-polling latch; the suspended poller threads can now exit because the poller is terminating
        super.resumePolling();

        // wait until all activities complete
        try {
            while (!super.awaitTermination(10, TimeUnit.SECONDS)) {
                logger.info("Still waiting for activity worker to shutdown...");
            }
            logger.info("Done waiting for activity worker to shutdown...");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Failed while waiting for task executor to complete currently running tasks...", e);
        }
    }

    private void sleep() {
        final Duration duration = Duration.standardSeconds(90);
        logger.info(String.format("Sleeping for %s to allow dangling activity tasks to complete...", duration));
        ThreadUtils.sleepIfYouCan(duration); // project-specific sleep helper (not part of the Flow library)
    }
}
```
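
For reference, here's a minimal sketch of how we wire a worker like this into a Spring context (the client construction, domain name and task list name are placeholders, not our real config); since Spring manages the worker's lifecycle, closing the context is what triggers the 'stop' path above:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;

@Configuration
public class ActivityWorkerConfig {

    @Bean
    public AmazonSimpleWorkflow swfClient() {
        // Placeholder: credentials, region and timeout configuration omitted for brevity.
        return new AmazonSimpleWorkflowClient();
    }

    @Bean
    public GracefulShutdownSpringActivityWorker activityWorker(AmazonSimpleWorkflow swf) {
        // "MyDomain" and "MyTaskList" are placeholders; activity implementation
        // beans would also be registered on the worker here (omitted).
        return new GracefulShutdownSpringActivityWorker(swf, "MyDomain", "MyTaskList");
    }
}
```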

Copied from original issue: aws/aws-sdk-java#642

manikandanrs commented 8 years ago

From @fulghum on May 10, 2016 18:39

Thanks for the feedback on the SWF Flow library. We've pinged the SWF team and passed along the feedback to them.

omnipitous commented 7 years ago

Not sure how helpful (or how consistent) this is, but I was able to reproduce the inconsistent downscaling behavior. Here is at least one key difference that might be useful in debugging the problem: a successful shutdown that correctly calls my configured shutdown hook is initiated with the following InterruptedException:

```
{"exception":{"stacktrace":"java.lang.InterruptedException\n\tat java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)\n\tat java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)\n\tat java.util.concurrent.Semaphore.acquire(Semaphore.java:312)\n\tat com.amazonaws.services.simpleworkflow.flow.worker.ActivityTaskPoller.pollAndProcessSingleTask(ActivityTaskPoller.java:77)\n\tat com.amazonaws.services.simpleworkflow.flow.worker.GenericWorker$PollServiceTask.run(GenericWorker.java:94)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)","exception_class":"java.lang.InterruptedException"},"source_host":"ip-10-66-6-131","method":"uncaughtException","level":"ERROR","message":"Failure in thread SWF Activity Poll TransformTaskList 1","mdc":{},"@timestamp":"2016-10-19T22:06:01.638Z","file":"GenericWorker.java","line_number":"156","thread_name":"SWF Activity Poll TransformTaskList 1","@version":1,"logger_name":"com.amazonaws.services.simpleworkflow.flow.worker.GenericWorker","class":"com.amazonaws.services.simpleworkflow.flow.worker.GenericWorker$1"}
```

A FAILED shutdown that never hits the hook is initiated with the following AbortedException:

```
{"exception":{"stacktrace":"com.amazonaws.AbortedException: \n\tat com.amazonaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:51)\n\tat com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:71)\n\tat com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)\n\tat com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:489)\n\tat com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:126)\n\tat com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:215)\n\tat com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1191)\n\tat com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:753)\n\tat com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:97)\n\tat com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:42)\n\tat com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1142)\n\tat com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:853)\n\tat com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:576)\n\tat com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:362)\n\tat com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:328)\n\tat com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:307)\n\tat com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient.invoke(AmazonSimpleWorkflowClient.java:3096)\n\tat com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient.pollForActivityTask(AmazonSimpleWorkflowClient.java:1671)\n\tat com.amazonaws.services.simpleworkflow.flow.worker.SynchronousActivityTaskPoller.poll(SynchronousActivityTaskPoller.java:158)\n\tat com.amazonaws.services.simpleworkflow.flow.worker.ActivityTaskPoller.pollAndProcessSingleTask(ActivityTaskPoller.java:81)\n\tat com.amazonaws.services.simpleworkflow.flow.worker.GenericWorker$PollServiceTask.run(GenericWorker.java:94)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)","exception_class":"com.amazonaws.AbortedException","exception_message":""},"source_host":"ip-10-66-6-131","method":"uncaughtException","level":"ERROR","message":"Failure in thread SWF Activity Poll TransformTaskList 1","mdc":{},"@timestamp":"2016-10-19T22:07:03.340Z","file":"GenericWorker.java","line_number":"156","thread_name":"SWF Activity Poll TransformTaskList 1","@version":1,"logger_name":"com.amazonaws.services.simpleworkflow.flow.worker.GenericWorker","class":"com.amazonaws.services.simpleworkflow.flow.worker.GenericWorker$1"}
```

I haven't dug deep enough into the SDK code to see why one would be handled differently than the other, but it may be a good lead... still digging...

omnipitous commented 7 years ago

Update: It seems our shutdown hook is being called at least some of the time. When the above AbortedException occurs, my current hypothesis is that, since the abort is happening in the actual stream read (com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:71)), we are succeeding enough to pull the work off of the TaskList, BUT then the service is shutting down before that work is sent to the executor? A no man's land between read and execute...

I was able to reproduce this on one of our activity workers that consistently takes at least 1-2 seconds to do its work (so my window of interruption is slightly longer). I sat in a second SSH window, with the first window tailing our log. When the log starts to move, I systemctl stop the service. The above logs came from a 1-1 run (one successful shutdown, the second one failed). Today I succeeded twice before failing. Either way, a fairly high risk of occurrence.

electronic-dk commented 6 years ago

@manikandanrs @danwashusen I've also encountered this issue, and thank you for posting this solution; it helped me a lot. I've also found a couple of interesting things.

First of all, if a worker is shutting down and a new task is accepted during this process (by one of the already-running poll requests), then the whole process hangs, since Amazon uses a client-blocking rejection handler for the task executor's task pool (see GenericActivityWorker.java:99). And it looks like it's impossible to change this implementation (to, say, the default one, which would just throw an exception). It's possible to extend GenericWorker and override the createPoller method, but it's impossible to use this extended variant in SpringActivityWorker, so I had to create my own SpringActivityWorker which uses my implementation of the GenericWorker.
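
To make the hang concrete, here is a rough sketch (my own illustration, not the library's actual class) of what a client-blocking rejection handler looks like, compared with ThreadPoolExecutor.AbortPolicy:

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Sketch of a "block the caller" rejection policy (name and code are illustrative).
// When the executor rejects a task, the submitting thread (here it would be the
// SWF poller thread) blocks on put() until a queue slot becomes available; if
// nothing ever frees a slot during shutdown, the submitter hangs forever.
// ThreadPoolExecutor.AbortPolicy would instead throw RejectedExecutionException,
// letting the caller fail fast.
public class BlockCallerPolicySketch implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(task); // blocks until the queue has space
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while blocking on task submission", e);
        }
    }
}
```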

Another thing: by default the poll requests have no timeout, so it would probably be a good idea to set their timeout to something less than the time that the graceful shutdown worker sleeps after suspending polling. This way, all the running poll requests will have either accepted a task or disconnected and started waiting on the countdown latch (because polling is suspended). This should guarantee that after the sleep() method there are no running long-poll requests. Unfortunately, as far as I can tell, this timeout is not configurable, so I had to implement my own MyActivityTaskPoller, which extends ActivityTaskPoller by essentially copying the existing one and only changing the timeout.
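
As a stop-gap that doesn't require copying the poller: the one related knob I'm aware of is the SDK client's socket timeout, which bounds how long a single long poll can stay open at the HTTP layer. A rough sketch, assuming the v1 Java SDK client; SWF holds long polls open for up to 60 seconds, so the value has to stay above that (70 seconds here, which is also below the 90-second sleep in the graceful shutdown worker above):

```java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;

public class SwfClientFactory {

    // Bound long polls at the HTTP layer: SWF holds PollForActivityTask open
    // for up to 60 seconds, so the socket timeout must be larger than that.
    public static AmazonSimpleWorkflow createClient() {
        ClientConfiguration config = new ClientConfiguration()
                .withSocketTimeout(70 * 1000); // milliseconds
        return new AmazonSimpleWorkflowClient(config);
    }
}
```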

Also, in conjunction with the above items in my comment (default rejection handler and custom ActivityTaskPoller), it's possible to implement a safety net: MyActivityTaskPoller would catch the RejectedExecutionException and fail the task which arrived. This way the task would be immediately marked as failed in Amazon, and the workflow would continue instead of eventually timing out.
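
A rough sketch of that safety net (the class and method names are mine; only the respondActivityTaskFailed call is the real SWF API). It assumes the executor actually throws RejectedExecutionException on rejection, i.e. the blocking handler from my first point has been replaced:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.model.ActivityTask;
import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskFailedRequest;

// Illustrative safety net: if the executor rejects a freshly polled task
// (e.g. because the worker is shutting down), report the failure to SWF
// right away instead of letting the task hit its start-to-close timeout.
public class RejectionSafetyNet {

    private final AmazonSimpleWorkflow swf;

    public RejectionSafetyNet(AmazonSimpleWorkflow swf) {
        this.swf = swf;
    }

    public void submitOrFail(ExecutorService executor, ActivityTask task, Runnable work) {
        try {
            executor.execute(work);
        } catch (RejectedExecutionException e) {
            swf.respondActivityTaskFailed(new RespondActivityTaskFailedRequest()
                    .withTaskToken(task.getTaskToken())
                    .withReason("Worker is shutting down")
                    .withDetails(e.toString()));
        }
    }
}
```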

This was all done as a PoC: copying the existing classes isn't a very good practice, so a more robust and clean approach is needed. Is it possible to submit a pull request to the SWF Flow framework addressing my points?

Just to sum up:

  1. The ability to provide an arbitrary RejectedExecutionHandler to the poll executor.
  2. The ability to set a custom GenericWorker on SpringActivityWorker.
  3. The ability to set a poll request timeout on ActivityTaskPoller.

Please let me know if I missed anything and/or if this can be done with the existing code. I tried to look carefully, but of course I may have missed something.

UPD: Another improvement I made is in ActivityTaskPoller: instead of waiting on the semaphore indefinitely, I added a timeout; this way, there won't be dangling polling tasks left during the shutdown process. But then I decided to just use SynchronousActivityTaskPoller, which solves most of my problems, though I'm still left with the issue of not being able to change the generic worker in SpringActivityWorker.