Open darrrrek opened 5 years ago
Hi @darrrrek. I haven't done load tests yet, but I'll start setting something up soon and will let you know.
Can you give some details on the test you're performing (topics, partitions, KafkaOptions, etc.)?
Hi @jonathansant, I have created topics with 8 partitions each. Kafka is installed locally on my laptop.
My KafkaOptions:
.AddKafkaStreamProvider(ConfigurationClass.CommandsStreamProviderName, options =>
{
    options.BrokerList = new List<string> { "localhost:9092" };
    options.ConsumerGroupId = "TestGroupId";
    options.Topics = new List<string>
    {
        "testtopic1",
        "testtopic2",
        "testtopic3",
        "testtopic4",
    };
    options.PollTimeout = TimeSpan.FromMilliseconds(10);
})
This is how I'm sending messages:
var commandStreamProvider = client.GetStreamProvider(ConfigurationClass.CommandsStreamProviderName);
var commandStream = commandStreamProvider.GetStream<ICommand>(Guid.Empty, "testtopic1");
Debug.WriteLine("Start: " + DateTime.Now);
for (int i = 0; i < 3000; i++)
{
    await commandStream.OnNextAsync(new TestCommand());
}
Debug.WriteLine("End: " + DateTime.Now); // it takes about 12 seconds (only 3000 messages)
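For anyone reproducing this, DateTime.Now printed as text only gives roughly one-second resolution, so a Stopwatch may give a clearer number. A minimal sketch follows; ThroughputProbe and MeasureAsync are hypothetical names introduced here for illustration and are not part of Orleans or the Kafka stream provider:

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper (not part of any library in this thread).
public static class ThroughputProbe
{
    // Calls sendAsync `count` times, awaiting each call before the next,
    // and returns the observed rate in messages per second.
    public static async Task<double> MeasureAsync(Func<Task> sendAsync, int count)
    {
        var stopwatch = Stopwatch.StartNew();
        for (int i = 0; i < count; i++)
        {
            await sendAsync();
        }
        stopwatch.Stop();
        return count / stopwatch.Elapsed.TotalSeconds;
    }
}
```

With the producer above it could be called as `await ThroughputProbe.MeasureAsync(() => commandStream.OnNextAsync(new TestCommand()), 3000)`.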
A small piece of my consumer stream grain:
[ImplicitStreamSubscription("testtopic1")]
public class ApplicationCommandHandler : Grain, IApplicationCommandHandler
{
    // _logger is assumed to be declared elsewhere in the grain (omitted from this excerpt).
    private int _counter;
    private DateTime _first;
    private DateTime _last;

    public override async Task OnActivateAsync()
    {
        try
        {
            var streamProvider = GetStreamProvider(ConfigurationClass.CommandsStreamProviderName);
            var stream = streamProvider.GetStream<ICommand>(this.GetPrimaryKey(), "testtopic1");
            await stream.SubscribeAsync(async (data, token) => await ConsumeCommand(data));
            await base.OnActivateAsync();
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex.Message);
            throw;
        }
    }

    // Counts consumed commands and records when the first and the 3000th arrive.
    private Task ConsumeCommand(ICommand command)
    {
        switch (command)
        {
            case TestCommand cmd:
                _counter++;
                Debug.WriteLine("consumed: " + _counter);
                if (_counter == 1)
                {
                    _first = DateTime.Now;
                }
                if (_counter == 3000)
                {
                    _last = DateTime.Now;
                    Debug.WriteLine("Start: " + _first);
                    Debug.WriteLine("End: " + _last);
                }
                break;
            default:
                break;
        }
        return Task.CompletedTask;
    }
}
The consumer is very slow: only a few messages are consumed per second. Greetings
@darrrrek hey, just an experiment: can you please try setting a low PollBufferTimeout and see how it goes? Thanks for your patience!
I set PollBufferTimeout to 5 ms, but there was no improvement :)
@jonathansant I have found that producing messages is very slow. For example, I'm producing 1000 commands one by one with:
for (int i = 0; i < 1000; i++)
{
    await commandStream.OnNextAsync(new TestCommand());
}
It takes about 37 seconds (too long).
I dug deeper into your code and found this:
await _producer.Produce(batch);
which actually runs this extension method (note the call to producer.ProduceAsync):
public static Task Produce(this Producer<byte[], KafkaBatchContainer> producer, KafkaBatchContainer batch)
    => producer.ProduceAsync(
        batch.StreamNamespace,
        new Message<byte[], KafkaBatchContainer>
        {
            Key = batch.StreamGuid.ToByteArray(),
            Value = batch,
            Timestamp = new Timestamp(DateTimeOffset.UtcNow)
        }
    );
Confluent.Kafka's producer.ProduceAsync is very slow: it takes about 40 ms per message when you await each call. Just as a test, I removed the await:
_producer.Produce(batch);
Now it works much faster, but still not fast enough.
Improvement:
1) from 5-10 msg/s to about 150-200 msg/s
2) producing 1000 messages: from 37 seconds to a few milliseconds
Greetings
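To illustrate why awaiting every send one at a time is bound by latency, here is a self-contained sketch that does not touch Kafka or Orleans at all. FakeProduceAsync is a hypothetical stand-in that simply waits 40 ms, the per-message figure reported above; the class and method names are made up for this example. It compares awaiting each call in turn with starting all calls and awaiting them together via Task.WhenAll, which overlaps the waits like the "no await" test does but still surfaces failures.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class AwaitComparison
{
    // Hypothetical stand-in for a send that takes about 40 ms to be acknowledged.
    private static Task FakeProduceAsync() => Task.Delay(40);

    // Awaits each send before starting the next one,
    // so the total time grows roughly as count * latency.
    public static async Task<TimeSpan> SequentialAsync(int count)
    {
        var stopwatch = Stopwatch.StartNew();
        for (int i = 0; i < count; i++)
        {
            await FakeProduceAsync();
        }
        return stopwatch.Elapsed;
    }

    // Starts every send first, then awaits them all together,
    // so the waits overlap and any exception is still observed.
    public static async Task<TimeSpan> ConcurrentAsync(int count)
    {
        var stopwatch = Stopwatch.StartNew();
        var pending = Enumerable.Range(0, count)
            .Select(_ => FakeProduceAsync())
            .ToList();
        await Task.WhenAll(pending);
        return stopwatch.Elapsed;
    }

    public static async Task Main()
    {
        var sequential = await SequentialAsync(25);
        var concurrent = await ConcurrentAsync(25);
        Console.WriteLine($"sequential: {sequential.TotalMilliseconds:F0} ms");
        Console.WriteLine($"concurrent: {concurrent.TotalMilliseconds:F0} ms");
    }
}
```

This only demonstrates the timing effect. Whether overlapping sends is acceptable for a given stream (for example with respect to message ordering) is a separate question that this sketch does not answer.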
@darrrrek sorry for not replying earlier, but I was on holiday. I will be looking into the issue in the coming week.
@jonathansant no problem. It looks like Confluent.Kafka is very slow on Windows; this issue shouldn't occur on Linux. It's related to librdkafka:
The mechanism librdkafka uses to poll both the socket and its internal message queue for new messages simultaneously isn't possible on Windows, so message delivery can be delayed due to this on Windows (this is on the roadmap to fix, but it requires quite a big change to librdkafka).
@darrrrek found your issue: https://github.com/confluentinc/confluent-kafka-dotnet/issues/563
Did your perf improve after setting { "socket.blocking.max.ms", 1 }? I can add the option if it helps you.
@jonathansant I don't see any improvement with this option. It's not solving this issue.
Hi @jonathansant
A few days ago I tried to migrate from the RabbitMQ stream provider to the Kafka stream provider. Before the migration, stream performance was very good in my case: more than 3k messages produced/consumed.
After migrating to Kafka, performance is very low: only a few messages per second. I've been trying to find the cause, but without success.
Any help will be appreciated :) Have a nice day