airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0

Bug: NATS queue subscribers workers consume the same message multiple times #1114

Closed: Jonas1312 closed this issue 10 months ago

Jonas1312 commented 10 months ago

Hi!

Describe the bug

I have some faststream workers that have to process messages from a subject through a subscription.

A message has to be processed only once, so I use a queue group for this purpose.

I also need late ack, in case the worker dies, in order to retry processing the message.

The problem is that I noticed some messages are processed several times, and some messages are delivered late.

I'm not sure if it's a bug from my code, faststream or NATS...

How to reproduce

Code is in the zip file attached. faststream nats bug.zip
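
For readers without the attachment, here is a rough sketch of what the setup looks like based on the logs below: a JetStream push subscriber in the "myqueue" queue group with a long-running handler that repeatedly extends the ack deadline (the actual attached code may differ):

import asyncio
import time

from faststream import FastStream
from faststream.nats import JStream, NatsBroker, NatsMessage

NATS_URL = "nats://localhost:4222"
MSG_PROCESSING_TIME = 30

broker = NatsBroker(servers=NATS_URL, graceful_timeout=MSG_PROCESSING_TIME + 1)
app = FastStream(broker=broker)

@broker.subscriber(
    subject="mysubject.queue",
    queue="myqueue",  # queue group: each message should go to only one worker
    stream=JStream(name="mysubject", subjects=["mysubject", "mysubject.>"], declare=False),
)
async def process_msg(task_id: int, msg: NatsMessage) -> None:
    # Late ack: keep the message "in progress" while the slow work runs,
    # so a dead worker leads to redelivery rather than a lost message.
    await msg.in_progress()

    start = time.time()
    while time.time() - start < MSG_PROCESSING_TIME:
        await asyncio.sleep(1)
        await msg.in_progress()
    # (the original handler also publishes progress messages to
    # mysubject.start / mysubject.end, omitted here)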

To run the NATS server:

docker compose up nats_js_test

To watch the subjects:

nats --server localhost:4222 subscribe "mysubject.*"

To run faststream workers:

faststream run mre_app:app --workers 15

To publish some messages:

nats pub mysubject.queue "{{.Count}}" --count=14

Expected behavior

We publish 14 messages, and there are 15 workers, so all messages should be picked up as soon as they arrive, and each message should be processed exactly once.

Observed behavior

Some messages are processed late: in the logs below, we can see that the first messages are processed as soon as they arrive (at 05 minutes and 18 seconds), while the next messages are received at 05 minutes and 48 seconds.

...
2024-01-04 21:04:49,274 INFO     - mysubject | myqueue | mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-04 21:05:18,347 INFO     - mysubject| myqueue | mysubject.queue | de006108-5 - Received
2024-01-04 21:05:18,348 INFO     - mysubject| myqueue | mysubject.queue | 7fa8fc2b-5 - Received
2024-01-04 21:05:18,350 INFO     - mysubject| myqueue | mysubject.queue | 53fd4080-e - Received
2024-01-04 21:05:18,353 INFO     - mysubject| myqueue | mysubject.queue | 78b50dd5-4 - Received
2024-01-04 21:05:18,357 INFO     - mysubject| myqueue | mysubject.queue | eb2353c1-5 - Received
2024-01-04 21:05:18,359 INFO     - mysubject| myqueue | mysubject.queue | 3e7c0161-1 - Received
2024-01-04 21:05:18,361 INFO     - mysubject| myqueue | mysubject.queue | 4446dad5-5 - Received
2024-01-04 21:05:18,366 INFO     - mysubject| myqueue | mysubject.queue | fbb659af-9 - Received
2024-01-04 21:05:18,369 INFO     - mysubject| myqueue | mysubject.queue | 358fe74d-a - Received
2024-01-04 21:05:18,370 INFO     - mysubject| myqueue | mysubject.queue | 6439cd36-2 - Received
2024-01-04 21:05:48,357 INFO     - mysubject| myqueue | mysubject.queue | 13e6cb4c-4 - Received  # late
2024-01-04 21:05:48,378 INFO     - mysubject| myqueue | mysubject.queue | e027f20f-e - Received  # late
2024-01-04 21:05:48,380 INFO     - mysubject| myqueue | mysubject.queue | 7bbb0898-8 - Received  # late

Some messages are processed twice:

[#25] Received on "mysubject.start"
content-type: text/plain
correlation_id: 41c214e1-9a25-439d-8bf4-a06d7f3e20b8

mysubject.queue received task_id=5 at 2024-01-04 20:05:48.365336

...

[#41] Received on "mysubject.start"
content-type: text/plain
correlation_id: e37cd05a-5827-43c8-9199-3f0807ede7a1

mysubject.queue received task_id=5 at 2024-01-04 20:05:48.461788

When I set the number of workers with @broker.subscriber(max_workers=15) instead, the bug disappears. Unfortunately, I cannot use that for my app, since I need real parallelism for processing the tasks, not just concurrent processing within a single process.
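
For contrast, here is a minimal sketch of that max_workers variant (names reused from above, details assumed): it runs up to 15 concurrent handler tasks inside one process instead of 15 separate OS processes started with faststream run --workers 15.

from faststream import FastStream
from faststream.nats import JStream, NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker=broker)

@broker.subscriber(
    subject="mysubject.queue",
    queue="myqueue",
    stream=JStream(name="mysubject", subjects=["mysubject", "mysubject.>"], declare=False),
    max_workers=15,  # concurrency inside one process, not parallelism across processes
)
async def process_msg(task_id: int) -> None:
    ...  # CPU-bound work is still bound to a single process here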

Environment

Running FastStream 0.3.11 with CPython 3.11.5 on Darwin

Thanks for the help!

Lancetnik commented 10 months ago

@Jonas1312 can you show me your handler and broker code?

Jonas1312 commented 10 months ago

@Jonas1312 can you show me your handler and broker code?

it's in the zip file attached to the post above

https://github.com/airtai/faststream/files/13835151/faststream.nats.bug.zip

Lancetnik commented 10 months ago

Sorry, didn't see it. I'll check.

Lancetnik commented 10 months ago

@Jonas1312 according to the NATS docs, you should use a durable PullSubscriber to get exactly-once delivery. I tested the following code and it works as you expect.

import asyncio
import logging
import time
from datetime import datetime

from faststream import FastStream, Logger
from faststream.nats import ConsumerConfig, JStream, NatsBroker, NatsMessage, PullSub
from faststream.nats.annotations import NatsBroker as Broker

MSG_PROCESSING_TIME = 10

broker = NatsBroker(ping_interval=5, graceful_timeout=MSG_PROCESSING_TIME + 1, log_level=logging.DEBUG)
app = FastStream(broker=broker)

@broker.subscriber(
    subject="mysubject.queue",
    stream=JStream(
        name="mysubject", subjects=["mysubject", "mysubject.>"], declare=False
    ),
    durable="mydurable",
    pull_sub=PullSub(),
    config=ConsumerConfig(ack_wait=MSG_PROCESSING_TIME, max_ack_pending=-1),
)
async def process_msg(
    task_id: int,
    msg: NatsMessage,
    br: Broker,
    logger: Logger,
) -> None:
    logger.info(task_id)
    await msg.in_progress()

    await br.publish(
        subject="mysubject.start",
        message=f"mysubject.queue received {task_id=} at {datetime.utcnow()}",
    )

    start = time.time()
    while time.time() - start < MSG_PROCESSING_TIME:
        await asyncio.sleep(1)
        await msg.in_progress()

    await br.publish(
        subject="mysubject.end",
        message=f"mysubject.queue processed {task_id=} at {datetime.utcnow()}",
    )
Jonas1312 commented 10 months ago

Thanks for your answer @Lancetnik

I tried your code, but it still doesn't work for me unfortunately

There are 15 workers and 14 messages to process, so they should all be processed as soon as they arrive in the queue. In the logs below, the first 8 messages are processed immediately (at 11 minutes and 43 seconds), while the next ones are processed 30 s later (at 12 minutes and 13 seconds):

2024-01-05 08:11:26,707 INFO     - Started child process [5187]
2024-01-05 08:11:26,713 INFO     - Started child process [5188]
2024-01-05 08:11:26,719 INFO     - Started child process [5189]
2024-01-05 08:11:26,728 INFO     - Started child process [5190]
2024-01-05 08:11:28,990 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:28,990 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:28,991 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,000 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,022 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,023 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,038 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,046 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,056 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,070 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,073 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,085 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,090 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,098 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:29,101 INFO     - mysubject| mysubject.queue |            - `ProcessMsg` waiting for messages
2024-01-05 08:11:43,498 INFO     - mysubject| mysubject.queue | 95fa82a0-e - Received
2024-01-05 08:11:43,500 INFO     - mysubject| mysubject.queue | 38e12540-d - Received
2024-01-05 08:11:43,501 INFO     - mysubject| mysubject.queue | a56f8fc2-d - Received
2024-01-05 08:11:43,503 INFO     - mysubject| mysubject.queue | 926fea7f-e - Received
2024-01-05 08:11:43,512 INFO     - mysubject| mysubject.queue | b5d6f01c-5 - Received
2024-01-05 08:11:43,514 INFO     - mysubject| mysubject.queue | 9817aae1-9 - Received
2024-01-05 08:11:43,517 INFO     - mysubject| mysubject.queue | b6b98171-f - Received
2024-01-05 08:11:43,520 INFO     - mysubject| mysubject.queue | 2342eb53-1 - Received
2024-01-05 08:12:13,512 INFO     - mysubject| mysubject.queue | 1c0399fb-5 - Received  # late
2024-01-05 08:12:13,513 INFO     - mysubject| mysubject.queue | 193d38fe-a - Received
2024-01-05 08:12:13,515 INFO     - mysubject| mysubject.queue | 7d410104-5 - Received
2024-01-05 08:12:13,528 INFO     - mysubject| mysubject.queue | 9ee6a36c-5 - Received
2024-01-05 08:12:13,598 INFO     - mysubject| mysubject.queue | b5d6f01c-5 - Processed
2024-01-05 08:12:13,598 INFO     - mysubject| mysubject.queue | 38e12540-d - Processed

And when I do nats --server localhost:4222 subscribe "mysubject.*", I see 54 messages, when there should only be 3 × 14 = 42 messages, which means that some messages are processed several times.

Code to reproduce (with the pull sub mode):

import asyncio
import time
from datetime import datetime

from faststream import FastStream
from faststream.nats import ConsumerConfig, JStream, NatsBroker, NatsMessage, PullSub

NATS_URL = "nats://localhost:4222"
MSG_PROCESSING_TIME = 30
broker = NatsBroker(
    servers=NATS_URL,
    ping_interval=3,
    # wait for consumed messages to be processed correctly before application shutdown
    graceful_timeout=MSG_PROCESSING_TIME + 1,
)
app = FastStream(broker=broker)

@broker.subscriber(
    subject="mysubject.queue",
    stream=JStream(name="mysubject", subjects=["mysubject", "mysubject.>"], declare=False),
    durable="myowndurable",
    pull_sub=PullSub(batch_size=1, timeout=3),
    config=ConsumerConfig(ack_wait=MSG_PROCESSING_TIME, max_ack_pending=-1),
    # max_workers=15,
)
async def process_msg(task_id: int, msg: NatsMessage) -> None:
    await msg.in_progress()

    broker = NatsBroker(NATS_URL)

    async with broker as br:
        await br.publish(
            subject="mysubject.start",
            message=f"mysubject.queue received {task_id=} at {datetime.utcnow()}",
        )

    start = time.time()
    while time.time() - start < MSG_PROCESSING_TIME:
        await asyncio.sleep(1)
        await msg.in_progress()

    async with broker as br:
        await br.publish(
            subject="mysubject.end",
            message=f"mysubject.queue processed {task_id=} at {datetime.utcnow()}",
        )
Lancetnik commented 10 months ago

@Jonas1312 seems like you can't use max_ack_pending=-1 and should set it explicitly: https://docs.nats.io/nats-concepts/jetstream/consumers#maxackpending

The same goes for MaxWaiting: https://docs.nats.io/nats-concepts/jetstream/consumers#pull-specific

I also added a manual ack_sync for exactly-once semantics.

In the following setup, all messages are processed at the same time by different workers, and msg.raw_message.metadata.num_delivered shows each one is delivered only once:

import asyncio
import logging
import os
import time

from faststream import FastStream, Logger
from faststream.nats import ConsumerConfig, JStream, NatsBroker, NatsMessage, PullSub

MSG_PROCESSING_TIME = 30

broker = NatsBroker(
    ping_interval=5,
    graceful_timeout=MSG_PROCESSING_TIME * 1.2,
    log_level=logging.DEBUG,
)
app = FastStream(broker=broker)

@broker.subscriber(
    subject="mysubject.queue",
    stream=JStream(name="mysubject", subjects=["mysubject", "mysubject.>"], declare=False),
    durable="myowndurable",
    pull_sub=PullSub(batch_size=1, timeout=100.0),  # long-polling timeout
    config=ConsumerConfig(
        ack_wait=MSG_PROCESSING_TIME * 1.2,  # wait a little bit more than processing
        max_ack_pending=1000,  # explicit limit
        max_waiting=1000,  # explicit limit
    ),
)
async def process_msg(
    task_id,
    msg: NatsMessage,
    logger: Logger,
) -> None:
    logger.info(
        f"{os.getpid()} {msg.raw_message.metadata.num_delivered} {msg.raw_message.metadata.sequence.consumer} {task_id}"
    )
    await msg.in_progress()

    start = time.time()
    while time.time() - start < MSG_PROCESSING_TIME:
        await asyncio.sleep(1)
        await msg.in_progress()

    await msg.raw_message.ack_sync()   # ack_sync for exactly-once semantics
Jonas1312 commented 10 months ago

Worked like a charm!

Thanks a lot for the help @Lancetnik !

pySilver commented 7 months ago

@Jonas1312 @Lancetnik sorry to dig into a closed issue. I just wanted to ask: why were messages processed more than once before you arrived at the final solution?

Is that because they were distributed among the workers multiple times, or because of re-delivery?

From my understanding, we should design our processing flow in a way that duplicate processing won't hurt our data (for instance, we can assign identifiers to items of work and refuse to process an item that is no longer in the KV store; see the sketch below). Using exactly-once with a manual double ack seems really tedious to use all the time.
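
A minimal sketch of that idempotency guard, using the nats-py KeyValue API directly (bucket and key names are hypothetical, and wiring this into a FastStream handler is left out):

import nats
from nats.js.errors import KeyNotFoundError

NATS_URL = "nats://localhost:4222"

async def publish_task(task_id: int) -> None:
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    kv = await js.create_key_value(bucket="pending_tasks")
    # Register the work item first, then publish it to the stream.
    await kv.put(f"task.{task_id}", b"pending")
    await js.publish("mysubject.queue", str(task_id).encode())
    await nc.close()

async def process_task(task_id: int) -> None:
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    kv = await js.create_key_value(bucket="pending_tasks")
    try:
        await kv.get(f"task.{task_id}")
    except KeyNotFoundError:
        # The item is gone from KV, so it was already processed: skip the duplicate.
        await nc.close()
        return

    ...  # do the actual work here

    # Remove the item so a redelivery of the same task is ignored.
    await kv.delete(f"task.{task_id}")
    await nc.close()

Note this is best-effort deduplication rather than a strict guarantee: two workers could still race between the get and the delete.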

Lancetnik commented 7 months ago

It was double delivery on the NATS side. We can scale our consumers with a queue group for a PushSubscriber or a durable for a PullSubscriber (the way recommended by the NATS team), but we had to set the correct options to make this setup work. NATS delivers messages in a strange way until the max_ack_pending limit is set explicitly.

By default, though, PullSub + durable works correctly with exactly-once delivery.

pySilver commented 7 months ago

@Lancetnik thanks for the explanation. That takes some of the magic off the topic! Though they (NATS) could explain it a little better in the docs (even though their docs are great!).

Here is the link for others who eventually arrive at this issue. It demonstrates this and other issues with limits:

https://natsbyexample.com/examples/jetstream/pull-consumer-limits/go

pySilver commented 7 months ago

@Lancetnik Actually, I wanted to dig into this issue a little more to understand the underlying logic better. I've tested your code from https://github.com/airtai/faststream/issues/1114#issuecomment-1877859307 and it works just fine.

Just for future readers: NATS probably fixed this in the current version (2.10.12).

I see 42 messages as expected (no redelivery):

[#42] Received on "mysubject.end"
content-type: text/plain
correlation_id: ce8eb4a2-f561-47b2-89bc-988890c96a82

mysubject.queue processed task_id=13 at 2024-04-04 01:31:38.521003

and all of them were processed at the same time:

jetstream ❯ faststream run bug:app --workers=15
2024-04-04 03:31:21,756 INFO     - Started parent process [16613]
2024-04-04 03:31:21,760 INFO     - Started child process [16615]
2024-04-04 03:31:21,762 INFO     - Started child process [16616]
2024-04-04 03:31:21,763 INFO     - Started child process [16617]
2024-04-04 03:31:21,765 INFO     - Started child process [16618]
2024-04-04 03:31:21,767 INFO     - Started child process [16619]
2024-04-04 03:31:21,768 INFO     - Started child process [16620]
2024-04-04 03:31:21,771 INFO     - Started child process [16621]
2024-04-04 03:31:21,773 INFO     - Started child process [16622]
2024-04-04 03:31:21,777 INFO     - Started child process [16623]
2024-04-04 03:31:21,779 INFO     - Started child process [16624]
2024-04-04 03:31:21,785 INFO     - Started child process [16625]
2024-04-04 03:31:21,789 INFO     - Started child process [16626]
2024-04-04 03:31:21,798 INFO     - Started child process [16627]
2024-04-04 03:31:21,806 INFO     - Started child process [16628]
2024-04-04 03:31:21,812 INFO     - Started child process [16629]
2024-04-04 03:31:28,504 INFO     - mysubject | mysubject.queue | e556d3d3-9 - 1
2024-04-04 03:31:28,505 INFO     - mysubject | mysubject.queue | 732a0949-b - 2
2024-04-04 03:31:28,506 INFO     - mysubject | mysubject.queue | a4bff34b-2 - 3
2024-04-04 03:31:28,507 INFO     - mysubject | mysubject.queue | b9d854d2-c - 4
2024-04-04 03:31:28,508 INFO     - mysubject | mysubject.queue | ff70a3f6-c - 5
2024-04-04 03:31:28,509 INFO     - mysubject | mysubject.queue | ff4523c6-5 - 6
2024-04-04 03:31:28,509 INFO     - mysubject | mysubject.queue | ec8d2217-5 - 7
2024-04-04 03:31:28,509 INFO     - mysubject | mysubject.queue | 2c0cec31-e - 8
2024-04-04 03:31:28,510 INFO     - mysubject | mysubject.queue | c3d4c250-4 - 9
2024-04-04 03:31:28,511 INFO     - mysubject | mysubject.queue | 0bb2ca30-1 - 11
2024-04-04 03:31:28,511 INFO     - mysubject | mysubject.queue | 01222ffa-f - 10
2024-04-04 03:31:28,511 INFO     - mysubject | mysubject.queue | 9e3e63fb-4 - 12
2024-04-04 03:31:28,511 INFO     - mysubject | mysubject.queue | 223765aa-d - 13
2024-04-04 03:31:28,512 INFO     - mysubject | mysubject.queue | 731e0f27-9 - 14

And this happens with the default setup:

@broker.subscriber(
    subject="mysubject.queue",
    stream=JStream(
        name="mysubject", subjects=["mysubject", "mysubject.>"], declare=False
    ),
    durable="mydurable",
    pull_sub=PullSub(),
    config=ConsumerConfig(ack_wait=MSG_PROCESSING_TIME),
)

So yeah. Just a note for future readers.