edgurgel / verk

A job processing system that just verks! 🧛‍
https://hex.pm/packages/verk
MIT License
721 stars 65 forks source link

Support dynamic node set (auto-scaling, Heroku, Kubernetes, etc.) #157

Closed sebastianseilund closed 5 years ago

sebastianseilund commented 6 years ago

Continuing discussion from #155.

Summary: Due to how node_id works it's currently infeasible to have job-running nodes that come and go (e.g. when running on Kubernetes, or when scaling up and down in Heroku).

Current problems:

I proposed a solution like this:

Before I read deeper about Verk, I assumed that tasks were RPOPLPUSHed (or similar) from a queue list to an inprogress list that are not node-specific, and that some mechanism would move tasks from inprogress back to the queue after a timeout (could be configured per queue). That would mean that a task has to finish within this timeout, or it will be executed twice (because we believe that the first run was aborted for some reason). This mechanism could run on all nodes, and I imagine it could periodically scan the inprogress list for tasks that have exceeded their timeout and push them back to the queue.

I believe this would solve both of the problems above, but it would introduce another problem:

If we were to implement this solution, what should happen to the current node_id behavior? Have it be a configuration option: Use either timeout or node_ids? Or remove the node_id behavior? The first option seems less invasive, but it does make configuration, documentation and implementation more complex by having to support multiple cases.

@edgurgel also wrote:

We could have some sort of "heartbeat" for each instance that is registered instance_id (a random node id + ip + host + pid?) and once this heartbeat is X seconds old (using maybe a TTL on Redis), then enqueued these jobs back? Keep a set of all instance_ids somewhere to know which ones need to be checked? Remove them once they are cleaned up? (I probably need to think more about this TBH)

The way I understand this, all nodes would have to know about each other, right? Also, in auto-scaling/scheduling environments node_ids aren't very useful IMO. So if we could get rid of them all together, it might be better.

To merge the timeout and heartbeat ideas, maybe the node that moved a job from the queue set to the in-progress set could be responsible for periodically updating the timeout of the job in the in-progress set?

myobie commented 6 years ago

I prefer the way rabbit works where it doesn't have a message timeout, but a connection timeout means all messages for that connection are then considered "available to be claimed."

If, instead of using a node_id for verk, one used a random UUID for each connection and that connection maintained a heartbeat, then this could be approximated. If the Supervisor knows to explode if the heartbeat request fails then it should, when reconnecting, have a different connection id and possibly choose to reclaim the jobs from the failed connection.

A query could be made at some interval for connection jobs whom's heartbeat is super old as long as the operation to copy them between connections is atomic.

Does this sound like a good idea? It differs from your above idea in that the connection has the heartbeat and not the individual jobs.

edgurgel commented 6 years ago

@sebastianseilund thanks for starting this discussion!

@myobie, I'm not 100% sure I understood the proposal. I think the best way to model this problem is:

We have 3 instances running and one of the instances crashes abruptly (so no time for any code to run). Will the remaining 2 instances pick up the jobs that were in progress from the instance that just crashed?

So if our design cover this case, the other possible scenarios are simpler I think.

myobie commented 6 years ago

There is a case where a worker seems to have crashed, but hasn’t. That is why I recommend a connection level heartbeat. Periodically all clients can look for stale jobs where the heartbeat is old and capture those jobs with an atomic operation. All clients must themselves self destruct if trying to set their heartbeat fails. If a connection is open (heartbeat is updated) then the machine is still trying and capturing its jobs somewhere else would lead to duplicate work.

One could do this at the job level as well. but it’s probably more granular than necessary. I’d be happy to contribute some code to test out different ideas.

In the end I agree entirely with you: if it works then it works and we’ll all be super happy 🙂

sebastianseilund commented 6 years ago

@myobie thanks for the input! Can you clarify what you mean by a "connection"?

I think the concept of using a heartbeat (i.e. an expiration time that's continuously updated by the worker node) sounds better than a timeout. To be clear, it would be the responsibility of Verk's QueueManager or WorkerManager (not 100% into the terminology yet) to update this heartbeat, not each individual job implementation.

hearbeat_interval

Let's say we have a hearbeat_interval config option. It defaults to e.g. 60 seconds. It can be configured differently per queue/job by the end user if necessary.

If heartbeat_interval is configured properly for each queue so that most tasks are expected to finish within the hearbeat_interval, then it'll only rarely need to be updated (i.e. written to Redis twice or more). So I'm not sure if the granularity is a large factor.

Heartbeat per job

If the heartbeat is set per job, I think the Redis data structure will be simple to reason about. For each Verk queue we'd have two sets:

When a job is enqueued it's pushed to the queue set. The Worker/QueueManager uses RPOPLPUSH (or a Lua script) to atomically move the item from the queue set to inprogress set. It also attaches a heartbeat value to the job data in Redis, defined as now + hearbeat_interval.

When the job finishes successfully it's removed from the inprogress set.

If it fails, and we want to retry, it's atomically removed from inprogress and added back to queue. If we don't want to retry I guess we just remove it from inprogress and report the error (whatever Verk already does).

The Worker/QueueManager keeps in its state a list of all the jobs it has in progress. When a job that it own's hearbeat is about to expire, it updates its hearbeat value in the inprogress set to now + hearbeat_interval. (note: we have to make sure that there's enough time to update heartbeat for all its jobs before any of them expires...)

