openfaas / nats-queue-worker

Queue-worker for OpenFaaS with NATS Streaming
https://docs.openfaas.com/reference/async/
MIT License

Per-function ack_wait #115

Closed · kevin-lindsay-1 closed this issue 1 year ago

kevin-lindsay-1 commented 3 years ago

In a previous conversation, @alexellis and I discussed some items related to the queue worker, one of which was verifying whether the queue worker's ack_wait applies to multiple functions via one "global" setting or on a per-function basis.

Expected Behaviour

When discussing multiple functions being handled at the same time by a single queue worker, we sketched the behavior we would prefer: a single queue worker that autoscales to meet demand, rather than a static replica count with different wait times per queue.

Given the following:

We assume a Kubernetes environment, or one with a similar orchestration layer and patterns, and we assume the event terminating the pod is a graceful shutdown, such as a Node being drained for maintenance with its workloads rescheduled onto a different Node.

Expected events with rough timings; the sections in the format [duration] are the approximate offsets from the start of this example timeline:

Current Behaviour

An example of this timing with the same settings and format as above; functional (non-timing) differences are in bold italics:

The major differences from the above:

Possible Solution

As we had discussed previously, it would likely be advantageous to have different ack_wait times per function. The queue worker itself would then have no ack_wait of its own; instead it would only know about a graceful shutdown duration, which the user would configure in advance based on the ack_wait of their environment's longest-running function.
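A minimal sketch of how that might look in Go is below, assuming a single configured grace period rather than any per-worker ack_wait; the names here (gracefulShutdownDuration, the inflight wait group) are illustrative and not taken from the queue-worker's code.

```go
// Sketch only: a worker whose shutdown behaviour is driven by a single
// configured grace period, rather than an ack_wait of its own.
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	// Assumed to be configured to at least the ack_wait of the
	// longest-running function in the environment.
	gracefulShutdownDuration := 5 * time.Minute

	// Each in-flight invocation would Add(1) before calling the function
	// and Done() once the message has been acked.
	var inflight sync.WaitGroup

	// Wait for the orchestrator's graceful shutdown signal (e.g. a Node drain).
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
	<-sigCh

	// Stop taking new work here (close subscriptions), then drain what is
	// already in flight, but never wait longer than the grace period.
	ctx, cancel := context.WithTimeout(context.Background(), gracefulShutdownDuration)
	defer cancel()

	done := make(chan struct{})
	go func() {
		inflight.Wait()
		close(done)
	}()

	select {
	case <-done:
		log.Println("all in-flight invocations completed")
	case <-ctx.Done():
		log.Println("grace period expired; exiting with work still in flight")
	}
}
```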

The difference in this implementation would likely be having multiple subscriptions with different AckWait periods in the queue worker, which may require more channels, whereas the current implementation listens to only one channel (based on what I see in the environment variables, specifically faas_nats_channel).
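For illustration, a hypothetical sketch of per-function subscriptions with NATS Streaming (github.com/nats-io/stan.go) might look like the following; the per-function subject naming, the queue group name, and the functionConfig type are assumptions, not the queue-worker's current behaviour.

```go
// Sketch only: one subscription per function, each with its own AckWait,
// instead of a single global ack_wait.
package worker

import (
	"log"
	"time"

	stan "github.com/nats-io/stan.go"
)

type functionConfig struct {
	Name    string
	AckWait time.Duration // e.g. read from a per-function annotation
}

func subscribePerFunction(sc stan.Conn, fns []functionConfig, handler stan.MsgHandler) error {
	for _, fn := range fns {
		// One channel per function, so each function gets its own AckWait.
		_, err := sc.QueueSubscribe(
			"faas-request."+fn.Name, // hypothetical per-function channel
			"faas",                  // queue group shared by worker replicas
			handler,
			stan.AckWait(fn.AckWait),
			stan.SetManualAckMode(),
			stan.DurableName("faas-"+fn.Name),
		)
		if err != nil {
			return err
		}
		log.Printf("subscribed to %s with AckWait=%s", fn.Name, fn.AckWait)
	}
	return nil
}
```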

The issue

The big wrench in this discussion is the max_inflight for a particular function's queue. For example, let's say I have a function with a concurrency limit of 100 (watchdog.max_inflight) and a maximum pod count of 10 (com.openfaas.scale.max). From those values, you can presume that no more than 1000 invocations (queueWorker.max_inflight) should be attempted against that function at once, because otherwise you would be sending invocations to a function that cannot handle them while all of its pods are busy.

The questions that occur to me, which effectively prevent this solution from working as expected, are:

how does a queue worker know how many maximum in-flight invocations it should be able to send to a function?

I would say that this could be calculated as watchdog.max_inflight * com.openfaas.scale.max. The queue worker would then potentially not need its own max_inflight, and could instead be autoscaled based on CPU/memory or a custom metric.
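As a rough sketch of that calculation (the field names are illustrative stand-ins for the watchdog's max_inflight value and the com.openfaas.scale.max label):

```go
// Sketch only: deriving the ceiling on in-flight invocations for one function.
package capacity

type functionLimits struct {
	WatchdogMaxInflight int // watchdog.max_inflight per replica
	ScaleMax            int // com.openfaas.scale.max replicas
}

// maxQueueInflight is the most invocations a queue worker should attempt to
// have in flight for this function at once; beyond this, every replica is
// already saturated. With the example above: 100 * 10 = 1000.
func maxQueueInflight(f functionLimits) int {
	return f.WatchdogMaxInflight * f.ScaleMax
}
```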

how does a queue worker know how many are already in-flight by other replicas?

I would say that this doesn't have to be perfectly immediate between pods, and you could potentially accomplish it with an external lookup (metrics or some such). As long as it can prevent an endless flood of 429s, it should be fine for the count to lag slightly (at least for the pro queue worker, which can retry on 429s from pods).
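For illustration only, the kind of lookup described above might be expressed as an interface like this; the names and the retry assumption are mine, not part of the queue worker:

```go
// Sketch only: a best-effort, possibly slightly stale view of how many
// invocations are already in flight across all queue-worker replicas,
// backed by something external such as a metrics store.
package inflight

// Lookup reports the approximate number of in-flight invocations for a
// function across all replicas of the queue worker.
type Lookup interface {
	Inflight(functionName string) (int, error)
}

// hasCapacity decides whether another invocation should be dispatched.
// Because the count may lag behind reality, some 429s can still occur;
// the caller is assumed to retry those rather than drop work.
func hasCapacity(l Lookup, functionName string, limit int) (bool, error) {
	current, err := l.Inflight(functionName)
	if err != nil {
		return false, err
	}
	return current < limit, nil
}
```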

Steps to Reproduce (for bugs)

Context

Your Environment