nats-io / nats.rs

Rust client for NATS, the cloud native messaging system.
Apache License 2.0
1.02k stars 161 forks source link

Consumer Stalls on high throughput #1285

Closed TroyKomodo closed 1 month ago

TroyKomodo commented 1 month ago

Observed behavior

When consuming high volumes of requests/s consumers freeze and stop receiving events.

Expected behavior

Things work flawlessly

Server and client version

nats-server: nats:latest docker running 2.10.17 nats-client: v0.1.1 async-nats: 0.35.1

Host environment

Docker container

Single deployment using docker-compose

services:
  nats:
    image: nats:latest
    pull_policy: "always"
    restart: unless-stopped
    ports:
      - "${NATS_BIND:-127.0.0.1:4222}:4222"
    command: -js -sd /data 
    volumes:
      - nats:/data
    deploy:
      resources:
        limits:
          memory: ${NATS_MEMORY:-4G}
volumes:
  nats:

Steps to reproduce

use std::sync::{atomic::AtomicUsize, Arc};

use async_nats::jetstream::{self, stream::RetentionPolicy};
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let nats = async_nats::connect("nats://127.0.0.1:4222").await.unwrap();
    let jetstream = async_nats::jetstream::new(nats);
    let stream = jetstream.get_or_create_stream(jetstream::stream::Config {
        name: "test".to_string(),
        retention: RetentionPolicy::WorkQueue,
        subjects: vec!["test".to_string()],
        ..Default::default()
    }).await.unwrap();
    let consumer = stream.create_consumer(jetstream::consumer::pull::Config {
        name: Some("test".to_string()),
        durable_name: Some("test".to_string()),
        ack_policy: jetstream::consumer::AckPolicy::Explicit,
        max_ack_pending: 1_000_000,
        ..Default::default()
    }).await.unwrap();

    const PERMITS: usize = 50_000;

    let message_count = Arc::new(AtomicUsize::new(0));
    let semaphore = Arc::new(tokio::sync::Semaphore::new(PERMITS));
    let mut messages = consumer.messages().await.unwrap();

    {
        let semaphore = semaphore.clone();
        let message_count = message_count.clone();
        tokio::spawn(async move {
            while let Some(msg) = messages.next().await.transpose().unwrap() {
                let ticket = semaphore.clone().acquire_owned().await.unwrap();
                message_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    msg.ack().await.unwrap();
                    drop(ticket);
                });
            }
        });
    }

    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let mc = message_count.swap(0, std::sync::atomic::Ordering::Relaxed);
        println!("Received {}/s messages, inflight {}", mc, PERMITS - semaphore.available_permits());
    }
}
nats pub test --count=100000000 "{{Count}} @ {{Time}}"

https://github.com/user-attachments/assets/c6e9274e-67a3-4c41-b2d3-b88b6db2aeb5

TroyKomodo commented 1 month ago

Sometimes its also observed that the stream closes because of the idle heartbeat miss (even though there are messages in the queue it just does not recieve them)


also no server logs.


added tracing to get tracing logs

https://haste.potat.app/raw/rZgdXc

at all points even when the stream has reported 0 inflight 0rps the stream on jetstream has messages unprocessed.

wallyqs commented 1 month ago

thanks for the updates, we will take a look

Jarema commented 1 month ago

Thanks for detailed report.

There are two issues with your example:

Issue 1

You're using Core NATS publish that has zero backpressure control. This way, you flood the JetStream with messages in a way that it overwhelms it. Solution: Use JetStream publish that has acks and control how many publishes didnt get acks back.

Issue 2