All Worker/QueueManagers periodically (e.g. every 60 seconds, could be configured, too) scans the inprogress set. If a job's heartbeat has expired, then we assume that its worker node is gone, and we atomically remove it from inprogress and add it back to queue. My Redis/Lua-fu is not strong enough to say off-hand how this would be done, but I imagine there's a way to do this efficiently enough.

Apologies if I recite something that Verk already does, call things by different names or otherwise don't make sense - I do not know the codebase that well :)

Self-destructing if updating heartbeat fails (after a few retries) sounds reasonable.

Heartbeat per worker node

Another option, as @myobie mentioned, would be to have a single heartbeat for the node. I'm a little unsure about how nodes then discover that other nodes' hearbeats have expired. How do they discover each other? And if they do, how do they selectively re-queue all jobs from that worker node?

edgurgel commented 6 years ago

Heartbeat per worker node

I'm keen to investigate this first as it's more generic and it can handle all queues related to a node etc. I'm probably going to hack something this weekend so I can explain my idea after trying it out :)

Thanks everyone for this discussion. This is really really helpful.

myobie commented 6 years ago

@sebastianseilund sorry to be a few days responding.

What I mean by "connection" is treating the three redis connections as one unit. If one fails, then they all must fail. Then I think you can say a worker has a "connection" and that connection has a heartbeat.

@edgurgel I am not as available as I'd like over the next two weeks, but if you have something you want to get feedback on then include me (and I assume everyone else on the thread) and I will make time to review it. Thanks for doing the hard work (:

edgurgel commented 6 years ago

Hey team here is my first stab at solving this issue: https://github.com/edgurgel/verk/pull/159/files

The idea is:

(frequency) = 60 seconds ?

We may need to review some edge cases like what if we still have unfinished jobs while removing a queue from the list of running queues etc but I will work on them case by case

I need to review this as clearly it's just a stab at the final solution. I've played with some instances running locally and so far so good.

sebastianseilund commented 6 years ago

Sounds very reasonable to me. Thank you so much for spending time on it! Let me know if there's anything I can do to help or review.

sebastianseilund commented 6 years ago

For anyone else using Verk in Kubernetes: It actually works quite well to use a StatefulSet and use environment variables to name the node_ids, as long as you don't need to scale the number of workers up/down a lot.

Define your worker like this:

apiVersion: v1
kind: Service
metadata:
  name: my-worker-svc
  labels:
    app: my-worker-svc
spec:
  clusterIP: None
  selector:
    app: my-worker-svc

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: my-worker
  labels:
    app: my-worker
spec:
  selector:
    matchLabels:
      app: my-worker
  serviceName: my-worker-svc
  # We run two workers in this example
  replicas: 2
  # The workers don't depend on each other, so we can use Parallel pod management
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: my-worker
    spec:
      # This should probably match up with the setting you used for Verk's :shutdown_timeout
      terminationGracePeriodSeconds: 30
      containers:
        - name: my-worker
          image: my-repo/my-worker
          env:
            - name: VERK_NODE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.labels['statefulset.kubernetes.io/pod-name']

Kubernetes will then make sure to always have spec.replicas number of pods running. Each will get a label, statefulset.kubernetes.io/pod-name, with value my-worker-{n} (e.g. my-worker-0), which we expose to the container as the VERK_NODE_ID environment variable using a fieldRef.

In your config/prod.exs:

config :verk,
  node_id: {:system, "VERK_NODE_ID"}

@edgurgel if you'd like, I can submit a PR updating docs to reflect this? Not sure if it belongs in the readme, or if there should be another infrastructure docs file?

edgurgel commented 6 years ago

@sebastianseilund Thank you so much for sharing this! It would be amazing to have this part of the README 👍

Update: I'm still playing with the "resilient-verk" branch trying to find reasonable integration tests to cover this feature. Failure detection is a complex topic 😆

sebastianseilund commented 6 years ago

Done: https://github.com/edgurgel/verk/pull/169

edgurgel commented 5 years ago

Good news:

https://github.com/edgurgel/verk/releases/tag/v1.6.0

We can now optionally forget about Node ID handling and simply set the configuration

generate_node_id to true

This is experimental and optional for now but It should be the only option available once Verk 2.0 is released.

I close this ticket once I've added a section to the README.md explaining how it works etc.

edgurgel commented 5 years ago

https://github.com/edgurgel/verk/commit/3219d4e235d0452519421a64ada0e1cc1f3a1632