Open MrJoy opened 5 years ago
Sidekiq Pro does something similar: it uses a small pool of fetch threads to pull jobs, so you might have 25 goroutines running jobs but only 2 goroutines fetching. Would you assume that all 25 can call ACK/FAIL, the issue is really blocking on the fetch?
I'm most interested in any of: profiling data, benchmarks that show the problem, tests that trigger problems and PRs that improve the situation. Do you have any data or PRs you can share?
My data gathering has been a bit less systematic than you'd like. I'll see about getting some more robust data for you.
I have a PR that is mostly put together, barring some tidying up -- and possible revisiting of the shutdown ordering.
Writing tests is perhaps a bit challenging, since pipelining isn't about correctness but latency/throughput. I'll see what I can come up with, though.
I debated adding a third knob for the ACK/FAIL side of things, but thus far have not done so. My PR basically takes the value for "fetcher" count, and uses that for "reporters" as well, so your connection pool winds up being 2X + 1
(X fetchers, X reporters, and 1 heartbeat connection). There's two channels involved: One going from fetcher -> worker, and one from worker -> reporter. Both are unbuffered, to provide backpressure so jobs don't linger in limbo any more than they have to.
I'm mostly sure the problem is only fetching, so perhaps it would be best to have the connection pool be fetchers + workers + 1, and let each worker report its own job status.
If reporting turns out to present problems as well, I could trivially add a third knob for reporters, which by default picks up whatever the value for fetchers is but that feels a bit like sandblasting a soup-cracker until I have more data showing it might be of value. Maybe I'll add it to the test barrel.
And this is absolutely not ready for a PR yet, but check out the changes on this branch of our experimental fork: https://github.com/ValiMail/faktory_worker_go/tree/feature/decouple_pool_size_from_worker_count
(The commit history is intensely sloppy, given the late-night-experiment nature of the work. I will of course rebase and tidy up before actually submitting anything.)
Ah, right I set up a dummy PR to make it easier to view the net changes: https://github.com/ValiMail/faktory_worker_go/pull/1
I did some rough benchmarks last night that involve having N listeners listening to three queues, and looking at the CPU load of Faktory and seeing how it changes as concurrency increases. This is with zero jobs in queues, and zero jobs being produced into queues. I.E. just overhead of fetching jobs.
I need to actually run the math on the samples I collected but eyeballing it:
My testing was hampered a little bit by me not knowing enough about the intricacies of macOS to be able to adequately tune kernel parameters. I was easily able to get around limitations on FDs/sockets/etc, but ran into a hard limit of 4096 connections via the unix-domain socket from Faktory to Redis. NFI which kernel param I need to tweak to resolve that.
I haven't been able to readily recreate the behavior of pathologically slow times to push jobs into Faktory when there's lots of workers listening. But then, everything about my test environment is different here and I don't really have a good path forward to test in a more apples-to-apples way. I'm tending to be pretty CPU-bound. There's definitely interference/cross-talk but based on how things behaved my gut says that reporting job results may be playing more of a role than I previously expected -- so it's probably not as simple as producing jobs vs. fetching them. I'm still looking at test design to try and isolate/illuminate any such effects.
For my tests, I compared the current Run()
method, and two variations of my approach. I reorganized things in a new branch so there's now a RunPipelined()
and RunPipelined2()
which are my original approach and your suggestion of having workers report results to Faktory themselves respectively. This reduces changes to runner.go
down to basically just some extra fields in the Manager
struct, and a couple extra steps in the Terminate
method. The extra fields just go unused in Run()
.
If I had to pick one of the two alternatives over the other, I think I still favor my approach of job reporting being constrained as well. It is slightly more complex, but has the benefit of allowing tighter controls on the number of sockets opened to the Faktory server. After all, it can only have 65,536 sockets open at once, in total -- and the closer we get to that threshold the more secondary problems we have (FIN_WAIT and related issues). I'd prefer not to have to shard Faktory for at least 6-9 months if I can avoid it.
For my testing setup involving actual job flows: Faktory running on my personal laptop, across my home WiFi, and my work laptop running a program to produce jobs, and another to consume them. The networking situation presents a bit of a roadblock to proper testing, so I need to see about reframing my tests a bit to work better on just my work laptop. I did some local-only testing, but I was doing jobs that sleep for 100ms which resulted in me swamping Faktory -- 250-300% CPU usage -- quite rapidly, regardless of which implementation I was using. I'm going to repeat that with longer-lived jobs tonight.
Using the current Run()
method: I can pretty easily do 1,000 jobs/sec with a worker pool of 2048 workers, and jobs that just sleep for 2 seconds. And that, of course, makes total sense.
The act of producing jobs did interfere with job throughput, somewhat. I.E. I have a screenshot somewhere (home laptop, I think) showing a very jagged line, peaking at 1,000/sec, then it suddenly stops being jagged and is right at 1,000/sec -- because job production stopped. I saw this effect both with everything colocated on my laptop, and 10 producer goroutines -- and with the dual-laptop configuration and a much larger number of producer goroutines. (I wound up with 1024 because the network situation was making it hard for me to produce jobs fast enough that the workers weren't just sitting idle.)
I can achieve the same throughput with 256 job-fetcher goroutines + 256 job-reporter goroutines. On the basis of "sockets are a precious resource" alone, that presents to me as a net-win -- at least for customers who have jobs that spend a long time in-flight. With longer job durations, the ratio can be even wider. I anticipate that for our situation I'll probably wind up with two pools of workers: One eating a queue of jobs that tend to have a 0.5-1s duration, and with a ratio of something like 256 fetchers / 2048 workers. Another that eats all* queues -- but the jobs from the other queues tend to be in the 30s-5m range. That pool might have a spread of more like 64 fetchers / 8192 workers.
I'll see about running more tests, and organizing my data for you tonight.
In the meantime, you can see my branch with more isolated changes here:
https://github.com/ValiMail/faktory_worker_go/pull/2
(* - I'm using strict queue ordering so I can establish back-pressure. The heavier jobs produce the lighter jobs as an output and I managed to run us out of heap when I switched to the priority approach because we couldn't consume the little jobs as fast as they were being produced when the big jobs got a chance to get executed. Oops. Random aside: A mechanism whereby I can have a worker that checks to see if queue X is empty, before trying to fetch and execute jobs from queue Y would be super handy for letting me tune workers for the demands of different job types while still preserving back-pressure. But I'm guessing that's a little out-of-scope for where you're aiming Faktory...)
Currently, the connections value for FWG is used in a way that guarantees each connection will sit and block waiting for a job, then dispatch the work in its corresponding goroutine. This approach is simple, and effective -- to a point. We've found that as we increase the number of connections to Faktory, the load on Redis (or within Faktory -- we haven't profiled it very close yet) becomes a problem with many connections all waiting for jobs to perform. In our situation, we have a concurrency of 64 per node (which leaves our nodes pretty massively underutilized at the moment), and 8 nodes. Somewhere between 26-32 nodes, the contention around waiting for jobs becomes a significant issue that impacts the ability of other processes to add jobs to the queue.
We'd really like to be able to move to a concurrency of more like 1,024 per node, because we have a workload amenable to very wide-fan-out, with low per-job resource consumption (it's mostly I/O bound waiting on external APIs).
It seems to me that coupling fetching concurrency with dispatch concurrency makes it hard to address what are naturally separate bottlenecks. The right concurrency level (both per-node and aggregate across all nodes) is a separate question from the resource-intensiveness and latency of individual jobs which is a separate question from the right number of processes to poll for jobs simultaneously to ensure that available work capacity isn't starved.
My prototyping suggests we could decouple the process of fetching jobs / reporting results from the process of dispatching them quite effectively, leading to a simple means to let people address these two needs separately while only adding one more (optional) knob for them to turn. For those of us with jobs that have a high latency (and perhaps low resource consumption), but come in large bursts that make having a high concurrency (both per-node if the per-job resource consumption is small and aggregate across all nodes when the per-job resource consumption is high).
There would be some added complexity in FWG, particularly around the shutdown process but my prototyping suggests this can be done in a way where the new knob is optional and with a relatively tightly compartmentalized increase in code complexity. This would make it largely transparent for most users, while providing an important capability for those of us pushing tens of millions of jobs a day and looking to increase that considerably.
Worst-case, we could move this into a separate run method in the same package, making the structural differences completely opt-in for those who have a need for such decoupling.
Thoughts?