BEagle1984 / silverback

Silverback is a simple but feature-rich message bus for .NET Core (it currently supports Kafka, RabbitMQ and MQTT).
https://silverback-messaging.net
MIT License

Too many threads in the process for a single consumer #231

Open mruslan97 opened 5 months ago

mruslan97 commented 5 months ago

Hello everyone! I have a question regarding the number of threads being created. As far as I know, librdkafka creates the following number of threads: Total number of threads = 2 + Number of brokers. However, when using the library in .NET, approximately 10-15 threads are consumed when creating and connecting a consumer, with the number of brokers being 3. What could be the reason for this? Are these some overheads of the .NET library? Is it possible to reduce their number in any way?

BEagle1984 commented 5 months ago

This issue definitely seems to be related to Silverback. I've never noticed this but I'll investigate further and report back. Please bear with me as I'm quite busy at the moment.

Thanks for your patience.

mruslan97 commented 5 months ago

Thank you for the response, I will be waiting. Actually, I have more suspicions about the Confluent.Kafka library, on which Silverback is based.

BEagle1984 commented 5 months ago

I was actually too quick to jump to conclusions. I did an extra test with a very simple consumer that connects to a local broker (a cluster with 2 nodes running in Docker), and the threads seem to be under control: image

I previously inspected a consumer connecting to a Confluent Kafka cluster on AWS, and I'm not sure how many nodes we have there or what other factors could come into play.

What's your use case? In which case do you see too many threads?

mruslan97 commented 5 months ago

I'm testing in the dev environment, where we have Kafka with 3 brokers. The application is ASP.NET on .NET 8 with the latest libraries. Thread counts during application startup with 1 test consumer:

Before Kafka initialization: Current process thread id = 73171 count: 48
After: Current process thread id = 73171 count: 61

If I start the other consumers, the thread count exceeds 100. As far as I understand, these are not application threads; they belong to the process itself (librdkafka).

image

image
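
For anyone who wants to reproduce the before/after measurement, a minimal sketch follows. It assumes `Process.Threads` reports all OS threads of the process (managed and native) on your platform, and it uses a plain background thread as a stand-in for connecting the consumer, so the placeholder section has to be replaced with the real startup code. `CountProcessThreads` is a hypothetical helper name, not part of Silverback or Confluent.Kafka.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Returns the number of OS threads in the current process (managed and native).
static int CountProcessThreads()
{
    using var process = Process.GetCurrentProcess();
    return process.Threads.Count;
}

int before = CountProcessThreads();

// Placeholder for the real work: build the host and connect the consumer here.
// A plain thread stands in for it so that the sketch runs on its own.
using var stop = new ManualResetEventSlim(false);
var worker = new Thread(() => stop.Wait()) { IsBackground = true };
worker.Start();

int after = CountProcessThreads();

Console.WriteLine($"OS threads before: {before}, after: {after}, delta: {after - before}");
// The managed thread pool size helps separate .NET pool threads from native ones.
Console.WriteLine($"Managed thread pool threads: {ThreadPool.ThreadCount}");

stop.Set();
worker.Join();
```

Comparing the delta with `ThreadPool.ThreadCount` gives only a rough hint of how many of the new threads are native; it does not attribute them to librdkafka.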

BEagle1984 commented 5 months ago

I don't know how to figure out which threads are being spawned by librdkafka in dotTrace. I did a new trace, and I still see only 4 threads from librdkafka in Process Explorer: image

In dotTrace there's a bunch of native threads, which I cannot clearly identify: image

How did you configure the trace? (I'm a bit of a noob, I have to admit.)

mruslan97 commented 5 months ago

To be honest, I also don't fully understand how to get a proper display of all threads. In sampling mode, it only highlights hotspots and, as I understand it, not all threads are captured. The thread count was 160 when I started dotTrace.

image

consumer_poll is highlighted as problematic, but when drilling down, the thread display seems to be broken (possibly due to the beta version)

image

mruslan97 commented 5 months ago

I can also provide the method of initializing Silverback. A lot of reflection is used to simplify the configuration. Could this be the reason? Here is an example for consumers:

image

```csharp
// Requires: using System; using System.Linq; using System.Reflection; (plus the Silverback namespaces)
public static IKafkaEndpointsConfigurationBuilder ConfigureTopics(
    this IKafkaEndpointsConfigurationBuilder kafkaEndpointsConfigurationBuilder,
    ICrypto crypto,
    string kafkaPrefix)
{
    // Consumer mapping with a message and a topic
    var consumerTypes = AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(assembly => assembly.GetTypes())
        .Where(t => t.GetInterfaces().Any(i => i.IsGenericType &&
                        (i.GetGenericTypeDefinition() == typeof(IKafkaConsumer<>) ||
                         i.GetGenericTypeDefinition() == typeof(IKafkaBatchConsumer<>)))
                    && t.GetCustomAttribute<KafkaAttribute>() != null);

    foreach (var consumerType in consumerTypes)
    {
        var asmName = consumerType.Assembly.GetName().Name.Split(".")[0];

        var kafkaAttribute = consumerType.GetCustomAttribute<KafkaAttribute>();

        // Resolve the matching consumer interface once and reuse it below
        var consumerInterface = consumerType.GetInterfaces()
            .First(i => i.IsGenericType && (i.GetGenericTypeDefinition() == typeof(IKafkaConsumer<>) ||
                                            i.GetGenericTypeDefinition() == typeof(IKafkaBatchConsumer<>)));
        var messageType = consumerInterface.GetGenericArguments()[0];

        // Note: every consumer type gets its own inbound endpoint (and thus its own Kafka consumer)
        kafkaEndpointsConfigurationBuilder
            .AddInbound(messageType,
                endpoint =>
                {
                    endpoint.ConsumeFrom(kafkaPrefix.ToLower() + kafkaAttribute.Topic.ToLower());
                    endpoint.Configure(config =>
                    {
                        config.GroupId = asmName + "-" + consumerType.Name;
                        config.AutoOffsetReset = kafkaAttribute.AutoOffsetReset;
                    });

                    // Checking the first interface could throw if it is not generic,
                    // so the interface matched above is used instead
                    if (consumerInterface.GetGenericTypeDefinition() == typeof(IKafkaBatchConsumer<>))
                    {
                        TimeSpan? maxWaitTime = null;
                        if (kafkaAttribute.MaxWaitTime > 0)
                        {
                            maxWaitTime = TimeSpan.FromSeconds(kafkaAttribute.MaxWaitTime);
                        }

                        endpoint.EnableBatchProcessing(kafkaAttribute.BatchSize, maxWaitTime);
                    }

                    endpoint.DeserializeUsing(new KafkaCryptoDeserializer(crypto, new JsonMessageSerializer()));

                    endpoint.IgnoreUnhandledMessages();
                    if (kafkaAttribute.RetryCount > 0)
                    {
                        endpoint.OnError(e => e.Retry(kafkaAttribute.RetryCount)
                            .ThenSkip());
                    }
                    else
                    {
                        endpoint.OnError(e => e.Skip());
                    }
                }
            );
    }

    // The declared return type requires returning the builder
    return kafkaEndpointsConfigurationBuilder;
}
```

BEagle1984 commented 5 months ago

Try inspecting the running process with Process Explorer: https://learn.microsoft.com/en-us/sysinternals/downloads/process-explorer#download. It's much simpler, and it makes it easier to see where the threads are coming from.

I really doubt that anything you do in .NET could have an impact on the threads being spawned by the wrapped C library.

mruslan97 commented 5 months ago

Unfortunately, I'm on macOS. I will look for alternatives and continue my investigation.

BEagle1984 commented 5 months ago

Keep me posted @mruslan97, and let me know if I can support you further. If there's indeed an issue, I'm very interested in solving it.

If you can create a reproduction project, perhaps by consuming from a local broker, I'd be happy to take a look and try to figure out what's going on.

mruslan97 commented 5 months ago

@BEagle1984 I haven't been able to capture a full dump with all the application threads yet. But I suspect that I made some configuration mistakes. We have several consumers reading from the same topic (yes, I know it's a bad practice), and for each of them, I am calling .AddInbound in a loop. Probably, this isn't necessary, and a single .AddInbound with several subscribers added through AddSingletonSubscriber would suffice?

BEagle1984 commented 5 months ago

It depends. Both approaches work, but the main difference lies in the error handling.

The subscribers won’t be transactional. If you use a single consumer (single inbound) and one subscriber throws an exception, the error policy will either skip or retry the message for all subscribers. If you can tolerate inconsistencies (i.e., some subscribers process the message successfully while others don’t) and/or your subscribers are generally idempotent, this setup is fine.

Otherwise, use a single subscriber that acts as a proxy, invoking all involved parties and handling the transaction to ensure consistent processing.
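
The proxy idea can be sketched without any Silverback types. This is only one possible reading of "handling the transaction": each party exposes a handler plus a compensation, the proxy runs the handlers in order and, if one fails, undoes the ones that already completed before rethrowing, so the error policy sees a single failure for the whole message. All names here (`parties`, `ProxyHandleAsync`, the `billing` and `shipping` parties) are hypothetical; in a real application the proxy would be the one subscriber registered for the message type.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Each party is a pair of delegates: the work itself and a compensation that undoes it.
var parties = new List<(Func<string, Task> Handle, Func<string, Task> Undo)>
{
    (msg => { log.Add($"billing handled {msg}"); return Task.CompletedTask; },
     msg => { log.Add($"billing undone {msg}"); return Task.CompletedTask; }),
    (msg => throw new InvalidOperationException("shipping failed"),
     msg => Task.CompletedTask),
};

// The proxy invokes every party; on failure it compensates the completed ones and rethrows.
async Task ProxyHandleAsync(string message)
{
    var compensations = new Stack<Func<string, Task>>();
    try
    {
        foreach (var party in parties)
        {
            await party.Handle(message);
            compensations.Push(party.Undo);
        }
    }
    catch
    {
        // Undo in reverse order, then let the caller (the error policy) see the failure.
        while (compensations.Count > 0)
        {
            await compensations.Pop()(message);
        }

        throw;
    }
}

try
{
    await ProxyHandleAsync("order-42");
}
catch (InvalidOperationException ex)
{
    log.Add($"proxy failed: {ex.Message}");
}

Console.WriteLine(string.Join(Environment.NewLine, log));
```

If the parties share a database, a single database transaction inside the proxy would be a simpler alternative to compensations.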

If you use a consumer (inbound) per subscriber, each one will be decoupled, making it easier to ensure that each message is processed exactly once by each subscriber. Of course, each consumer will open its own connection and thus require its own threads.
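
As a rough back-of-the-envelope estimate, the sketch below takes the formula quoted at the start of this thread (about 2 + number of brokers threads per librdkafka client) as an assumption and multiplies it by the number of consumers. `EstimateNativeThreads` is a hypothetical helper, and real counts may differ depending on the librdkafka version and configuration.

```csharp
using System;

// Assumption from earlier in this thread: one librdkafka client uses about 2 + brokers threads.
static int EstimateNativeThreads(int clients, int brokers) => clients * (2 + brokers);

// One consumer against the 3-broker dev cluster mentioned above.
Console.WriteLine(EstimateNativeThreads(clients: 1, brokers: 3));

// Ten inbound endpoints, each with its own consumer, against the same cluster.
Console.WriteLine(EstimateNativeThreads(clients: 10, brokers: 3));
```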

Also, if you add multiple consumers for the same topic and always deserialize to the same message type, all subscribers will be invoked by all consumers, multiplying the processing of each message.

As a side note, Silverback doesn't currently support having a single consumer for multiple topics if you need to use different serializers, etc. However, this will be improved and optimized in the upcoming v5, where you will have much more control over the created clients.