openfaas / nats-queue-worker

Queue-worker for OpenFaaS with NATS Streaming
https://docs.openfaas.com/reference/async/
MIT License

Durable subscriptions? #75

Closed bmcustodio closed 4 years ago

bmcustodio commented 4 years ago

Expected Behaviour

Invocations made while no workers are running are eventually performed (i.e. after there's at least one worker running again).

Current Behaviour

Only the very last invocation made while no workers are running is actually performed; all earlier ones are lost.

Possible Solution

Support durable queue subscriptions, which should be just a matter of setting durable to a non-empty value.

Steps to Reproduce (for bugs)

  1. Run one or more instances of the worker.
  2. Invoke a function asynchronously, and observe it being invoked.
  3. Scale the number of workers down to 0.
  4. Wait for some time, and observe (via the NATS monitoring endpoint) that all subscriptions have been removed.
  5. Invoke a function asynchronously two or more times.
  6. Scale the number of workers back to 1.
  7. Observe that the function is invoked only once, with the values from the last invocation.

Context

N/A

Your Environment

N/A

Kubernetes (OpenFaaS Operator)

N/A

N/A

bmcustodio commented 4 years ago

{
  "cluster_id": "nats-streaming",
  "server_id": "Txhzb7jApUfOeoxCwlJUZ8",
  "now": "2019-11-26T10:29:57.531347062Z",
  "offset": 0,
  "limit": 1024,
  "count": 1,
  "total": 1,
  "channels": [
    {
      "name": "faas-request",
      "msgs": 0,
      "bytes": 0,
      "first_seq": 0,
      "last_seq": 0,
      "subscriptions": [
        {
          "client_id": "faas-worker-queue-worker-64fd9c6fd8-r5f66",
          "inbox": "_INBOX.0vtp7J6Xw5mJ1OZDrpLBQO",
          "ack_inbox": "_INBOX.Txhzb7jApUfOeoxCwlJVEI",
          "queue_name": "faas",
          "is_durable": false,
          "is_offline": false,
          "max_inflight": 1,
          "ack_wait": 60,
          "last_sent": 0,
          "pending_count": 0,
          "is_stalled": false
        },
        {
          "client_id": "faas-worker-queue-worker-64fd9c6fd8-7ftlw",
          "inbox": "_INBOX.2SAQlk2AhgdfKx6xHmMfXV",
          "ack_inbox": "_INBOX.Txhzb7jApUfOeoxCwlJVJu",
          "queue_name": "faas",
          "is_durable": false,
          "is_offline": false,
          "max_inflight": 1,
          "ack_wait": 60,
          "last_sent": 0,
          "pending_count": 0,
          "is_stalled": false
        }
      ]
    }
  ]
}
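
The response above can also be checked programmatically rather than by eye. The following is a minimal Go sketch, using only the standard library, that decodes a trimmed copy of that response and lists the subscriptions on the faas-request channel whose is_durable flag is false. The names channelsResponse, nonDurableClients and sample are made up for this illustration and are not part of the queue-worker or of NATS Streaming; only the JSON field names are taken from the output above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// channelsResponse (hypothetical name) mirrors only the fields of the
// monitoring response that this sketch needs.
type channelsResponse struct {
	Channels []struct {
		Name          string `json:"name"`
		Subscriptions []struct {
			ClientID  string `json:"client_id"`
			QueueName string `json:"queue_name"`
			IsDurable bool   `json:"is_durable"`
		} `json:"subscriptions"`
	} `json:"channels"`
}

// nonDurableClients returns the client IDs of every subscription on the
// named channel whose is_durable flag is false.
func nonDurableClients(payload []byte, channel string) ([]string, error) {
	var resp channelsResponse
	if err := json.Unmarshal(payload, &resp); err != nil {
		return nil, err
	}
	var ids []string
	for _, ch := range resp.Channels {
		if ch.Name != channel {
			continue
		}
		for _, sub := range ch.Subscriptions {
			if !sub.IsDurable {
				ids = append(ids, sub.ClientID)
			}
		}
	}
	return ids, nil
}

// sample is a trimmed copy of the monitoring output quoted above.
const sample = `{
  "cluster_id": "nats-streaming",
  "channels": [
    {
      "name": "faas-request",
      "subscriptions": [
        {
          "client_id": "faas-worker-queue-worker-64fd9c6fd8-r5f66",
          "queue_name": "faas",
          "is_durable": false
        },
        {
          "client_id": "faas-worker-queue-worker-64fd9c6fd8-7ftlw",
          "queue_name": "faas",
          "is_durable": false
        }
      ]
    }
  ]
}`

func main() {
	ids, err := nonDurableClients([]byte(sample), "faas-request")
	if err != nil {
		panic(err)
	}
	// Print one line per worker that joined the queue group without a durable name.
	for _, id := range ids {
		fmt.Println("non-durable subscription:", id)
	}
}
```

With the output above, both workers are reported, which matches the behaviour described in this issue: neither subscription is durable, so nothing is retained on their behalf once they disconnect.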

alexellis commented 4 years ago

Can you edit and fill out the issue template, thank you 🙏

alexellis commented 4 years ago

@kozlovic do you remember how we landed on this configuration? Was it to do with not using a statefulset and the pod name changing, or to do with the default in-memory store?

alexellis commented 4 years ago

Thanks for your PR @bmcstdio - does this work only in HA / with database-backed storage, or does it also work with the in-memory single replica of NATS Streaming in the OpenFaaS helm chart?

bmcustodio commented 4 years ago

@alexellis it works with whatever configuration you may have, subject to the (well...) durability guarantees it provides in case of a (NATS Streaming) server crash.

kozlovic commented 4 years ago

@alexellis Not sure what you are asking me. The blame of main.go shows that you had a durable variable and were passing it as an option to QueueSubscribe(), but durable was never set to anything. If this were not a queue subscription, you would not be able to have more than one durable subscription with the same durable name and channel name. With a queue subscription, however, you can have a durable queue group with many members.

bmcustodio commented 4 years ago

@alexellis can we get #76 merged please? 🙏

bmcustodio commented 4 years ago

#76 has been merged, so I am closing this.