risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.

Meta node can only handle ~600 Kafka sources. #18949

Open ka-weihe opened 1 month ago

ka-weihe commented 1 month ago

Describe the bug

I attempted to create 700 Kafka Plain Avro sources. However, after successfully creating approximately 600 sources, the meta-node becomes unresponsive, and I encounter the following error:

ERROR librdkafka: librdkafka: THREAD [thrd:main]: Unable to create broker thread

Additionally, when I shell into the meta-node and try to execute any commands, I get:

bash: fork: retry: Resource temporarily unavailable

To investigate, I ran the command (ps -eLf | wc -l) to monitor the number of OS threads while creating the sources. The thread count reached 32,769 after I had created 629 sources, at which point I could no longer execute commands—likely due to the OS exhausting its available resources for creating threads.

This behavior seems like a bug. I would expect the number of threads to be significantly lower than the number of sources. Even one OS thread per source, while inefficient, would still fit within our use case requirements, but we are seeing more than 50 OS threads per source. It's also worth noting that none of the sources were being actively used at any time.

Error message/log

ERROR librdkafka: librdkafka: THREAD [thrd:main]: Unable to create broker thread

To Reproduce

Expected behavior

I expect the sources to be created without the Meta node becoming unresponsive or consuming excessive resources.

How did you deploy RisingWave?

More or less like this: https://github.com/risingwavelabs/risingwave-operator/blob/main/docs/manifests/risingwave/risingwave-postgresql-s3.yaml

But with higher resource limits.

The version of RisingWave

PostgreSQL 13.14.0-RisingWave-2.0.1 (0d15632ac8d18fbd323982b53bc2d508dc7e148a)

Additional context

No response

xxchan commented 1 month ago

According to https://github.com/confluentinc/librdkafka/issues/1600, it seems each Kafka consumer creates roughly num_brokers threads, and threads cannot be shared across client instances.

In Meta, each source currently has a KafkaSplitEnumerator, which holds its own Kafka consumer, so this looks exactly like your problem (1 source ~ 50 threads). In theory, we could improve Meta by sharing consumers that connect to the same brokers.
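
For illustration, a rough sketch (not actual RisingWave code) of what sharing could look like on Meta, assuming rust-rdkafka's BaseConsumer and keying consumers by bootstrap.servers; the SharedConsumerRegistry name is made up:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use rdkafka::config::ClientConfig;
use rdkafka::consumer::BaseConsumer;
use rdkafka::error::KafkaResult;

/// Hypothetical registry: one consumer per distinct `bootstrap.servers`,
/// shared by every split enumerator instead of one consumer per source.
#[derive(Default)]
struct SharedConsumerRegistry {
    consumers: Mutex<HashMap<String, Arc<BaseConsumer>>>,
}

impl SharedConsumerRegistry {
    fn get_or_create(&self, bootstrap_servers: &str) -> KafkaResult<Arc<BaseConsumer>> {
        let mut consumers = self.consumers.lock().unwrap();
        if let Some(existing) = consumers.get(bootstrap_servers) {
            return Ok(existing.clone());
        }
        let consumer: BaseConsumer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .create()?;
        let shared = Arc::new(consumer);
        consumers.insert(bootstrap_servers.to_owned(), shared.clone());
        Ok(shared)
    }
}
// Each enumerator would call fetch_metadata on the shared consumer, so the
// per-broker threads are paid once per cluster instead of once per source.
```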

However, I'm concerned that after fixing Meta, Compute nodes still cannot handle it, since it will have even more Kafka consumers (multiply by parallelism).

I feel 50 brokers might be too many. Is it possible for you to split the cluster into multiple smaller clusters with fewer brokers? 🤔

ka-weihe commented 1 month ago

Thank you for the detailed response!

A few follow-up questions:

  1. Why does it create a consumer per source, even when unused?
    It seems that each KafkaSplitEnumerator creates its own Kafka consumer, leading to high thread usage, especially with many brokers. Is there a specific reason for this 1:1 mapping, and why are consumers created before the sources are actively used? Wouldn't it be more efficient to create consumers only when a source is actually consuming messages?

  2. Sharing consumers across sources/topics
    Sharing consumers across sources that connect to the same broker seems like a more efficient and scalable approach, significantly reducing thread usage. Could this be implemented both in Meta and the compute nodes to better handle a high number of sources/topics?

    Is it possible for you to divide it into multiple smaller clusters with fewer brokers?

No. I'm afraid that is not possible.

xxchan commented 1 month ago

Is there a specific reason for this 1:1 mapping

I think we haven't optimized it just because your scale is quite uncommon.

why are consumers created before the sources are actively used

This seems to be the Kafka library's limitation. Even just for fetching metadata, we need to create consumers. And the consumers will immediately connect to all brokers (instead of only the bootstrap server)

Could this be implemented both in Meta and the compute nodes to better handle a high number of sources/topics?

As I mentioned above, implementing this in meta should be quite feasible.

However, sharing consumers in the compute node could be much more difficult. I found the Kafka library does have an API for such usage: split_partition_queue. But currently we share nothing between different sources, or between different parallelisms (actors) of a source. Sharing consumers may require quite a large change to the implementation.
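
For reference, here is a minimal sketch of the split_partition_queue pattern, loosely following rust-rdkafka's documented usage; topic names and config values are placeholders, and this is not how RisingWave is structured today:

```rust
use std::sync::Arc;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One consumer for the whole node; per-partition streams are split off it.
    let consumer: Arc<StreamConsumer> = Arc::new(
        ClientConfig::new()
            .set("bootstrap.servers", "broker-1:9092")
            .set("group.id", "shared-consumer-demo")
            .create()?,
    );
    consumer.subscribe(&["topic_a", "topic_b"])?;

    // One split queue per (topic, partition); each could feed a different actor.
    for (topic, partition) in [("topic_a", 0), ("topic_a", 1), ("topic_b", 0)] {
        if let Some(queue) = consumer.split_partition_queue(topic, partition) {
            tokio::spawn(async move {
                loop {
                    match queue.recv().await {
                        Ok(msg) => println!("{}/{} offset {}", topic, partition, msg.offset()),
                        Err(e) => eprintln!("{}/{} error: {}", topic, partition, e),
                    }
                }
            });
        }
    }

    // The main consumer must still be polled to drive events/callbacks, even
    // though split partitions no longer deliver their messages here.
    loop {
        let _ = consumer.recv().await;
    }
}
```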

Besides, the split_partition_queue API also has a limitation: it doesn't work when you have multiple message streams for the same partition, but currently we naturally support multiple jobs on the same source.

We are also not sure about the performance implications of the change. So it requires investigation and testing if we want to go with this approach.

ly9chee commented 1 month ago

However, I'm concerned that after fixing Meta, Compute nodes still cannot handle it, since it will have even more Kafka consumers (multiply by parallelism).

Hi @xxchan do you mean if I had a Kafka cluster with 3 brokers and a source with 48 parallelisms, there'd be 144 (48*3) threads and connections within the CN node? That sounds crazy 😅.

xxchan commented 1 month ago

Yes. More precisely, the parallelism here is the number of partitions of the Kafka topic, not necessarily the number of RisingWave actors.


Edit: should be min(num_partitions, num_actors)

xxchan commented 1 month ago

There's an alternative solution besides the refactoring mentioned above: Use a pure async Rust Kafka library, e.g., rskafka. Then there will be no extra threads at all. All the threads will be managed by tokio.
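
For comparison, fetching records with rskafka looks roughly like this (a sketch based on its README; the exact API may differ between versions, so treat names and signatures as illustrative):

```rust
use rskafka::client::{partition::UnknownTopicHandling, ClientBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // All I/O is async on the Tokio runtime; no per-broker OS threads.
    let client = ClientBuilder::new(vec!["broker-1:9092".to_owned()])
        .build()
        .await?;

    // One lightweight handle per (topic, partition).
    let partition_client = client
        .partition_client("my_topic".to_owned(), 0, UnknownTopicHandling::Retry)
        .await?;

    // Fetch a batch starting at offset 0, between 1 B and 1 MB, waiting up to 1 s.
    let (records, high_watermark) = partition_client
        .fetch_records(0, 1..1_000_000, 1_000)
        .await?;
    println!("{} records, high watermark {}", records.len(), high_watermark);
    Ok(())
}
```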

Possible drawbacks:

  1. Performance may not be as good as librdkafka. I think it uses all these extra threads for things like the fetch queue to maximize performance.

  2. rskafka may not support all the auth-related options (e.g., the various SASL/SSL mechanisms) that librdkafka does.

ly9chee commented 1 month ago

Is it feasible to let the user switch between the async Rust and librdkafka implementations of the Kafka source for different use cases? I mean, if users have lots of Kafka sources that cannot be created at all because the OS exhausts its resources, they can switch to rskafka. Or, if they care about performance and, as you mentioned, the auth-related options, they can use the default librdkafka.

xxchan commented 1 month ago

Yes. My rough idea is to introduce a kafka_v2 connector (if we decide to do it), and it would co-exist with the original implementation for a while due to the limitations above. We won't deprecate the old one unless these problems are resolved.

This is just my idea anyway. We will need to see others’ opinions to decide which way to go.

ka-weihe commented 4 weeks ago

This seems to be the Kafka library's limitation. Even just for fetching metadata, we need to create consumers. And the consumers will immediately connect to all brokers (instead of only the bootstrap server)

If that is the case, why not close the consumer after fetching the metadata?

We are also not sure about the performance implications of the change. So it requires investigation and testing if we want to go with this approach.

I agree that benchmarking is necessary, but intuitively, I would expect sharing consumers to be beneficial for much fewer than 600 sources. For example, with just 4 sources and 50 brokers, you're already dealing with over 200 OS threads, leading to significant context switching and other overhead. I suspect that this negatively impacts performance at scales far smaller (fewer brokers and sources) than what we are currently dealing with.

There's an alternative solution besides the refactoring mentioned above: Use a pure async Rust Kafka library, e.g., rskafka. Then there will be no extra threads at all. All the threads will be managed by Tokio.

This seems like a promising option. Have you also considered https://github.com/fede1024/rust-rdkafka? It seems to offer helpful abstractions for making things asynchronous and reducing thread overhead.

Performance may not be as good as librdkafka. I think it uses all these extra threads for things like the fetch queue to maximize performance.

I don't believe librdkafka is intended to be used with one consumer per topic. I think the overhead from the OS threads would degrade performance rather quickly.

xxchan commented 4 weeks ago

If that is the case, why not close the consumer after fetching the metadata?

We need to periodically fetch metadata for the sources on Meta to detect whether a split change (scaling) has happened on a source, so the consumers are kept alive. Closing and re-establishing the connection doesn't sound great, but it may also be worth considering given the problem at hand.
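
If we went the close-after-fetching route, the enumerator's poll would look roughly like this (an illustrative sketch with made-up names, not the current code):

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;

/// Create a short-lived consumer, read the partition count, then drop it so
/// its per-broker threads are torn down between polls.
fn poll_partition_count(bootstrap_servers: &str, topic: &str) -> KafkaResult<usize> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        .create()?;
    let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
    Ok(metadata
        .topics()
        .first()
        .map(|t| t.partitions().len())
        .unwrap_or(0))
    // `consumer` is dropped here; the trade-off is reconnecting on every poll.
}
```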

For example, with just 4 sources and 50 brokers

I do not doubt things can be a lot different at your scale. We will mainly need to ensure there's no regression for most users with much smaller scale.

Have you also considered fede1024/rust-rdkafka? It seems to offer helpful abstractions for making things asynchronous and reducing thread overhead.

This is exactly the library we are using. It doesn't solve the problem.

hzxa21 commented 4 weeks ago

https://github.com/confluentinc/librdkafka/issues/825#issuecomment-469156541 librdkafka has supported "sparse" connections but not "sparse" threads, meaning that it still creates one thread per broker but only opens a connection to a broker on demand. The community mentioned they would work on "sparse" threads, but that work seems to have been suspended for some reason.

I think it is hard for us to hack librdkafka to support "sparse" threads. Given that the librdkafka community mentioned that "The non-connected threads will be mostly idle though.", I wonder whether it is feasible to tune the maximum thread count to mitigate the issue when you have a use case that involves a big kafka cluster with many kafka sources: https://stackoverflow.com/questions/344203/maximum-number-of-threads-per-process-in-linux

fuyufjh commented 4 weeks ago

There's an alternative solution besides the refactoring mentioned above: Use a pure async Rust Kafka library, e.g., rskafka. Then there will be no extra threads at all. All the threads will be managed by Tokio.

I don't really want to bet on https://github.com/influxdata/rskafka. It's easy to start a PoC version that supports the Kafka protocol, but things become much more difficult when putting it into production, which may require 100% alignment with the official protocol. The subtitle of rskafka is "A minimal Rust client for Apache Kafka", so I'm afraid we would have to build a lot ourselves to make it really fit our requirements.

Technically, a Kafka client can subscribe to multiple topics and partitions, which means we could possibly reuse one Kafka client for all Kafka source actors. This sounds difficult with our current design because each source is supposed to have its own client properties. However, it seems much less difficult for the Meta service to reuse a Kafka client. Worth an attempt, I think.
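
For illustration, a single rust-rdkafka client can be assigned partitions across several topics, roughly like this (a sketch with made-up topic names; actual reuse would also have to reconcile per-source client properties):

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};

// One client covering many topics/partitions: per-broker threads are paid
// once for the whole node, not once per source.
fn build_shared_consumer() -> KafkaResult<BaseConsumer> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "broker-1:9092")
        .set("group.id", "shared-client-demo")
        .create()?;

    let mut assignment = TopicPartitionList::new();
    assignment.add_partition_offset("orders", 0, Offset::Beginning)?;
    assignment.add_partition_offset("orders", 1, Offset::Beginning)?;
    assignment.add_partition_offset("payments", 0, Offset::Beginning)?;
    consumer.assign(&assignment)?;
    Ok(consumer)
}
```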

tabVersion commented 4 weeks ago

I don't really want to bet on influxdata/rskafka. It's easy to start a PoC version that supports the Kafka protocol, but things become much more difficult when putting it into production, which may require 100% alignment with the official protocol. The subtitle of rskafka is "A minimal Rust client for Apache Kafka", so I'm afraid we would have to build a lot ourselves to make it really fit our requirements.

+1. I think rskafka is a minimal client that meets InfluxDB's requirements but is not meant for general usage. It lacks a variety of SSL support and a fetch-queue implementation. In the long term, we would need to do a lot of work to make it stable for RisingWave users, and I doubt the ROI on this.

ly9chee commented 4 weeks ago

I wonder whether it is feasible to tune the maximum thread count to mitigate the issue when you have a use case that involves a big kafka cluster with many kafka sources

It seems each thread has a fixed overhead (e.g., stack, thread-local storage, ...) even if most of them are idle.

Is it possible to reduce the number of topics by routing messages through a shared Kafka source? For example, send all kinds of messages to one topic, consume that topic with a shared source, and then create specific mviews or sinks on top of the shared source by filtering on the message type. That way, only num_brokers * min(num_partitions, num_actors) threads need to be created.

But CDC sources don't seem to fit this approach due to their one-topic-per-table design.

tabVersion commented 4 weeks ago

Is it possible to reduce the number of topics by routing messages through a shared Kafka source? For example, send all kinds of messages to one topic, consume that topic with a shared source, and then create specific mviews or sinks on top of the shared source by filtering on the message type.

Yes, we have such a design and you can see the progress here: https://github.com/risingwavelabs/risingwave/issues/16003 (ready for public preview soon), thanks to @xxchan.