Closed Harper04 closed 4 years ago
The concurrency limit is per-queue per-node, not global across all connected nodes. That would require coordination between each node in order to negotiate queue limits. To achieve global limits nodes would need to coordinate on start up, shut down, crashes, scaling, etc.
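As a rough illustration of what per-node limits mean in practice, the sketch below multiplies each queue's configured limit by the number of running nodes. The `LimitSketch` module and its function are hypothetical helpers for this thread, not part of Oban.

```elixir
defmodule LimitSketch do
  # Each node applies the configured limit on its own, so the number of jobs
  # that can execute at once across the cluster is limit * node_count.
  def effective_limits(queues, node_count) when node_count > 0 do
    Map.new(queues, fn {queue, limit} -> {queue, limit * node_count} end)
  end
end

LimitSketch.effective_limits([default: 1], 2)
# => %{default: 2}
```

With `queues: [default: 1]` and two nodes connected to the same database, up to two jobs from the `default` queue can be executing at the same time.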
So please consider this either as a documentation issue or feature request.
Considering the issues stated above I'm leaning heavily toward documentation about this. I'm curious what other people think about globally enforced queue limits. I've been asked about it a few times, but I haven't heard many use cases.
We should be able to address some of the issues that lead you to discover this though. If the crontab scheduler isn't working correctly that is definitely a bug. The scheduler uses uniqueness to schedule distinct jobs. Do you happen to have custom unique settings in the worker you are scheduling?
No, but the crontab scheduler only seems to use period. https://github.com/sorentwo/oban/blob/master/lib/oban/crontab/scheduler.ex So if a job is still running, the next one will happily be scheduled and executed on the same or another node.
No, but the crontab scheduler only seems to use period.
Yes, the scheduler call specifies a period, and that is merged with the default fields and states. This is the inspected unique value from the worker.new(args, opts) call in scheduler.ex:
unique: %{
fields: [:args, :queue, :worker],
period: 59,
states: [:available, :scheduled, :executing, :retryable, :completed]
},
So far I haven't been able to recreate the situation with cron jobs that you are describing. Do you have any more details (the crontab itself, or part of the worker) that you can share? Even better would be an example or test case that reproduces the behavior.
No problem! I condensed the edge case to this:
[queues: [default: 10], prune: :disabled, crontab: [{"* * * * *", XXX.Task.Dummy}]]
defmodule Tide.Task.Dummy do
use Oban.Worker, max_attempts: 1
@impl Oban.Worker
def perform(_,_) do
IO.puts("i am a dummy start")
:timer.sleep(90000)
IO.puts("i am a dummy end")
end
end
This gives me:
➜ xxx git:(master) ✗ iex -S mix
i am a dummy start
i am a dummy start
i am a dummy end
i am a dummy start
iex(4)> Oban.Job |> Repo.all() |> Enum.filter(fn s -> s.state != "completed" end) |> Enum.map(&Map.take(&1, [:state, :worker, :queue]))
[
%{queue: "default", state: "executing", worker: "Tide.Task.Dummy"},
%{queue: "default", state: "executing", worker: "Tide.Task.Dummy"}
]
iex(5)>
If I am correct, all criteria (fields, states, and period) must match for a job to be considered unique. After 59 seconds the period no longer matches. For my workaround I am using:
@unique_opts [
period: 60 * 60 * 24 * 365 * 99,
fields: [:queue, :worker],
states: [:available, :scheduled, :executing]
]
The scheduler has its own queue; this way I can still enqueue tasks manually and keep the scheduler working. (This is less efficient because I just try to schedule every cron job every second.)
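To illustrate why the 59 second period lets a second job start while the first is still executing, here is a minimal sketch of a period-based uniqueness check. It is a simplified model and an assumption about the behaviour described above, not Oban's actual query; `UniqueSketch` and the job maps are hypothetical.

```elixir
defmodule UniqueSketch do
  # Simplified model (an assumption, not Oban's real implementation): a new
  # job counts as a duplicate when an existing job for the same worker is in
  # one of `states` and was inserted fewer than `period` seconds ago.
  def duplicate?(existing_jobs, worker, now, opts) do
    period = Keyword.fetch!(opts, :period)
    states = Keyword.fetch!(opts, :states)

    Enum.any?(existing_jobs, fn job ->
      job.worker == worker and job.state in states and now - job.inserted_at < period
    end)
  end
end

# A job inserted at t = 0 that is still executing when cron fires at t = 60.
running = [%{worker: "Dummy", state: :executing, inserted_at: 0}]

default_opts = [
  period: 59,
  states: [:available, :scheduled, :executing, :retryable, :completed]
]

workaround_opts = [
  period: 60 * 60 * 24 * 365 * 99,
  states: [:available, :scheduled, :executing]
]

# false: 60 seconds have elapsed, which is outside the 59 second period,
# so a second job would be inserted and executed.
UniqueSketch.duplicate?(running, "Dummy", 60, default_opts)

# true: the executing job is still inside the very long period,
# so no second job would be inserted.
UniqueSketch.duplicate?(running, "Dummy", 60, workaround_opts)
```

Leaving `:completed` out of the states in the workaround matters in this model: once the long-running job finishes, it no longer blocks the next scheduled insert.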
Now I understand the situation! Thanks for the detailed example. Overriding the unique period and states is exactly the workaround I would suggest (and is why the scheduler uses put_new for the :unique key).
The following setup does what you're aiming for:
Oban.Test.Repo.start_link()
defmodule Oban.Dummy do
use Oban.Worker, max_attempts: 1
@impl Oban.Worker
def perform(_, _) do
IO.puts("i am a dummy start")
:timer.sleep(90_000)
IO.puts("i am a dummy end")
end
end
unique_opts = [
period: 60 * 60 * 24 * 365 * 99,
states: [:available, :scheduled, :executing]
]
Oban.start_link(
repo: Oban.Test.Repo,
queues: [default: 10],
crontab: [{"* * * * *", Oban.Dummy, args: %{scheduled: true}, unique: unique_opts}]
)
The output looks like this:
{:ok, #PID<0.268.0>}
i am a dummy start
i am a dummy end
i am a dummy start
i am a dummy end
There are a couple of things in there to make it work:
- The unique opts are put directly in the crontab definition
- The args are set there too, which allows you to use the worker from the crontab scheduler and from other locations
I'll update the caveats section of the docs to include this situation and the workaround.
Thanks a lot!
Hi @sorentwo, first: Oban looks fantastic! Thanks for all your incredible work. We're considering migrating from Exq to Oban and I'm trying to do my homework first. Your interface around unique jobs is amazing! (You'll see why that matters below.) In case it's helpful, I thought I'd provide a use case (and a theoretical, but fairly clunky solution) for a globally enforced queue limit. You wrote:
Considering the issues stated above I'm leaning heavily toward documentation about this. I'm curious what other people think about globally enforced queue limits. I've been asked about it a few times, but I haven't heard many use cases.
Our use case: OpenFn provides task automation and data integration for non-profits. Basically, we listen for HTTP requests to particular URLs and then execute snippets of our clients' JavaScript in isolated VMs. Those snippets most often do things like upserting patient records to a database, sending a payment via mobile money, or adding a contact to a CRM. On occasion, our clients have issues with their "destination systems" because of API limits or other resource constraints, and they want to be able to dial down the concurrency with which we process their payloads. E.g., if we receive 1000 independent HTTP requests in a minute (and they can't be lumped together into a bulk operation), the client might want us to make only one request at a time to their destination DB. I know this is a bit arbitrary, but it's actually come up a lot for us. It's not hard at all with a single node, but when running lots of nodes on Kubernetes it's really tricky to control.
Unless there was some built-in global concurrency setting, I'd imagine that we'd solve this by:
- Having each new node take (1/new_number_of_nodes_including_me) of queues from each existing node. (So if 3 existing nodes were subscribed to 12 queues each, we'd "take" 3 queues from each: unsubscribing node A from queues [10,11,12], unsubscribing node B from queues [22,23,24], etc., and subscribing to those 9 queues ourselves [10,11,12,22,23,24,34,35,36], leaving 4 total nodes with 9 queues each.)
- Having each node check in every configurable_x seconds to let other nodes know that the queue still had a single living subscriber.
To achieve this the way we do with Exq, we'd need to be able to (at runtime) have a particular node "subscribe" or "unsubscribe" to a particular queue. Is that possible? I thought that start_queue/3 might be the right idea, but that seems to add a queue to all nodes.
Is there something like Exq.subscribe/2 that might help us accomplish this global concurrency limitation? (Also note that we've got a single queue per user, they can be created dynamically, and users can dial up and down the concurrency on the fly.)
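The rebalancing step described above can be sketched in plain Elixir. This is only an illustration of the arithmetic, assuming each queue is identified by an integer and that a joining node takes the last 1/(n + 1) of every existing node's queues; `RebalanceSketch` is a hypothetical module and does not call Exq or Oban.

```elixir
defmodule RebalanceSketch do
  # `assignments` maps node => list of queues. The joining node takes the
  # last 1/(existing_nodes + 1) of the queues held by every existing node.
  def join(assignments, new_node) do
    share = map_size(assignments) + 1

    {kept, taken} =
      assignments
      |> Enum.sort()
      |> Enum.map_reduce([], fn {node, queues}, acc ->
        take = div(length(queues), share)
        {keep, give} = Enum.split(queues, length(queues) - take)
        {{node, keep}, acc ++ give}
      end)

    kept |> Map.new() |> Map.put(new_node, taken)
  end
end

before = %{
  a: Enum.to_list(1..12),
  b: Enum.to_list(13..24),
  c: Enum.to_list(25..36)
}

RebalanceSketch.join(before, :d)
# node :d ends up with [10, 11, 12, 22, 23, 24, 34, 35, 36],
# and every node holds 9 queues
```

The sketch only computes the new assignment; actually unsubscribing and subscribing nodes at runtime is the part that needs support from the job library.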
@taylordowns2000 Sorry to leave this hanging for so long, there has been a lot in flux lately. I understand your situation and this is an issue that comes up often.
I propose adding a local_only option to start_queue/3 so that apps can manually start a queue on a single node. It will require your application to coordinate nodes and handle starting the queue, but it makes it possible.
Global queue limits are in the longer roadmap, but require changes to how queues are defined and may be more of an optional "plugin".
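One way an application might handle that coordination is to have every node compute the same "owner" for a queue from the list of known nodes, and only start the queue locally on the owner. The sketch below is an assumption about how an app could do this; `QueueOwnerSketch` is hypothetical, and the proposed local_only option is not used here because it is only a proposal in this thread.

```elixir
defmodule QueueOwnerSketch do
  # Picks one node per queue deterministically. Sorting first means every
  # node computes the same owner regardless of the order it sees its peers.
  def owner(queue, nodes) when nodes != [] do
    sorted = Enum.sort(nodes)
    Enum.at(sorted, :erlang.phash2(queue, length(sorted)))
  end

  # True only on the node that should start the queue locally.
  def owner?(queue, nodes, this_node), do: owner(queue, nodes) == this_node
end

nodes = [:"app@host-a", :"app@host-b", :"app@host-c"]

# Exactly one node in the list claims ownership of the queue.
Enum.filter(nodes, &QueueOwnerSketch.owner?("user_42", nodes, &1))
```

A real deployment would also need to react to nodes joining and leaving, which this sketch does not cover.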
No worries on the delay—yes, this would work perfectly. Also very useful for manual debugging... sometimes we want to stop a queue across all nodes at once, and then start it up on a single node with limited concurrency.
Hi @sorentwo, I've just got the green light to implement this but was planning on using the oban_beats table to do so. Am I right in understanding that it's now been removed?
Is there any way to see which nodes are subscribed to which queues?
Thanks!
@taylordowns2000 That's correct, mostly. Support for publishing oban_beats was moved into the Lifeline Plugin in Oban Pro. Global queue limits are still on the roadmap, but it will be a Pro feature as well.
OK, understood. Thanks for the response.
On Mon, Sep 28, 2020 at 7:32 PM Parker Selbert notifications@github.com wrote:
@taylordowns2000 https://github.com/taylordowns2000 That's correct, mostly. Support for publishing oban_beats was moved into the Lifeline Plugin https://hexdocs.pm/oban/lifeline.html#content in Oban Pro. Global queue limits are still on the roadmap, but it will be a Pro feature as well.
Hello and thanks for this awesome library!
Some background: Besides the seemingly intended use case of one-off tasks (sending mail, async media conversion, etc.), we try to use Oban for interval/crontab maintenance jobs like imports.
For this we don't care how many jobs are scheduled, but we require that only one Oban.Worker of a kind is executing at any point in time across all connected nodes.
We tried using the crontab feature but noticed that the scheduler doesn't care if a job is still running at the next scheduled time and starts the new one (this is common crontab behaviour, so not a bug to me).
We also tried using the uniqueness feature (available, scheduled), scheduling the next task as the first step of a job. That didn't work either for jobs running longer than the configured interval.
Our workaround is an external scheduler using available, scheduled, executing uniqueness.
Okay, now why am I filing a bug report? Because we also tried using queues, setting concurrency to 1, and got really surprised!
Environment
- Output of elixir --version: Erlang/OTP 22 [erts-10.5.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace] Elixir 1.9.2 (compiled with Erlang/OTP 22)
Current Behavior
How to reproduce:
- Configure queues: [default: 1]
- Start more than one node with iex -S mix (connected to the same database but with no pg2 connection, as Oban does not rely on this, right?)
- Run Oban.Job |> Repo.all() |> Enum.filter(fn s -> s.state != "completed" end) |> Enum.map(&Map.take(&1, [:state, :worker]))
Expected Behavior
Enforce the concurrency limit across all nodes connected to the database.
Rereading the documentation, I see no mention of Oban's behaviour in the distributed case (queues per node or globally). Because of all the praise, the enterprise feature talk, the use of database triggers, etc., I assumed I would get the same guarantees for one or multiple nodes.
So please consider this either as a documentation issue or feature request.
regards