nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io

Multiple Consumer Processes don't run in parallel but sequentially #4389

Closed robin-coac closed 7 months ago

robin-coac commented 1 year ago

#### Problem

I am trying to use NATS for job queuing (a producer-consumer problem), where a time-consuming task is distributed across multiple consumers.

I have two scripts.

1. Code to populate the task-queue: `producer.py`

```python
import asyncio
import json

import nats
from nats.js.api import RetentionPolicy

NATS_HOST = '0.0.0.0'
NATS_PORT = '4222'
DOCUMENT_EXT_SUBJECT = "file_process"

async def main():
    nc = await nats.connect(servers=f"nats://{NATS_HOST}:{NATS_PORT}")
    js = nc.jetstream()

    # Work-queue stream that persists messages published on this subject.
    await js.add_stream(
        name="sample-stream",
        subjects=[DOCUMENT_EXT_SUBJECT],
        retention=RetentionPolicy.WORK_QUEUE,
    )

    # Publish 20 dummy tasks.
    for i in range(20):
        ack = await js.publish(
            subject=DOCUMENT_EXT_SUBJECT,
            payload=json.dumps({"some_load": "lsome_load"}).encode(),
        )

    await nc.close()

asyncio.run(main())
```


2. Consumer script: `consumer.py`

```python
import asyncio
import logging
import signal
import sys

import nats
from nats.js.api import RetentionPolicy, ConsumerConfig, AckPolicy

consumer_config = ConsumerConfig(
    ack_wait=900,
    max_deliver=1,
    max_ack_pending=1,
    ack_policy=AckPolicy.EXPLICIT,
)

# NATS connection settings
NATS_HOST = '0.0.0.0'
NATS_PORT = '4222'
DOCUMENT_EXT_SUBJECT = "file_process"

MAX_RECONNECT_ATTEMPTS = 10

## A CPU-bound task
def time_consuming_task():
    import time; time.sleep(50)
    return

async def task_cb(msg):
    time_consuming_task()
    await msg.ack()
    print("acknowledged document-extraction !")

async def run():

    logging.info("Started Document processing Consumer ...")

    async def error_cb(e):
        sys.exit()

    async def disconnected_cb():
        logging.info("Got disconnected from NATS server .. Retrying .")

    async def reconnected_cb():
        logging.info("Got reconnected...")

    nc = await nats.connect(
        servers=f"nats://{NATS_HOST}:{NATS_PORT}",
        error_cb=error_cb,
        reconnected_cb=reconnected_cb,
        disconnected_cb=disconnected_cb,
        max_reconnect_attempts=MAX_RECONNECT_ATTEMPTS,
    )

    # Create JetStream context.
    js = nc.jetstream()

    ## PERSIST ON THIS SUBJECT
    await js.add_stream(
        subjects=[DOCUMENT_EXT_SUBJECT],
        name="sample-stream",
        ## Extra
        retention=RetentionPolicy.WORK_QUEUE,
    )

    await js.subscribe(
        DOCUMENT_EXT_SUBJECT,
        stream="sample-stream",
        queue="worker_queue",  # also known as a "deliver group"; in core NATS it's called a "queue group"
        cb=task_cb,
        manual_ack=True,
        config=consumer_config,
    )

    def signal_handler():
        sys.exit()

    for sig in ('SIGINT', 'SIGTERM'):
        asyncio.get_running_loop().add_signal_handler(getattr(signal, sig), signal_handler)

    await nc.flush()
    logging.info("Done ... ?")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run())
        loop.run_forever()
    except Exception as e:
        print("Got error : ", e)
    finally:
        loop.close()
```



#### Expected vs Actual result:
I populate the queue using `producer.py`.
Then I run the `consumer.py` script in multiple terminals as separate processes.

What's happening is that things don't run in parallel.

Although there are 3 processes available to consume messages, while a task is being processed inside one of the consumer processes, the NATS server doesn't push any task to the other 2 consumers. Everything happens one after another.

Am I doing something wrong? My initial guess was that NATS doesn't handle blocking tasks well, but even when I replace `time.sleep(50)` with `asyncio.sleep()`, I get the same output.
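
For reference, offloading the blocking call to an executor looks roughly like the sketch below (it reuses the `time_consuming_task` from `consumer.py` above). As described in the follow-up comment, this kind of change did not alter the delivery behaviour:

```python
import asyncio
import concurrent.futures
import time

# Stand-in for the real blocking work, same as in consumer.py above.
def time_consuming_task():
    time.sleep(50)

# A shared pool so worker processes are not re-created for every message.
pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)

async def task_cb(msg):
    # Run the blocking work off the event loop, then acknowledge.
    await asyncio.get_running_loop().run_in_executor(pool, time_consuming_task)
    await msg.ack()
    print("acknowledged document-extraction !")
```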

#### NATS Client and Server 
Server: the official Docker image, run as:
`docker run --name nats --network nats --rm -p 4222:4222 -p 8222:8222 nats --http_port 8222`
Client: the Python client `nats-py`, version 2.3.1

#### OS/Container environment:
PRETTY_NAME="Ubuntu 22.04.2 LTS"
NAME="Ubuntu"
VERSION_ID="22.04"
VERSION="22.04.2 LTS (Jammy Jellyfish)"
VERSION_CODENAME=jammy
ID=ubuntu
ID_LIKE=debian
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
UBUNTU_CODENAME=jammy
robin-coac commented 1 year ago

Further insights into the problem, and a hacky solution

1. The Problem Scenario

I intend to use NATS as a Celery+RabbitMQ replacement. I want to deploy things in Kubernetes, where each container will process a single file that can take up to 20 minutes. That's why I kept `max_ack_pending` at 1; I would rather scale pods horizontally based on message queue size.

2. Setting `max_ack_pending` > 1 (say 20) in the above consumer code, I get partial parallelism.

Say I run `consumer.py` in 4 separate terminals: two of the processes pick up tasks, and only after they are done do the remaining two pick up tasks, while the original two just stay idle, and so on in an alternating fashion. I don't think that should be the case. By the way, there were 1000 messages in the queue.

3. This is my hacky solution using the pull-subscriber approach.

Even here, I noticed a major issue. Suppose I have 1000 messages in a freshly created stream/queue, and I start consumer processes in 3 terminals. If I abruptly close the consumer processes (Ctrl+C) before they can send an acknowledgement to the nats-server, something strange happens: when I restart the consumer processes, they cannot fetch any message from the server, and I get a TimeoutError while fetching.

The only workaround that worked was calling `await msg.ack_sync()` before I start processing the message, as shown below. Even using `await msg.ack()` doesn't work. This is not a good solution because I can still lose the message due to an exception during processing.

```python
import asyncio
import signal
import sys
import logging.config

logging.config.fileConfig("logging.conf", disable_existing_loggers=False)

import nats
from nats.js.api import RetentionPolicy, ConsumerConfig, AckPolicy

consumer_config = ConsumerConfig(
    ack_wait=900,
    max_deliver=1, 
    max_ack_pending=1, 
    ack_policy=AckPolicy.EXPLICIT,
) 

# Nats 
NATS_HOST = '0.0.0.0'
NATS_PORT = '4222'

DOCUMENT_EXT_SUBJECT = "file_process"
MAX_RECONNECT_ATTEMPTS = 10 

## A cpu bound task
def time_consuming_task():
    import time; time.sleep(50)
    return   

async def task_cb(msg):
    time_consuming_task()

from nats.errors import TimeoutError

async def run():

    logging.info("Started Document processing Consumer ...")

    nc = await nats.connect(
        servers=f"nats://{NATS_HOST}:{NATS_PORT}",
        max_reconnect_attempts=MAX_RECONNECT_ATTEMPTS,
    )

    # Create JetStream context.
    js = nc.jetstream()
    ## PERSIST ON THIS SUBJECT
    await js.add_stream(
        subjects=[DOCUMENT_EXT_SUBJECT],
        name="sample-stream", 
        ## Extra
        retention=RetentionPolicy.WORK_QUEUE,
        ) 

    psub = await js.pull_subscribe(
        subject=DOCUMENT_EXT_SUBJECT,
        stream="sample-stream",
        durable="worker",
        config=consumer_config,
    )

    ## Keep pulling from Server Every 2 secs.
    async def constant_fetch():
        while True:
            try:
                await asyncio.sleep(2)
                msgs = await psub.fetch(1, timeout=5)
                for msg in msgs:
                    # Ack immediately, before processing; this is the workaround described above.
                    await msg.ack_sync()
                    await task_cb(msg)
            except TimeoutError:
                print("fetch timed out . Retrying")
                pass
    await asyncio.get_event_loop().create_task( constant_fetch() )

    def signal_handler():
        sys.exit()

    for sig in ('SIGINT', 'SIGTERM'):
        asyncio.get_running_loop().add_signal_handler(getattr(signal, sig), signal_handler)

    logging.info("Done Extracting .. ?")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.create_task(run())
        loop.run_forever()
    except Exception as e:
        print("Got error : ", e)
    finally:
        loop.close()
```
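
A safer variant, sketched here only as an idea rather than something verified in this thread, would be to acknowledge only after the work succeeds and periodically call `msg.in_progress()` so the server keeps extending the ack deadline of a long-running task:

```python
import asyncio
import time

# Stand-in for the real blocking work, as in the script above.
def time_consuming_task():
    time.sleep(50)

# Sketch only: ack after successful processing, and keep sending
# "work in progress" so a long task does not hit the ack_wait deadline.
async def process_with_progress(msg):
    loop = asyncio.get_running_loop()
    done = asyncio.Event()

    async def keep_alive():
        while not done.is_set():
            await msg.in_progress()   # resets the ack_wait timer on the server
            await asyncio.sleep(60)   # well under the 900s ack_wait configured above

    progress = asyncio.create_task(keep_alive())
    try:
        # Run the blocking work off the event loop so keep_alive() can run.
        await loop.run_in_executor(None, time_consuming_task)
        await msg.ack_sync()          # acknowledge only once the work has succeeded
    finally:
        done.set()
        progress.cancel()
```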

Extra notes:

I suspected one reason for this was my file-processing being synchronous. That's why I tried the following, but it didn't work in either the push or the pull example.

```python
import asyncio
import concurrent.futures

async def task_cb(msg):
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        # run_in_executor already resolves to the task's result when awaited
        result = await asyncio.get_running_loop().run_in_executor(
            pool, time_consuming_task
        )
```

ripienaar commented 1 year ago

Using a pull consumer isn't a hack. It's the right thing to do.

You set ack wait and max ack outstanding, so I suspect that when you had the period of timeouts it was because everything (up to max ack outstanding) was waiting on acks, which would only be redelivered after 900 seconds.

Output from consumer info at the time would help us confirm that hypothesis.
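
For example, a minimal sketch that reads the same numbers from the Python client, using the stream and durable names from the scripts above:

```python
import asyncio
import nats

async def main():
    nc = await nats.connect("nats://0.0.0.0:4222")
    js = nc.jetstream()
    info = await js.consumer_info("sample-stream", "worker")
    # num_ack_pending  -> "Outstanding Acks"
    # num_redelivered  -> "Redelivered Messages"
    # num_pending      -> "Unprocessed Messages"
    print(info.num_ack_pending, info.num_redelivered, info.num_pending)
    await nc.close()

asyncio.run(main())
```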

robin-coac commented 1 year ago

@ripienaar Thanks for replying here as well, in addition to Slack.

Testing your hypothesis

I changed the consumer settings in the code to be like this:

Configuration:

        Durable Name: worker
           Pull Mode: true
      Filter Subject: file_process
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 20s
       Replay Policy: Instant
  Maximum Deliveries: 1
     Max Ack Pending: 10
   Max Waiting Pulls: 512

State:

   Last Delivered Message: Consumer sequence: 1 Stream sequence: 168 Last delivery: 1m8s ago
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
         Outstanding Acks: 0 out of maximum 10
     Redelivered Messages: 1
     Unprocessed Messages: 0
            Waiting Pulls: 1 of maximum 512

With Ack Wait: 20s and Max Ack Pending: 10, I closed the consumer process while a message was being processed but before the acknowledgement could be sent to the nats-server.

Immediately after that, I restarted the consumer process. But no, this consumer isn't fetching messages even after 20 minutes have gone by. I think it's some bug; whether it's a client-side bug or not, I don't know. Someone might test with the Go client and see.

The only way to prevent this from happening is to acknowledge the message as soon as it's received using `await msg.ack_sync()`, OR to set `max_deliver = 0`. Even `await msg.ack()` doesn't work!

Additionally, this doesn't address why the push subscriber didn't work properly even with `max_ack_pending` set to a high value.

Also, please clear up this confusion:

When I run the same consumer script multiple times, each process fetches from NATS individually. But I believe the nats-server thinks there's only one consumer, right? That's why, with `max_ack_pending` at 1, I don't get parallel processing; it should be at least equal to the number of scripts I run. I was initially thinking that for each script I run, the nats-server would see a new consumer joining.


Stream info

Information for Stream sample-stream created 2023-08-11 09:15:26

             Subjects: file_process
             Replicas: 1
              Storage: File

Options:

            Retention: WorkQueue
     Acknowledgements: true
       Discard Policy: Old
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false

Limits:

     Maximum Messages: unlimited
  Maximum Per Subject: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited

State:

             Messages: 4
                Bytes: 524 B
             FirstSeq: 1 @ 2023-08-11T09:15:26 UTC
              LastSeq: 9 @ 2023-08-11T09:15:27 UTC
     Deleted Messages: 5
     Active Consumers: 1
   Number of Subjects: 1
ripienaar commented 1 year ago

Sounds like those consumers should be resuming, yes.

The consumer is like an orchestrator for clients: it tracks them and mediates between them and the stream. So when you create a consumer and connect many clients to it, those clients are controlled by the properties of the consumer.

So if you set max outstanding to 1 on a consumer, that applies to all the clients connected to that consumer as a whole; you generally only do that if you need strict ordering. Either way, the consumer is a middleman/proxy for all clients that consume against it as a group.

For example, one client acks message 10, another is handling message 11 and crashes; eventually the first will get message 11 on redelivery. All the clients associated with one consumer form a related group.

It could be that each of your scripts is creating its own consumer when it starts, in which case they all consume subject to the properties of that consumer, but then of course you will get the same message delivered to multiple clients.
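
To make that concrete, here is a minimal sketch (names taken from this thread; the exact values are illustrative): every process that binds to the same durable shares that one consumer, so a setting like `max_ack_pending` is an upper bound across all of those processes and needs to be at least the number of workers you want busy in parallel.

```python
import asyncio
import nats
from nats.js.api import AckPolicy, ConsumerConfig

# Sketch: run this same script in several terminals. Every process binds to the
# SAME durable consumer "worker", so ack_wait / max_ack_pending / max_deliver
# are properties of that one shared consumer, not of each individual process.
async def worker():
    nc = await nats.connect("nats://0.0.0.0:4222")
    js = nc.jetstream()

    psub = await js.pull_subscribe(
        subject="file_process",
        stream="sample-stream",
        durable="worker",          # same name in every process -> one shared consumer
        config=ConsumerConfig(
            ack_wait=900,
            ack_policy=AckPolicy.EXPLICIT,
            max_ack_pending=10,    # cap on unacked messages across ALL processes,
                                   # so keep it >= the number of parallel workers
        ),
    )

    msgs = await psub.fetch(1, timeout=5)
    for msg in msgs:
        # ... process the message ...
        await msg.ack()

    await nc.close()

asyncio.run(worker())
```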

robin-coac commented 1 year ago

Thanks for taking the time to write this explanation. Although the problem remains, things are a lot clearer in my head. At the end of the day, I owe it to the community, and once I understand things clearly I will definitely add precisely documented examples to the nats-py repo.

Just about this bit, could you clear this up?

> It could be that each of your scripts is creating its own consumer when it starts, in which case they all consume subject to the properties of that consumer, but then of course you will get the same message delivered to multiple clients.

I didn't get it. If you look at my stream info, `Active Consumers` is always 1, even if 4 scripts are running.

ripienaar commented 1 year ago

I am just detailing what we often see and users often run into: accidental consumers.

In your case you are fine but it’s important background for you to keep in mind.

bruth commented 7 months ago

@robin-coac Given this issue is fairly stale at this point, I am going to close it. Feel free to re-open if it is still relevant. Do note that the Python client is up to version 2.7.2, in case the problem is client-related.