amazon-archives / aws-flow-ruby

ARCHIVED

SWF Schedules activities to disconnected workers still within 60 second polling window #31

Open Jud opened 10 years ago

Jud commented 10 years ago

My activity workers correctly process tasks 60% of the time, then they start throwing this error.

"I have not been able to poll successfully, and am now bailing out, with error undefined method `activity_type' for nil:NilClass"

Possibly something to do with the forking?

My workers look like:

AWS.config(access_key_id: AWS_SWF_ACTIVITY_KEY, secret_access_key: AWS_SWF_ACTIVITY_SECRET)

swf = AWS::SimpleWorkflow.new
domain = swf.domains[SWF_DOMAIN]

worker = AWS::Flow::ActivityWorker.new(swf.client, domain, task_list, ActivitiesClass)
worker.start if __FILE__ == $0

I'm running this worker on ubuntu/EC2.

@mjsteger

Jud commented 10 years ago

As a test, I performed the following with the workers as outlined above:

20.times do
  $rewquest_workflow_client.start_execution(b)
end

With the forking workers, there are many activities that are scheduled and "started" but never complete. The activity is a simple passthrough activity, without any real computation.

When I change to :use_forking => false, I run into no such issues, and 100% of the tasks complete, though the "I have not been able to poll successfully...." error is still being logged.

mjsteger commented 10 years ago

Looks like this is causing the error: task can be nil if the poll is empty, and the code then calls #activity_type on a nil object, which produces the error you see. The reason it doesn't occur when you turn forking off is probably that there is enough work that the polls aren't returning empty. We'll fix this up and get out a patch as soon as possible. Thanks for the great bug report!
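A minimal sketch of the nil guard described above. FakeTask and FakePoller are hypothetical stand-ins so the example runs without SWF; this is not the actual patch to the framework.

```ruby
# Hypothetical stand-ins for an SWF task and poller (not aws-flow classes).
FakeTask = Struct.new(:activity_type)

class FakePoller
  def initialize(results)
    @results = results
  end

  # Returns the next canned result; nil models an empty long poll.
  def poll_for_single_task
    @results.shift
  end
end

# Returns true if a task was processed, false if the poll came back empty.
def poll_and_process(poller)
  task = poller.poll_for_single_task
  return false if task.nil? # empty poll: skip instead of calling methods on nil

  puts "processing #{task.activity_type}"
  true
end

poller = FakePoller.new([nil, FakeTask.new("hello_activity")])
poll_and_process(poller) # empty poll, nothing is processed
poll_and_process(poller) # processes the task
```

With the guard in place, an empty poll is treated as a normal outcome and the loop simply polls again.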

Jud commented 10 years ago

@mjsteger this error could be a red herring -- I do get this error with :use_forking => false as well. Though, it will be nice to clean up that error message.

To restate the problem: when using a forking worker, jobs are accepted by the poller, but some are randomly never executed and eventually time out.

Jud commented 10 years ago

@mjsteger after more testing, with :use_forking => false I can replicate the behavior where the activity worker accepts a task, but fails to execute, resulting in a timeout.

1) Start the worker
2) Immediately start an execution
3) Activity will be scheduled and started, but never executed

The task log looks like:

Mon Jan 20 11:11:46 GMT-500 2014 - 7 - WorkflowExecutionTimedOut
Mon Jan 20 11:09:46 GMT-500 2014 - 6 - ActivityTaskStarted
Mon Jan 20 11:09:46 GMT-500 2014 - 5 - ActivityTaskScheduled
Mon Jan 20 11:09:46 GMT-500 2014 - 4 - DecisionTaskCompleted
Mon Jan 20 11:09:46 GMT-500 2014 - 3 - DecisionTaskStarted
Mon Jan 20 11:09:46 GMT-500 2014 - 2 - DecisionTaskScheduled
Mon Jan 20 11:09:46 GMT-500 2014 - 1 - WorkflowExecutionStarted

The task is just a puts "hello", so there is no reason for it to time out. It seems like the poller is getting tasks, but maybe the process it is spinning up to execute the task isn't ready?

mjsteger commented 10 years ago

We're still looking into this issue. Just to be sure we're looking in the right place, you have a simple activity which simply puts, and you have a workflow which simply calls that one activity, correct? Also, the timeouts are probabilistic, correct? If so, can you comment on what % you see as failures, on average?

Jud commented 10 years ago

@mjsteger I'm working on a small script that shows the bug. I'll post it here.

Jud commented 10 years ago

@mjsteger after more testing, I've narrowed down the issue and I've reproduced it multiple times.

It seems like the TCP long polling connection isn't being terminated on the SWF end when an activity worker exits. I can reproduce with these steps (within the 60 second long polling window):

1) Start an activity worker.
2) Stop the activity worker.
3) Start an execution.

SWF seems to hand off the execution to the shutdown activity worker, and mark the task as started EVEN THOUGH the worker never actually processes the task. Looking at the debug logs you will see that the worker never even gets the task token.

Is this known behavior? It seems like the worker should be required to ack the token before a task is marked as started?

pmohan6 commented 10 years ago

@Jud, thanks a lot for the clear repro. The problem you are seeing happens when a client disconnects while a long poll request is open. In this case the service may not realize the disconnect and dispatch a task thinking that the poller is still there. This is usually not a problem in production where you have workers running and polling for tasks on a continuous basis. But you may run into this more frequently in testing if you bring up and take down pollers quickly. This is not necessarily a problem in the Flow Framework, but we are looking into resolving it. In the meantime you can try using a different task list for each test run, for instance use a uuid. This will ensure that tasks for each run go to the new poller.
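The per-run task list suggestion could be sketched as follows. unique_task_list and the "activity_tasklist" prefix are hypothetical names; the only library call used is SecureRandom.uuid from Ruby's standard library.

```ruby
require "securerandom"

# Hypothetical helper: builds a task list name that is unique per test run.
def unique_task_list(prefix = "activity_tasklist")
  "#{prefix}-#{SecureRandom.uuid}"
end

task_list = unique_task_list
puts task_list
```

The generated name has to be shared by the activity worker and by whatever schedules the activity, so in a test it would typically be created once and passed to both.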

Jud commented 10 years ago

@pmohan6 Thanks for the idea of using a different task list, though, won't that require coordination between the deciders and the activity workers?

I know this isn't really an issue w/ the framework, and thanks for looking into it for me.

Jud commented 10 years ago

@pmohan6 @mjsteger -- I don't think I agree that this isn't an issue in production. For instance, when we upgrade code and restart our workers, any new executions will time out until the socket is closed. We deploy multiple times a day. Any pointers on how to do seamless activity upgrades that don't require modifying every workflow that uses the activity?

Jud commented 10 years ago

@mjsteger @pmohan6 Would it be possible to work around this issue using timers? E.g. start a 5 second timer when the Decider starts an activity, and have the activity cancel the timer?

The activity tasks are given something like 200 seconds to complete, so waiting for them to time out isn't an option.

Jud commented 10 years ago

@mjsteger @pmohan6 I realize this might not be the correct forum for this request, but I'd imagine I'm not the only person who rolls code and needs to restart activity workers (which are then assigned tasks after they have disconnected, so the entire timeout period has to elapse before a retry).

Is there a better solution?

pmohan6 commented 10 years ago

@Jud Sorry about the delay in responding. I would suggest using activity heartbeats in this case so that your activities time out early instead of waiting for the scheduled task to time out. You can set a short enough heartbeat timeout so that when you take your workers down and SWF doesn't receive a heartbeat from them, it will time out the activity task and retry (based on your retry policy).

I'm working on a fix for clean shutdown of activity and workflow workers to wait for the open long poll to finish before shutting down. After this fix, you will just need to do a clean shutdown when you want to restart workers so that there is no 'phantom' dispatch of tasks.

Jud commented 10 years ago

@pmohan6 I see where to set the timeout -- does aws-flow send heartbeats for me, or no? How would I send a heartbeat during a task?

pmohan6 commented 10 years ago

@Jud You will need to send heartbeats using the record_activity_heartbeat method in the activity_execution_context. Here is a ruby flow sample that demonstrates heartbeats -

You can read more about heartbeats here - http://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dg-develop-activity.html#swf-dg-managing-activity-tasks

Hope that helps!
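As a rough sketch of the heartbeat pattern, assuming a context object that responds to record_activity_heartbeat as named above: FakeExecutionContext and long_running_activity are hypothetical stand-ins so the example runs without SWF, and the real method's exact signature should be checked against the framework.

```ruby
# Hypothetical stand-in for the framework's execution context.
class FakeExecutionContext
  attr_reader :heartbeats

  def initialize
    @heartbeats = []
  end

  # Records the progress string instead of calling SWF.
  def record_activity_heartbeat(details)
    @heartbeats << details
  end
end

# Processes items in order and reports progress after each one.
def long_running_activity(context, items)
  items.each_with_index do |_item, index|
    # ... real work on the item would go here ...
    context.record_activity_heartbeat("#{index + 1}/#{items.size}")
  end
  items.size
end

context = FakeExecutionContext.new
long_running_activity(context, %w[a b c])
puts context.heartbeats.inspect
```

A worker that was shut down never receives the task and so never sends a heartbeat, which is what lets a short heartbeat timeout expire the phantom task early.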

karvapallo commented 10 years ago

Yeah, I was running into the exact same issue. What I did was abort (http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Request.html#abort-property) the long poll request before exiting the app. Here's some pseudo code for a Node.js SWF app:

// Assumes `swf` (an AWS SDK SWF wrapper) and `domain` are defined elsewhere.
var interruptCount = 0,
    FORCE_QUIT_LIMIT = 3,
    Task = function Task() {},
    task;

Task.prototype = {
    // Mark the task as stopping and abort the open long poll.
    stopPoll: function stopPoll() {
        if (this.hasOwnProperty('poller')) {
            this.stop = true;
            this.poller.abort();
        }
    },
    poll: function poll() {
        var self = this;

        this.poller = swf.client.pollForActivityTask({
            domain: domain,
            taskList: {
                name: 'name'
            },
            identity: 'something'
        });
        this.poller
            .on('error', function onError(err) {
                // Aborting the request fires 'error'; exit only if we asked to stop.
                if (self.hasOwnProperty('stop') && self.stop) {
                    process.exit();
                }
            })
            .send();
    }
};

task = new Task();
task.poll();

process.on('SIGINT', function onInterrupt() {
    interruptCount += 1;
    if (interruptCount == FORCE_QUIT_LIMIT) {
        process.exit();
    }
    console.log('SIGINT! Stopping when possible. ' + (FORCE_QUIT_LIMIT - interruptCount) + ' more to force quit.');
    task.stopPoll();
});

acant commented 10 years ago

@pmohan6 Just wondering about the state of the fix for clean shutdown of long polling workers you mentioned earlier?

pmohan6 commented 9 years ago

@acant Unfortunately, we don't have a fix for this yet. I'll post an update when we do have one.

DenisGorbachev commented 9 years ago

I'm running into the same issue using Node.js SDK...

@karvapallo Your solution seemed perfect, and it lowered the number of errors. However, sometimes SWF still schedules the task to an inactive worker. Have you been able to totally get rid of the errors?

CyborgMaster commented 8 years ago

I'm having the same problem described above. In production, whenever I deploy new code, tasks fail. This happens every time I deploy because SWF assigns tasks to dead workers. Retrying isn't an option in my case because the tasks are not idempotent (which is why I am using SWF in the first place; if they were idempotent, I could have just used SQS).

CyborgMaster commented 8 years ago

With the way the code is currently structured a task will be orphaned every time the worker is shut down as long as there is at least one free fork available when the worker is shut down.

The worker is in an infinite loop doing this (from worker.rb)

        loop do
          run_once(false, poller)
        end

and the important part of run_once looks like this:

        if @shutting_down
          Kernel.exit
        end
        poller.poll_and_process_single_task(@options.use_forking)

poll_and_process_single_task looks like this (summarized):

      # (the surrounding begin/rescue is omitted in this summary)
      def poll_and_process_single_task(use_forking = true)
        if use_forking
          @executor.block_on_max_workers
        end
        task = @domain.activity_tasks.poll_for_single_task(@task_list)
        if task.nil?
          return false
        end
        if use_forking
          @executor.execute { process_single_task(task) }
        else
          process_single_task(task)
        end
        return true
      end

Which means that the poll_for_single_task happens on the main process and process_single_task happens inside an executor fork.

So let's say you have 1 worker with 2 forks per worker allocated and your current workload is such that you always have exactly 1 task happening at a time. As soon as that task is completed, another one (and only one) is scheduled.

This means that the poll loop will get the first task, which will then start executing in a fork, leaving the main process to start the poll loop again, which will hit poll_for_single_task and start a long poll. When the task completes, the child fork will terminate, and poll_for_single_task will pick up the next scheduled task, which will fork to execute it, and the poll loop starts again.

This will continue correctly until the worker process is shut down. The interrupt handler looks like this:

        @shutdown_first_time_function = lambda do
          @executor.shutdown Float::INFINITY
          Kernel.exit
        end

which causes the main process to wait for the executor to finish, waiting for the currently executing task, and then ends the main process. But if you remember, the main process was currently in a long poll, and there is no attempt to close that long poll here.

This causes the connection to be left open on AWS's end and AWS will happily allocate and return a task to that long poll as soon as one becomes available.

This is not only a problem during testing, as it occurs any time a worker is shut down, which happens every time you deploy new code, reboot your server, etc. It is clearly caused by the Flow Framework handling polling incorrectly.

As a kludge right now, if you set number_of_forks_per_worker to 1, then the @executor.block_on_max_workers call that occurs before the long poll starts inside poll_and_process_single_task will cause the main process to block until the task is completed. That way, if a task is in progress, the worker will terminate gracefully after the task completes. However, if the worker is completely idle, there will still be a poll in progress that will be abandoned, and if SWF gets a task in that 60 second window it will still be orphaned.

CyborgMaster commented 8 years ago

It looks like the AWS SDK for ruby doesn't support closing the long polling, which means that the worker needs to wait for any current polling to complete and also complete the returned task if any before shutting down.
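A minimal sketch of that idea, under the assumption that shutdown only sets a flag and the loop exits after the open poll has returned and its task has been handled. GracefulWorker and the simulated poller are hypothetical and are not the aws-flow implementation.

```ruby
# Hypothetical worker loop that never exits while a long poll is open.
class GracefulWorker
  attr_reader :processed

  def initialize(poller)
    @poller = poller
    @shutting_down = false
    @processed = []
  end

  # Intended to be called from a signal handler: sets a flag, does not exit.
  def request_shutdown
    @shutting_down = true
  end

  def run
    until @shutting_down
      task = @poller.call                 # blocks until the long poll returns
      @processed << task unless task.nil? # finish whatever the poll returned
    end
  end
end

# Simulated poller: the shutdown request arrives while the second poll is open.
polls = 0
worker = nil
poller = lambda do
  polls += 1
  worker.request_shutdown if polls == 2
  "task-#{polls}"
end

worker = GracefulWorker.new(poller)
worker.run
puts worker.processed.inspect
```

In a real process the flag could be set from Signal.trap("INT") { worker.request_shutdown }. The trade-off is that shutdown may take as long as the remaining poll window plus the duration of any task it returns.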

CyborgMaster commented 8 years ago

I think I may have a fix for this. I'm going to try a few things and then I'll open a PR if they work.

CyborgMaster commented 8 years ago

I have a fix at #118. I'm pretty confident that it is the right way to fix it. Feel free to leave any questions or feedback there.

beeradmoore commented 7 years ago

Am experiencing the same problem (I think), even with the hello_world sample. It makes sense that the decider is giving tasks to dead workers if you have restarted them with a new code deploy. But then wouldn't the solution be when deploying new code to increase version numbers so that new activities won't be given to older version workers? (At least from my understanding so far of SWF that is how it works). But yeah, then it is a pain for debugging when you need to stop/start these processes over and over.

If you had a cron job that detects whether a worker (or X number of workers) is running on that server and starts the needed workers in case some crashed, then you would have the same issue. Tasks would be given to a worker that no longer exists.

CyborgMaster commented 7 years ago

My fork takes the approach of letting the worker finish its current task before shutting it down. That way it doesn't shut down with an open long poll to AWS and AWS won't give it a task after it's gone.