by using

   while let Some(msg) = messages.next().await.transpose().unwrap() {

You're swapping Option<Result<Message>> into Result<Option<Message>>. This means that if there are any errors returned, it will stop the while loop. And you will get errors when JetStream is flooded with messages as described in Issue 1. The client will never recover, even if you stop that publisher. Solution: Do not transpose like this and check for errors.

wallyqs commented 1 month ago

@TroyKomodo do you know if the container ever exit with an OOM due to the way it is publishing and then it got restarted? as @Jarema points out publishing without waiting acks would cause issues in JS so not recommended but interested in figuring out how you ran into the panic

Callum-A commented 1 month ago

Hi all, we encounter an issue very similar to this. Ours seems to stall every so often for a second. We use a NATS cluster and simply subscribe and receive messages using subscriber.next. Can provide additional details and try to repro locally tonight

Jarema commented 1 month ago

@Callum-A I do not see issue with the library with provided example above, however your scenario might be a different case, so definately post an example so we can analyze, help and reproduce.

Callum-A commented 1 month ago

I'll see if I can create an example tonight, it's hard to share as it's company code so will try and repro as closely as I can on a toy example. Some more info we see about 100/s and we hit sometimes 2-3 times a day nothing for seconds. As far as we are aware it's not another thread blocking us, then after this time window we catch up (this latency then causes alerts to fire on our app).

Callum-A commented 1 month ago

Some added details:

We use one NATS client for the multiple subscriptions we have to different subjects and we use call .next within a Tokio select!

Callum-A commented 1 month ago

Okay I've started to, it seems in a select with multiple consumers when it is spammed with messages in a loop I see ~7s gaps between messages. Even if it is lagging surely we should still see a flow of messages without pauses!

TroyKomodo commented 1 month ago

The server did not oom as far as I am aware no oom records in kernel. I used the jetstream publish with ack so that it has back pressure support. The consumer no longer freezes however the performance is very disappointing.

Running nats bench I was able to see ~60k consumed messages while publishing on a single pub/sub Using file based storage.

Using the rust driver I see only around 10k and that's only if I use in memory. If I use file based it slows drastically to 100 messages per second publish and naturally 100 per second ack.

What logs or things do u want me to so I can diagnose the issue?

I'll put up a repo when I get home so you can see exactly my setup.

-Troy

Jarema commented 1 month ago

@TroyKomodo Performance is an entirely different topic in context of snippet you have provided.

First of all, we would need a code that shows that performance you see, and how you run it, as I assume it's not the example from above, as that is clearly not meant to be performant, but rather to show issue.

Additionaly:

  1. Are you running the code in release mode?
  2. How do you handle the backpressure of ack publish?

I'm able to achieve ~110k messages/second, while publishing and consuming on the same conneciton (where bench uses separate for pub/sub).

Here is a quickly sketched example:

use std::env;

use async_nats::jetstream::{self, consumer::PullConsumer, context::PublishAckFuture};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let nats_url = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
    let client = async_nats::connect(nats_url).await?;
    let jetstream = jetstream::new(client);
    let stream_name = String::from("EVENTS");

    let consumer: PullConsumer = jetstream
        .create_stream(jetstream::stream::Config {
            name: stream_name,
            subjects: vec!["events.>".into()],
            ..Default::default()
        })
        .await?
        .create_consumer(jetstream::consumer::pull::Config {
            durable_name: Some("consumer".into()),
            ..Default::default()
        })
        .await?;

    let (tx, mut rx) = tokio::sync::mpsc::channel(25_000);
    tokio::task::spawn(async move {
        loop {
            let ack: PublishAckFuture = match rx.recv().await {
                Some(ack) => ack,
                None => {
                    println!("All acks received");
                    break;
                }
            };
            ack.await.unwrap();
        }
    });

    let messages_count = 1_000_000;

    tokio::task::spawn(async move {
        for i in 0..messages_count {
            let ack = jetstream
                .publish(format!("events.{i}"), "data".into())
                .await
                .unwrap();
            tx.send(ack).await.unwrap();
        }
    });

    let now = std::time::Instant::now();
    let mut messages = consumer.messages().await?.take(messages_count as usize);

    while let Some(message) = messages.next().await {
        let message = message?;
        message.ack().await?;
    }
    let seconds = now.elapsed().as_secs_f64();
    let msgs_per_second = messages_count as f64 / seconds;

    println!(
        "Processed {} messages in {:?} seconds ({:.2} messages per second)",
        messages_count, seconds, msgs_per_second
    );
    Ok(())
}
TroyKomodo commented 1 month ago

The example you provides is a bit misleading. While it is true that we get an AVERAGE performance of ~110k messages per second. Lets look at a time line of when we process messages.

use std::{env, sync::{atomic::AtomicUsize, Arc}};

