openfaas / nats-queue-worker

Queue-worker for OpenFaaS with NATS Streaming
https://docs.openfaas.com/reference/async/
MIT License
129 stars · 59 forks

max_inflight env var is ignored #57

Closed avielb closed 5 years ago

avielb commented 5 years ago

Hey, I have an OpenFaaS cluster set up and running on k8s, and I am using async function invocation. As described in the docs (https://docs.openfaas.com/deployment/troubleshooting/), it is possible to configure parallel executions for each queue worker.

As written in https://github.com/openfaas/nats-queue-worker/blob/master/readconfig.go, there is an environment variable max_inflight that controls the number of tasks processed in parallel (correct me if I am mistaken). I have set it via the helm chart, as shown here: https://github.com/avielb/faas-netes/commit/878a6b9ddb2eb2d94c2aafb861d319b4dc778062

I can confirm the variable is set in the container, but when I execute 3 async functions in a row they are still executed one by one, not in parallel.

Expected Behaviour

Async functions should be executed in parallel with 1 queue worker.

Current Behaviour

Invocations are executed serially, one by one, ignoring the max_inflight env var.

Steps to Reproduce (for bugs)

  1. Deploy k8s with Helm
  2. Deploy the helm chart from my fork (https://github.com/avielb/faas-netes/): cd chart/openfaas && helm install --debug ./ -f values.yaml -n openfaas
  3. Deploy a long-running function


avielb commented 5 years ago

help? anyone?

alexellis commented 5 years ago

Hi @avielb, thanks for your interest in OpenFaaS.

OpenFaaS has 1300 members in its Slack community and is operated for free, on a voluntary basis, by the community, which means you won't always get an answer immediately. This does not mean you are being ignored.

Please show some of your workings / console output demonstrating that the max_inflight variable is not being used by NATS Streaming.

In addition to increasing max_inflight you'll also need to increase the timeout window in ack_wait, such that ack_wait is at least the maximum task duration multiplied by max_inflight. So if the max task duration is 1m and you move from 1 to 2 in-flight, you'll need to increase ack_wait to 2m.

Also, I am not sure why you've forked faas-netes? Can you confirm the contents of your values.yaml?

Alex

burtonr commented 5 years ago

I've confirmed this is also the case with Swarm... I think it may be related to the subscription setup in handler/nats_queue.go

My queue-worker settings:

    queue-worker:
        image: openfaas/queue-worker:0.7.1
        networks:
            - functions
        environment:
            max_inflight: "5"
            ack_wait: "5m5s" # Max duration of any async task / request
            basic_auth: "${BASIC_AUTH:-true}"
            secret_mount_path: "/run/secrets/"

To test, I created a node function as follows:

// Waits 60s before calling back, to simulate a long-running task
module.exports = (context, callback) => {
    setTimeout(() => {
        callback(undefined, {status: JSON.stringify(context)});
    }, 60000) // 60 seconds
}

and the stack.yml used:

functions:
  test-inflight:
    lang: node
    handler: ./test-inflight
    image: test-inflight:latest
    environment:
      write_timeout: 1m30s
      read_timeout: 1m30s

Then, I executed the following (adding a query string so it was easier to see in the logs which test was executing):

curl -X POST -d "test 1" http://127.0.0.1:8080/async-function/test-inflight?test=test-1 \
  curl -X POST -d "test 2" http://127.0.0.1:8080/async-function/test-inflight?test=test-2 \
  curl -X POST -d "test 3" http://127.0.0.1:8080/async-function/test-inflight?test=test-3 \
  curl -X POST -d "test 4" http://127.0.0.1:8080/async-function/test-inflight?test=test-4 \
  curl -X POST -d "test 5" http://127.0.0.1:8080/async-function/test-inflight?test=test-5

Here are the func_queue-worker logs during that run (cleaned up a little for clarity):

func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#60] Received on [faas-request]: 'sequence:38 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"cc057564-8b52-4cc6-8de1-6c1fbde2e263\"],\"X-Start-Time\":[\"1551758004322162308\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-1\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004323487744 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#61] Received on [faas-request]: 'sequence:39 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"791de82a-fe2c-47bf-aa8c-2db30e8fb446\"],\"X-Start-Time\":[\"1551758004452033618\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-2\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004452323780 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#62] Received on [faas-request]: 'sequence:40 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"9ae9eabe-70dd-48fe-8342-a50f6fcfc693\"],\"X-Start-Time\":[\"1551758004535878496\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-3\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004536455507 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#63] Received on [faas-request]: 'sequence:41 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"50fbc064-6c32-476b-a1cc-cc8340298526\"],\"X-Start-Time\":[\"1551758004619819782\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-4\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004620498231 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#64] Received on [faas-request]: 'sequence:42 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"be0ea050-58e4-4203-a161-5da9d56c8aba\"],\"X-Start-Time\":[\"1551758004703327458\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-5\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004703918210 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.

You can see that the timestamps are about a minute apart, indicating that the queue was being processed sequentially.

Scaling the func_queue-worker service to more replicas did result in multiple async functions executing in parallel; however, the max_inflight parameter had no effect on how many were executed at once (as expected, each replica handled 1 request at a time).

alexellis commented 5 years ago

@burtonr thanks for taking a detailed look at this. Did you see any configuration issues?

Looking at git blame, these files were heavily modified by @bartsmykla for the NATS reconnection piece. Could a regression have been introduced?

Alex

alexellis commented 5 years ago

I have spent a couple of hours on this and can reproduce the issue too.

@avielb can you try going back to an earlier queue worker and see if the issue still persists?

I'll ping @kozlovic

A few things I've tried:

    // Option 1: raise the publisher-side limits on the connection
    nc, err := stan.Connect(
        q.clusterID,
        q.clientID,
        stan.NatsURL(q.natsURL),
        stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
            log.Printf("Disconnected from %s\n", q.natsURL)

            q.reconnect()
        }),

        stan.PubAckWait(q.ackWait),
        stan.MaxPubAcksInflight(q.maxInFlight),
    )

    // Option 2: raise the subscriber-side limit on the queue subscription
    subscription, err := q.conn.QueueSubscribe(
        q.subject,
        q.qgroup,
        q.messageHandler,
        stan.DurableName(q.durable),
        stan.AckWait(q.ackWait),
        q.startOption,
        stan.MaxInflight(q.maxInFlight),
    )

Alex

alexellis commented 5 years ago

These are the changes that I've tried, but this didn't seem to resolve the issue. https://github.com/openfaas/nats-queue-worker/pull/58/files

kozlovic commented 5 years ago

@alexellis To be clear, a single subscription has always been, and will always be, invoked with a single message at a time. MaxInflight simply means that the server can send the library up to that number of messages without receiving an ACK, but the callback will be invoked with 1 message; when the callback returns, it is presented with the 2nd message (which will likely already be in the library), and so on. You can always process a message in a go routine if you want (but then all your ordering goes out the window), and you need to be mindful of acks, because with auto-ack (the default behavior) the ACK is sent by the library when the callback returns.

burtonr commented 5 years ago

@alexellis What's the plan here? Should we update the queue-worker to process each message in a go routine, allowing multiple messages to be handled at once (with the understanding that there is no guarantee of ordering)?

Or do we update our documentation to state that if you want multiple long-running tasks performed at the same time, you must scale the queue-worker service to the number of functions to invoke at a time?

Or do we add auto-scaling to the queue-worker (somehow), to automatically scale the service to the number of queued requests/messages when the current queue workers are busy?

I'd be willing to take on whichever solution you think would be best, or at least give them a try.

alexellis commented 5 years ago

This is surprising, because I'm sure this is the behaviour explained to me by Peter or Brian when they were with the NATS team. I think I even saw it working this way with our code.

If changes we've made have caused a regression in behaviour, or if it never actually worked that way, then we should just update the documentation to state that N parallel invocations require N queue-worker replicas.

kozlovic commented 5 years ago

@alexellis I can tell you that in NATS (and Streaming), a single subscription will always dispatch messages one at a time. It has always been the case, and probably always will be. Most users want messages arriving on a given subscription to be processed in order; if the library dispatched every message on a subscription in a different go routine, ordering would not be possible.

burtonr commented 5 years ago

The OpenFaaS docs have been updated to explain that the queue-worker needs to be scaled to match the expected number of parallel invocations. There is now a new Async page in the Reference section.

I'll close this issue, but feel free to continue commenting.

burtonr commented 5 years ago

Derek close

avielb commented 5 years ago

@alexellis can you tell me which version exactly? I have also tried 0.5.4 and still see the same behavior. @burtonr does that mean OpenFaaS queue workers won't be able to do more than 1 task at a time?

burtonr commented 5 years ago

@avielb That is correct. NATS Streaming will process requests in the order they are received. If you need tasks executed in parallel, you can scale the nats-queue-worker service up to match the number of tasks you want executed in parallel.

This is a feature of NATS Streaming, so that the order of execution is maintained. The functions will still be invoked asynchronously, though.

The OpenFaaS docs have been updated to explain this here: https://docs.openfaas.com/reference/async/
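For example, scaling the queue-worker could look something like this (command sketches only; the deployment/service names are assumptions based on the default faas-netes chart and the Swarm stack seen in the logs above):

```shell
# Kubernetes (faas-netes): assuming the deployment is named "queue-worker"
kubectl scale -n openfaas deploy/queue-worker --replicas=5

# Docker Swarm: assuming the stack service is named "func_queue-worker"
docker service scale func_queue-worker=5
```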