accelerated closed this 4 years ago.
Not sure why one of the sync producer tests fails intermittently on Travis. I ran it in a loop on the latest rdkafka version (1.3.0) and I never got it to fail. I also made some CMake changes to allow easier test setup in environments other than Travis.
Ok, this should be good now. I figured out the intermittent failure. Essentially, when calling sync_flush with a timeout, the first change I made was to call wait_for_acks(timeout) right before waiting on the future. This introduced a race condition: whenever wait_for_acks actually timed out, the next iteration of the while loop would block on the future. Because the main thread was now blocked, the queue was no longer flushed, so the delivery callback was never called and the future never returned. I never saw this in my testing because we always poll/flush on a separate thread, so this blocking never happened. In the Catch2 test suite the setup is simpler, and all the polling and flushing is done by the main thread.
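The deadlock described above can be reproduced in miniature. This is a sketch under stated assumptions, not cppkafka code: FakeProducer and wait_for_delivery are hypothetical names, and the model only assumes that delivery callbacks run when the owning thread polls, as in the single-threaded Catch2 setup. It shows why blocking on the future from the polling thread never returns, and one safe alternative: wait in bounded steps and keep polling in between.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Hypothetical stand-in for a producer whose delivery callbacks run only
// when the owning thread calls poll().
class FakeProducer {
public:
    // Queues a message; the returned future is fulfilled by its delivery callback.
    std::future<bool> produce() {
        auto promise = std::make_shared<std::promise<bool>>();
        std::future<bool> result = promise->get_future();
        pending_callbacks_.push_back([promise] { promise->set_value(true); });
        return result;
    }

    // Runs all queued delivery callbacks on the calling thread.
    void poll() {
        while (!pending_callbacks_.empty()) {
            auto callback = std::move(pending_callbacks_.front());
            pending_callbacks_.pop_front();
            callback();
        }
    }

private:
    std::deque<std::function<void()>> pending_callbacks_;
};

// Deadlock-prone pattern on the polling thread (do not do this):
//     producer.produce().get();  // blocks forever: nobody calls poll()
//
// Safer pattern: wait in bounded steps and poll in between, so the callback
// that fulfils the future gets a chance to run on this same thread.
bool wait_for_delivery(FakeProducer& producer, std::future<bool>& delivery,
                       std::chrono::milliseconds step, int max_steps) {
    assert(delivery.valid());
    for (int i = 0; i < max_steps; ++i) {
        producer.poll();
        if (delivery.wait_for(step) == std::future_status::ready) {
            return delivery.get();
        }
    }
    return false;  // no ack within the budget; the caller decides what to do next
}
```

The key design point is that the thread responsible for polling never performs an unbounded wait; in a multi-threaded setup where another thread polls, the plain blocking wait happens to work, which is why the bug only surfaced in the test suite.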
Wow, this is so much code. I'll hold off until I'm more awake; it's 7 am here :).
Sure! I had to clean up this class, and I also fixed a race condition and some performance problems.
Well, I dunno. I can't really follow what's happening in this class anymore without sitting on it for a few hours. I presume you've tested this and it works fine for you, so I'll trust that it works.
Sorry for the delay on this!
@mfontanini thanks! It's running on beta at the moment with no issues. Comparing the diff is a bit hard; I would recommend just reading the code as-is. It's shorter and simpler now, so it should be easy to follow (hopefully). In any case, if any bugs arise, I'm very committed to fixing them ASAP.
Added more comments and clarifications for the BufferedProducer class. There are also some small changes, namely:
- Renamed has_internal_data_ to enable_message_retries_ to make its intent clearer.
- Updated the Tracker logic inside the class for better expression of functionality.
- Fixed flush(timeout). The previous function was too complicated and simply wrong: it used the timeout parameter as the total time spent flushing instead of passing it to wait_for_acks, which is what it was originally supposed to do. As a result, the previous implementation would stop flushing after N messages just because the time was up, and then re-enqueue the remaining messages, which cost performance. The current code simplifies everything by having a single implementation of flush. The default flush() with no timeout simply calls the other overload with the default producer timeout, which is consistent with all other overloads in the class. An overload sync_produce(timeout) was also added to facilitate all this; sync_produce() just redirects to sync_produce(default_timeout) underneath.
- Implemented async_flush in terms of flush(milliseconds(0)). The logic was identical, and merging the two makes the code more intuitive.
- Added AckMonitor, which properly tracks acks for each thread as well as globally. This is needed in multi-threaded producers so that each producer thread only waits until its own last ack is received. In the previous implementation, each producer would wait until the acks from all producers were received, which is unnecessary and slows down production.
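The overload structure described for flush and sync_produce can be sketched as follows. This is a minimal illustration, assuming a hypothetical SketchProducer class rather than cppkafka's BufferedProducer: its produce and wait_for_acks members are placeholders, and the std::string payload parameter is invented for the example. It shows the two points made above: every queued message is produced and the timeout is handed only to wait_for_acks (it is not a budget for the whole flush), and the no-timeout overloads and async_flush all forward to the timed versions.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

using std::chrono::milliseconds;

// Hypothetical, heavily simplified stand-in (not cppkafka's BufferedProducer).
// It models only how the overloads delegate to one another.
class SketchProducer {
public:
    explicit SketchProducer(milliseconds default_timeout)
        : default_timeout_(default_timeout) {
        assert(default_timeout.count() >= 0);
    }

    void add_message(std::string payload) { queue_.push_back(std::move(payload)); }

    // Timed primitives: the timeout bounds only the wait for acks.
    void sync_produce(const std::string& payload, milliseconds timeout) {
        produce(payload);
        wait_for_acks(timeout);
    }

    void flush(milliseconds timeout) {
        // Produce everything that is queued; nothing is re-enqueued
        // just because time ran out.
        while (!queue_.empty()) {
            produce(queue_.front());
            queue_.pop_front();
        }
        wait_for_acks(timeout);
    }

    // Convenience overloads forward to the timed versions.
    void sync_produce(const std::string& payload) { sync_produce(payload, default_timeout_); }
    void flush() { flush(default_timeout_); }
    void async_flush() { flush(milliseconds(0)); }  // zero timeout: do not block on acks

    std::size_t produced() const { return produced_; }
    milliseconds last_ack_timeout() const { return last_ack_timeout_; }

private:
    void produce(const std::string&) { ++produced_; }  // placeholder for the real send
    void wait_for_acks(milliseconds timeout) { last_ack_timeout_ = timeout; }  // placeholder: records the timeout

    milliseconds default_timeout_;
    milliseconds last_ack_timeout_{-1};  // -1 means "never waited"
    std::deque<std::string> queue_;
    std::size_t produced_ = 0;
};
```

Because every path funnels into a single flush(timeout), there is only one place where flushing behavior is defined, which is the simplification the comment describes.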
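Per-thread plus global ack accounting, in the spirit of AckMonitor, might look like the sketch below. AckMonitorSketch and all of its methods are hypothetical names, not the class from this PR. In particular, the sketch assumes the delivery callback can recover the id of the thread that produced each message (for example from data stored alongside the message); the real implementation may track this differently.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <map>
#include <mutex>
#include <thread>

// Hypothetical sketch: counts un-acked messages per producing thread and
// in total, so each thread can wait for its own acks only.
class AckMonitorSketch {
public:
    // Called by a producing thread right after it hands a message to the client.
    void increment_pending() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++pending_[std::this_thread::get_id()];
        ++total_pending_;
    }

    // Called from the delivery callback. Assumes the callback knows which
    // thread produced the acknowledged message.
    void decrement_pending(std::thread::id producer_thread) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = pending_.find(producer_thread);
            if (it == pending_.end()) {
                return;  // no outstanding message for that thread: ignore
            }
            if (--it->second == 0) {
                pending_.erase(it);  // keep only threads with pending acks
            }
            assert(total_pending_ > 0);
            --total_pending_;
        }
        cv_.notify_all();
    }

    // Blocks until the calling thread has no pending acks or the timeout
    // expires. Returns true if all of this thread's acks arrived.
    bool wait_for_own_acks(std::chrono::milliseconds timeout) {
        const std::thread::id me = std::this_thread::get_id();
        std::unique_lock<std::mutex> lock(mutex_);
        return cv_.wait_for(lock, timeout, [&] { return pending_.count(me) == 0; });
    }

    std::size_t pending_for(std::thread::id id) const {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = pending_.find(id);
        return it == pending_.end() ? 0 : it->second;
    }

    std::size_t total_pending() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return total_pending_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    std::map<std::thread::id, std::size_t> pending_;  // only non-zero counts
    std::size_t total_pending_ = 0;
};
```

With this split, a thread whose own messages are all acknowledged returns from wait_for_own_acks immediately, even while other threads still have messages in flight; a single global counter would make it wait for all of them, which is the slowdown the previous implementation had.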