confluentinc / parallel-consumer

Parallel Apache Kafka client wrapper with per message ACK, client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.
https://confluent.io/confluent-accelerators/#parallel-consumer
Apache License 2.0

Question: Does Parallel-consumer have state that we can read from? #484

Open | Ehud-Lev-Forter opened this issue 1 year ago

Ehud-Lev-Forter commented 1 year ago

Hi, we have noticed that during high load one of the consumers can get stuck: it keeps reporting the same lag but does not process anything. We were not able to reproduce this locally, but we are afraid it will happen again.

In our Kafka Streams processes we monitor the stream state, and if we see an error or problem we restart the process. Does ParallelConsumer have something similar?

astubbs commented 1 year ago

Hi, no, not at the moment. But you can check on a per-record basis: the record context passed to your user function exposes the count of failed processing attempts. We're developing the metrics and monitoring systems now.

What were the logs saying when this happened? Are you checking the record context for failure attempts in your user function? How did you resolve the situation - just kill the PC instance that you thought was stuck?

Lag is a bit more complicated with PC. For example, if there's a single poison pill that can't be processed and your user function doesn't give up retrying it, lag could show 1,000 but there may only be a single record that is actually "stuck".

The new monitoring and metrics functions will allow you to see all this info.
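For illustration, here is a minimal sketch of checking per-record failure counts inside the user function. It assumes the 0.5.x PollContext/RecordContext API; accessor names such as getSingleRecord(), getConsumerRecord() and getNumberOfFailedAttempts() are assumptions about that API and may differ between versions:

```java
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.confluent.parallelconsumer.RecordContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch only: where per-record failure information is available to the user function.
// "processor" is assumed to be an already configured and subscribed
// ParallelStreamProcessor<String, String>; handle() stands in for the business logic.
class FailureAwareProcessing {

    void run(ParallelStreamProcessor<String, String> processor) {
        processor.poll(context -> {
            RecordContext<String, String> rc = context.getSingleRecord();
            ConsumerRecord<String, String> record = rc.getConsumerRecord();

            // A record that keeps failing and being retried shows up here with a growing
            // attempt count - a cheap signal that a key/partition may be "stuck".
            int attempts = rc.getNumberOfFailedAttempts();
            if (attempts > 5) {
                System.err.println("offset " + record.offset() + " failed " + attempts + " times");
            }

            handle(record); // business logic; throwing here triggers a retry
        });
    }

    void handle(ConsumerRecord<String, String> record) {
        // placeholder for the real processing
    }
}
```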

Cc @nachomdo

Ehud-Lev-Forter commented 1 year ago

Hi, thank you for your response. We did hit this issue again, and when it happened we saw many warning logs that looked like the following: "Warning: 1 records in the queue have been waiting longer than 10s for following topics [my-topic-2]". It is important to mention that other processes also got those warnings, but far fewer. We are using version 0.5.2.4 with the "reactor consumer", and until now I had not noticed the "record context"; we used context.streamConsumerRecords() instead. I will check it out and see if I can catch errors through that.

For now, the only way for us to resolve the issue is to track the lag (using external metrics) and restart the process.
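As an illustration only, here is a rough sketch of such an external lag probe using the standard Kafka AdminClient; the bootstrap servers and group ID are placeholders:

```java
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch of an external lag probe: sums (log-end offset - committed offset) over all
// partitions a consumer group has commits for. Connection details are placeholders.
public class LagProbe {

    public static long totalLag(String bootstrapServers, String groupId) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets for the group
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata()
                    .get();

            // Current log-end offsets for the same partitions
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                    admin.listOffsets(request).all().get();

            return committed.entrySet().stream()
                    .filter(e -> e.getValue() != null) // skip partitions with no commit yet
                    .mapToLong(e -> ends.get(e.getKey()).offset() - e.getValue().offset())
                    .sum();
        }
    }
}
```

A supervisor could sample this every minute or so and restart the process when the lag is non-zero but has not decreased for several consecutive samples.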

astubbs commented 1 year ago

Are you logging any processing failures in your function? Do you see any ERROR level events in the log? If there really is a "stuck" record, it should still be handed to the user function for another attempt.

Ehud-Lev-Forter commented 1 year ago

Yes, I am printing everything I can (error and info levels), and I am also trying to catch all exceptions and send records that have failed to different DLQs. I am printing record.numberOfFailedAttempt, but I have not seen any failures reported there yet. I had the issue again and restarted the process again, and I got the following warning after the restart:

"Truncating state - removing records lower than 54845363. Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled 54845363 but expected 1 from loaded commit data. Could be caused by record retention or compaction and offset reset policy LATEST." (logger io.confluent.parallelconsumer.state.PartitionState, thread pc-control, level WARN)
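For what it's worth, here is a minimal sketch of the catch-and-forward-to-DLQ pattern mentioned above. The topic name and the failure threshold are hypothetical, and the PollContext accessors (getSingleConsumerRecord(), getSingleRecord(), getNumberOfFailedAttempts()) are assumptions about the 0.5.x API:

```java
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch only: wrap the business logic, and divert records that keep failing to a
// dead-letter topic instead of retrying forever. "processor" and "dlqProducer" are
// assumed to be configured elsewhere; "my-topic-2-dlq" is a hypothetical topic name.
class DlqRouting {

    private static final int MAX_ATTEMPTS = 5;

    void run(ParallelStreamProcessor<String, String> processor,
             KafkaProducer<String, String> dlqProducer) {
        processor.poll(context -> {
            ConsumerRecord<String, String> record = context.getSingleConsumerRecord();
            int attempts = context.getSingleRecord().getNumberOfFailedAttempts();
            try {
                handle(record); // business logic
            } catch (Exception e) {
                if (attempts + 1 >= MAX_ATTEMPTS) {
                    // Give up: park the record on a DLQ and complete it so the partition moves on
                    dlqProducer.send(new ProducerRecord<>("my-topic-2-dlq", record.key(), record.value()));
                } else {
                    throw new RuntimeException(e); // propagate so PC retries the record
                }
            }
        });
    }

    void handle(ConsumerRecord<String, String> record) {
        // placeholder for the real processing
    }
}
```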

I am now trying to tackle the issue by adding some kind of supervisor that detects when this consumer is not really doing anything and kills it.

astubbs commented 1 year ago

What is the head offset of that partition? What is your auto offset reset policy set to?

Ehud-Lev-Forter commented 1 year ago

I don't have the historical information, but I can say that now (1.5 hours after the restart) it is:

CURRENT-OFFSET  LOG-END-OFFSET  LAG
54203593        54203720        127

I must say this does not make sense to me, since it seems the warning complained about a future offset.

Update: it might be that this log message was about another partition; this process consumed from another partition as well, with offsets 55016222 / 55016324.

As for the auto offset reset policy, it is set to earliest.

astubbs commented 1 year ago

> Update: it might be that this log message was about another partition; this process consumed from another partition as well, with offsets 55016222 / 55016324.

ah - does the log message not include the partition it's talking about?

@Ehud-Lev-Forter, if you get a chance, take a look at this PR and see if there are any other metrics you'd like?

#485

astubbs commented 1 year ago

cc @nachomdo

Ehud-Lev-Forter commented 1 year ago

Looks like we can benefit a lot from this PR as is. As for other metrics, I am not sure if it makes sense, but I can think of these:

  1. Throttling or rate limiting - I saw that PC pushes back some work when things get slow.
  2. Maybe a slowWork.size gauge.

As for the warn log, yes, it is missing the partition information.

astubbs commented 1 year ago

1 - yup, that status will be in there.
2 - oh, that's a good idea!