use async_nats::jetstream::{self, consumer::PullConsumer, context::PublishAckFuture};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    tracing_subscriber::fmt::init();

    let nats_url = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
    let client = async_nats::connect(nats_url).await?;
    let jetstream = jetstream::new(client);
    let stream_name = String::from("EVENTS");

    let consumer: PullConsumer = jetstream
        .create_stream(jetstream::stream::Config {
            name: stream_name,
            subjects: vec!["events.>".into()],
            ..Default::default()
        })
        .await?
        .create_consumer(jetstream::consumer::pull::Config {
            durable_name: Some("consumer".into()),
            ..Default::default()
        })
        .await?;

    let (tx, mut rx) = tokio::sync::mpsc::channel(25_000);
    tokio::task::spawn(async move {
        loop {
            let ack: PublishAckFuture = match rx.recv().await {
                Some(ack) => ack,
                None => {
                    tracing::info!("All acks received");
                    break;
                }
            };
            ack.await.unwrap();
        }
    });

    let messages_count = 1_000_000;

    tokio::task::spawn(async move {
        for i in 0..messages_count {
            let ack = jetstream
                .publish(format!("events.{i}"), "data".into())
                .await
                .unwrap();
            tx.send(ack).await.unwrap();
        }
        tracing::info!("Published {} messages", messages_count);
    });

    let now = std::time::Instant::now();
    let mut messages = consumer.messages().await?.take(messages_count as usize);

    let counter = Arc::new(AtomicUsize::new(0));

    let counter_clone = counter.clone();
    tokio::task::spawn(async move {
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            let count = counter_clone.swap(0, std::sync::atomic::Ordering::Relaxed);
            tracing::info!("Processed {count} messages per second");
        }
    });

    tracing::info!("Starting to process messages");

    while let Some(message) = messages.next().await {
        let message = message?;
        message.ack().await?;
        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
    let seconds = now.elapsed().as_secs_f64();
    let msgs_per_second = messages_count as f64 / seconds;

    tracing::info!(
        "Processed {} messages in {:?} seconds ({:.2} messages per second)",
        messages_count, seconds, msgs_per_second
    );
    Ok(())
}
2024-07-17T00:31:02.147882Z  INFO async_nats: event: connected
2024-07-17T00:31:02.149885Z  INFO demo: Starting to process messages
2024-07-17T00:31:03.150627Z  INFO demo: Processed 2400 messages per second
2024-07-17T00:31:04.151779Z  INFO demo: Processed 2200 messages per second
2024-07-17T00:31:05.152354Z  INFO demo: Processed 1600 messages per second
2024-07-17T00:31:06.153362Z  INFO demo: Processed 1600 messages per second
2024-07-17T00:31:07.154480Z  INFO demo: Processed 1600 messages per second
2024-07-17T00:31:08.155375Z  INFO demo: Processed 1600 messages per second
2024-07-17T00:31:08.958283Z  INFO demo: Published 1000000 messages
2024-07-17T00:31:09.040892Z  INFO demo: All acks received
2024-07-17T00:31:09.156330Z  INFO demo: Processed 28316 messages per second
2024-07-17T00:31:10.157328Z  INFO demo: Processed 211488 messages per second
2024-07-17T00:31:11.158438Z  INFO demo: Processed 205026 messages per second
2024-07-17T00:31:12.159329Z  INFO demo: Processed 223104 messages per second
2024-07-17T00:31:13.160704Z  INFO demo: Processed 210666 messages per second
2024-07-17T00:31:13.712337Z  INFO demo: Processed 1000000 messages in 11.562462365 seconds (86486.77 messages per second)

So yeah the average is okay i suppose but what if the consumer is just constantly publishing? we just never get to actually processing the messages then i suppose.

in contrast the cli bench seems to show we get a constant 67k/s however this might also be misleading if the cli tool is just lying in the progress bars.

https://github.com/user-attachments/assets/e5e9db2d-5ec6-431c-84fa-9a9ac2d3a17c

Jarema commented 1 month ago

@TroyKomodo It's about draining JetStream resources:

High pub ack pending limit:

Pub ack semaphore: 25000
Processed 2800 messages in the last second (155281 published)
Processed 1600 messages in the last second (163247 published)
Processed 2000 messages in the last second (146490 published)
Processed 2000 messages in the last second (139760 published)
Processed 2800 messages in the last second (169841 published)
Processed 2400 messages in the last second (160518 published)
All acks received
Processed 192492 messages in the last second (64863 published)
Processed 325961 messages in the last second (0 published)
Processed 324118 messages in the last second (0 published)
Processed 1000000 messages in 9.451897917 seconds (105798.86 messages per second)

You can see, that consumer is pretty slow while publisher is active.

Low ack semaphore

Pub ack semaphore: 1000
Processed 44000 messages in the last second (127161 published)
Processed 47400 messages in the last second (136473 published)
Processed 49000 messages in the last second (135770 published)
Processed 52000 messages in the last second (142425 published)
Processed 47400 messages in the last second (139502 published)
Processed 47600 messages in the last second (138247 published)
Processed 50200 messages in the last second (148120 published)
All acks received
Processed 273899 messages in the last second (32302 published)
Processed 336627 messages in the last second (0 published)
Processed 1000000 messages in 9.154214791 seconds (109239.30 messages per second)

The total time is very similar, but the balance between publishing and consuming is shifted.

You can reduce impact of publishing on consuming by having Stream with 3 replicas, as consumers can have their leader on any of the Stream replicas, balancing out the load on the nodes.

TroyKomodo commented 1 month ago

I see thank you then this solves my issue. We can close if @Callum-A is different / sorted.

Jarema commented 1 month ago

@Callum-A does it solve your issues too?

Callum-A commented 1 month ago

This can be closed. We've gone with a different approach and I'm unable to reliably repro