Closed ghost closed 7 years ago
Just to be sure: you are not running the brokers on the same machine? Is only this program running? How many messages/sec are you consuming (and producing, if different), and of what size?
If you can, can you share your code? With this result?
this is consume:

    public override void Pop() {
        using(var consumer = new EventConsumer(this.consumerConfig,this.Config.HostIps)) {
            try {
                long messageCount = 0;
                consumer.OnMessage += (obj,msg) => {
                    Console.WriteLine(string.Format("TopicPartitionOffset: {0} instanceId: {1}",msg.TopicPartitionOffset,System.Threading.Thread.CurrentThread.ManagedThreadId));
                    Logger.WriteLog(System.Diagnostics.TraceLevel.Info,(string.Format("TopicPartitionOffset {0} instanceId {1}",msg.TopicPartitionOffset,System.Threading.Thread.CurrentThread.ManagedThreadId)));
                    OnMessageReceived(msg.Payload);
                    messageCount++;
                };
                consumer.OnPartitionsAssigned += (obj,partitions) => {
                    Console.WriteLine(string.Format("Assigned partitions: [{0}], member id: {1} - instanceId: {2}",string.Join(", ",partitions),consumer.MemberId,System.Threading.Thread.CurrentThread.ManagedThreadId));
                    consumer.Assign(partitions);
                };
                if(this.consumerConfig.EnableAutoCommit) {
                    consumer.OnOffsetCommit += (obj,commit) => {
                        if(commit.Error != ErrorCode.NO_ERROR) {
                            Logger.WriteLog(System.Diagnostics.TraceLevel.Error,($"Failed to commit offsets: {commit.Error}"));
                        }
                        Console.WriteLine($"Successfully committed offsets: [{string.Join(", ",commit.Offsets)}]");
                    };
                }
                consumer.OnStatistics += (obj,json) => {
                    Logger.WriteLog(System.Diagnostics.TraceLevel.Info,$"Statistics: {json}");
                };
                consumer.OnPartitionsRevoked += (obj,partitions) => {
                    Console.WriteLine($"Revoked partitions: [{string.Join(", ",partitions)}]");
                    consumer.Unassign();
                };
                consumer.OnEndReached += (obj,end) => {
                    Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
                };
                consumer.OnConsumerError += (obj,errorCode) => {
                    Console.WriteLine($"Consumer Error: {errorCode}");
                };
                consumer.OnError += (obj,error) => {
                    Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");
                };
                consumer.Subscribe(new List<string> { this.Config.QueueName });
                consumer.Start();
                Console.WriteLine("Started consumer, press enter to stop consuming - InstanceId: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
                ConsoleReader.ReadLine();
            } catch {
                throw;
            } finally {
                Logger.WriteLog(System.Diagnostics.TraceLevel.Info,(string.Format("Stopping consumer {0}", System.Threading.Thread.CurrentThread.ManagedThreadId)));
                consumer.Stop();
            }
        }
    }

this is produce:

    public override async void Push(string key, byte[] messageBuffer) {
        int retry = 0;
        while (retry <= 1) {
            try {
                topic.Produce(messageBuffer, this);
                return;
            } catch (Exception ex) {
                Console.WriteLine(ex.ToString());
                if(retry > 0) {
                    Console.WriteLine("Failure while producing ... " + ex.ToString());
                    Logger.WriteLog(System.Diagnostics.TraceLevel.Error,"Failure while producing... " + ex.ToString());
                    throw ex;
                }
            } finally {
                retry++;
            }
        }
    }//Push

Remember to format your code properly, will be easier to read ;)
So that makes: consuming 4 msg/s and producing 5k msg/s, at about 20 bytes per message. You should have no problems with this. I do it on my work machine, which is less powerful than yours, and I don't have any memory issues (at most 300 MB used when reading/producing at > 1 MB/s, about 15k msg/s).
From your code I can't see exactly how you call Push, or in particular your delivery handler implementation; I suspect the problem might come from there. By the way, why is your Push async (given you use Produce with IDeliveryHandler), and why do you not use the key parameter in your method? You can also check your broker logs and enable client logging (set the debug config: https://github.com/ah-/rdkafka-dotnet/wiki/Faq#something-strange-is-going-on-how-do-i-find-out-what-rdkafka-is-doing ).
You can also try consuming only, to check that the problem doesn't come from the producing side.
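
As an illustration of the points raised about Push (the `async void` signature with nothing awaited, and `throw ex;` resetting the stack trace), here is a minimal sketch of the same retry-once logic. It is not the poster's code: `ProduceRetry` and the `produce` delegate are hypothetical names, and `produce` stands in for the real `topic.Produce(messageBuffer, this)` call so that the sketch runs without a broker.

```csharp
using System;

public static class ProduceRetry
{
    // Calls `produce` up to `maxAttempts` times and returns the number of
    // attempts used. `produce` is a hypothetical stand-in for the real
    // topic.Produce(...) call from the post.
    public static int Run(Action produce, int maxAttempts = 2)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                produce();
                return attempt;
            }
            catch (Exception ex) when (attempt < maxAttempts)
            {
                // Not the last attempt: log and go around the loop again.
                Console.WriteLine($"Produce attempt {attempt} failed: {ex.Message}");
            }
            // On the last attempt the filter is false, so the exception
            // propagates with its original stack trace (no `throw ex;`).
        }
    }
}
```

The method is synchronous because nothing in it is awaited; if the real produce call returned a Task, an `async Task` signature would let callers observe failures, which `async void` does not.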
Apologies for that. With the current design, my application is long-running and never exits. Memory utilization mostly keeps growing, though it does drop at times. After a while, however, it only grows and is never released (as shown in the original post).
The actual program that uses the Pop/Push methods creates 8 parallel consumer threads to consume the messages.
About the IDeliveryHandler: I changed it, but it made no difference. About the logging: is there a specific option I need to enable to see the memory issue?
Not one debug option I'm aware of, but you could use a CLR memory profiler (either the one in Visual Studio 2013+, by taking snapshots, or Microsoft CLR Profiler, or another tool like dotTrace if you have a licence). Why are you creating 8 consumer threads in the same app? It should not help. Quoting the librdkafka creator:
> You typically only run one consumer instance for a group per application+host. It doesn't really make sense to have multiple consumer instances for the same group in the same application.

It's only useful to create multiple consumers on multiple hosts (or possibly in multiple applications). librdkafka already implements threading internally, so you would just be adding complexity.
I also have a long-running app but don't have any issues. If you still have the problem, could you provide a complete working example that reproduces it? Thanks!
Hello @treziac, thank you for your valuable input. The memory issue has been fixed. I will implement your suggestions here; it makes sense to create a single consumer instance. In my case, once I subscribe to OnMessage, I am planning to process in parallel (say, by number of cores) to speed things up. I'm wondering if you have a sample program that uses parallelism in the OnMessage event handler.
Just to know, how did you resolve the memory issue? Was the problem on your side?
If you really need to parallelize while sticking with the high-level consumer (Subscribe), you can call Consume from multiple threads (Consume is thread-safe) on the consumer you created. If you look at EventConsumer, you would just have to create multiple LongRunning tasks, like this:
Change `Task consumerTask` to `List<Task> consumerTasks`.
Change Start and Stop like this:

    public void Start(int nbThread = 1)
    {
        if (consumerTasks != null)
        {
            throw new InvalidOperationException("Consumer task already running");
        }

        consumerTasks = new List<Task>(); // must exist before Add is called below
        consumerCts = new CancellationTokenSource();
        var ct = consumerCts.Token;
        for (int i = 0; i < nbThread; i++)
        {
            consumerTasks.Add(Task.Factory.StartNew(() =>
            {
                ...
            }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default));
        }
    }

    public async Task Stop()
    {
        consumerCts.Cancel();
        try
        {
            await Task.WhenAll(consumerTasks);
        }
        finally
        {
            consumerTasks = null;
            consumerCts = null;
        }
    }

But I think you will have to make sure that the functions pointed to by OnMessage, OnEndReached and OnConsumerError are thread-safe. Also note that you can't guarantee your messages are processed in order in this case.
You might want to see this:
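
To make the thread-safety point concrete: a handler that does something like `messageCount++` (as in the Pop code posted earlier) can lose updates once several consume threads invoke it. A minimal sketch of a safe counter follows, using only the .NET standard library. `MessageCounter` and `CounterDemo` are hypothetical names for illustration, and the worker tasks merely simulate concurrent handler calls rather than calling into RdKafka.

```csharp
using System.Threading;
using System.Threading.Tasks;

public sealed class MessageCounter
{
    private long count;

    // Safe to call from several consume threads at once:
    // Interlocked.Increment is atomic, unlike count++.
    public void OnMessage(byte[] payload)
    {
        Interlocked.Increment(ref count);
    }

    public long Count => Interlocked.Read(ref count);
}

public static class CounterDemo
{
    // Simulates `threads` consume loops that each deliver
    // `messagesPerThread` messages to the same handler.
    public static long Run(int threads, int messagesPerThread)
    {
        var counter = new MessageCounter();
        var payload = new byte[20]; // dummy 20-byte message
        var tasks = new Task[threads];
        for (int i = 0; i < threads; i++)
        {
            tasks[i] = Task.Factory.StartNew(() =>
            {
                for (int m = 0; m < messagesPerThread; m++)
                {
                    counter.OnMessage(payload);
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
        Task.WaitAll(tasks);
        return counter.Count;
    }
}
```

Any other state shared by the handlers (loggers, buffers, collections) needs the same kind of care, for example a lock or a concurrent collection.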
https://github.com/edenhill/librdkafka/issues/539
https://github.com/edenhill/librdkafka/pull/628
> Okay, are you planning on using the high-level consumer or the simple consumer?
> The former puts all messages in the same queue so that isn't perfect for your use-case, but the latter allows consumption per-partition and thus per-thread.
> If you need rebalancing you really dont want to do that yourself, then it is better to use KafkaConsumer as-is and dispatch messages to your worker threads from a single consumer->consume() thread precisely as you say.
>
> Having said that it would not be a huge change to allow per-partition consumption in the high-level consumer, or more precisely, partition queue message routing.
Other solutions:
- Use the simple consumer, which provides a separate queue for each partition but is not currently exposed in the C# wrapper. You can look at the librdkafka performance example for hints on how to implement that (I may look into implementing this later).
- Keep a single Consume thread (the current EventConsumer) but distribute messages to worker threads in a round-robin fashion when subscribing to OnMessage. I think this may cause issues if you receive messages faster than you process them.
- Try your idea of many consumers, but it will create overhead (one connection per broker for each consumer instance, and all metadata etc. will be duplicated). Consumer instances should be independent of one another, but there is no real benefit except easier implementation (unless of course you are deploying on many machines, with one consumer per machine, and you can process messages from one partition independently of the others: way to go!)

(I may still have made some mistakes; this is how I understand it works behind the scenes.)
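
The second option above (one consume thread handing messages to worker threads in round-robin order) could be sketched as follows. This is only an illustration under stated assumptions, not part of rdkafka-dotnet: `RoundRobinDispatcher`, `Dispatch` and the `process` callback are hypothetical names, and the code uses only the .NET standard library. Each worker gets a bounded queue, so `Dispatch` blocks when a worker falls behind; that is one way to address the concern about receiving messages faster than they are processed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class RoundRobinDispatcher : IDisposable
{
    private readonly BlockingCollection<byte[]>[] queues;
    private readonly Task[] workers;
    private int next; // only touched by the single dispatching thread

    public RoundRobinDispatcher(int workerCount, int capacityPerWorker, Action<byte[]> process)
    {
        queues = new BlockingCollection<byte[]>[workerCount];
        workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            var queue = new BlockingCollection<byte[]>(capacityPerWorker);
            queues[i] = queue;
            workers[i] = Task.Factory.StartNew(() =>
            {
                // Runs until CompleteAdding is called and the queue is drained.
                foreach (var payload in queue.GetConsumingEnumerable())
                {
                    process(payload);
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
    }

    // Call from the single consume thread, e.g. inside the OnMessage handler.
    // Blocks while the chosen worker's queue is full (back-pressure).
    public void Dispatch(byte[] payload)
    {
        queues[next].Add(payload);
        next = (next + 1) % queues.Length;
    }

    // Stops accepting messages, waits for the workers to drain their queues,
    // and rethrows (as AggregateException) if `process` failed in a worker.
    public void Dispose()
    {
        foreach (var queue in queues) queue.CompleteAdding();
        Task.WaitAll(workers);
        foreach (var queue in queues) queue.Dispose();
    }
}
```

With round-robin routing, messages from one partition end up on different workers, so ordering is lost. Choosing the queue by partition number instead of `next` would keep per-partition order, at the cost of uneven load when partitions are unbalanced.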
@treziac , thank you. yes the memory issue was from my side.
I see a lot of these errors on the console. I have tried some of the settings from https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md, but no luck. Once these errors occur, the whole process slows down. I do not see any errors in the brokers' log files.
RdKafka.RdKafkaException: Failed to produce message (Error _TIMED_OUT_QUEUE - Local: Timed out in queue
Because of this error, messages are getting lost too.

    Error: _TRANSPORT 10.61.19.89:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.521|rdkafka#consumer-6|FAIL| 10.61.19.88:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.521|rdkafka#consumer-1|FAIL| 10.61.19.87:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.521|rdkafka#consumer-4|FAIL| 10.61.19.89:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.521|rdkafka#consumer-12|FAIL| 10.61.19.89:9092/bootstrap: Connection closed
    Error: _TRANSPORT 10.61.19.89:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.524|rdkafka#consumer-8|FAIL| 10.61.19.88:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.524|rdkafka#consumer-11|FAIL| 10.61.19.88:9092/bootstrap: Connection closed
    Error: _TRANSPORT 10.61.19.88:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.528|rdkafka#consumer-15|FAIL| 10.61.19.89:9092/bootstrap: Connection closed
    Error: _TRANSPORT 10.61.19.89:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.532|rdkafka#consumer-3|FAIL| 10.61.19.87:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.533|rdkafka#consumer-5|FAIL| usiadansetl04:9092/4: Connection closed
    5|2016-12-08 14:08:43.533|rdkafka#consumer-10|FAIL| 10.61.19.88:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.534|rdkafka#consumer-4|FAIL| usiadansetl05:9092/7: Connection closed
    Error: _TRANSPORT 10.61.19.88:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.534|rdkafka#consumer-3|FAIL| 10.61.19.89:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.539|rdkafka#consumer-9|FAIL| 10.61.19.89:9092/bootstrap: Connection closed
    Error: _TRANSPORT 10.61.19.89:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.539|rdkafka#consumer-9|FAIL| usiadansetl03:9092/1: Connection closed
    Error: _TRANSPORT usiadansetl03:9092/1: Connection closed
    5|2016-12-08 14:08:43.557|rdkafka#consumer-10|FAIL| usiadansetl03:9092/1: Connection closed
    Error: _TRANSPORT usiadansetl03:9092/1: Connection closed
    5|2016-12-08 14:08:43.557|rdkafka#consumer-12|FAIL| usiadansetl03:9092/1: Connection closed
    Error: _TRANSPORT usiadansetl03:9092/1: Connection closed
    5|2016-12-08 14:08:43.558|rdkafka#consumer-8|FAIL| usiadansetl03:9092/1: Connection closed
    5|2016-12-08 14:08:43.558|rdkafka#consumer-7|FAIL| 10.61.19.89:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.572|rdkafka#consumer-14|FAIL| 10.61.19.88:9092/bootstrap: Connection closed
    Error: _TRANSPORT 10.61.19.88:9092/bootstrap: Connection closed
    5|2016-12-08 14:08:43.572|rdkafka#consumer-1|FAIL| usiadansetl04:9092/4: Connection closed

Resolved the timeout issue by increasing the message timeout duration.
@AKR2015 I am hitting the same error; can you share the details?
I am using version 0.9.2-ci-179 with broker 0.9.1.
My program continuously consumes messages from one topic and produces to a different topic. It is a continuously running program.
Details are below.
1. The client machine has 32 GB RAM and 8 cores, running Windows 2012.
2. It consumes messages from a topic with 12 partitions, using 8 consumer threads.
3. During the first few minutes after startup, the program uses 8-10 GB of memory. From then on, memory usage goes up and down. After around 3-4 hours, it has used all of the system's memory and the program hangs.
4. I also noticed that there are idle periods where there are no messages to consume or produce, but memory utilization does not come down; it just stays where it is.
5. Attached is a snapshot of the memory utilization.