Kafka delta ingest currently requires a message to trigger the latency timer check for flushing buffers. It would be better if we ran the latency timer independently to trigger flushes - especially for low-volume topics that receive periodic writes. For these low-volume topics, we suck everything in from Kafka and it just sits in the buffer until we get another message.
I've changed the run loop to be an infinite loop and wrapped the invocation of `consumer.stream().next()` with an invocation of `tokio::time::timeout`.
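In sketch form, the shape of that loop looks like the following. This is not the actual kafka-delta-ingest code: the real run loop wraps the async `consumer.stream().next()` in `tokio::time::timeout`, while this self-contained stand-in uses a std mpsc channel and `recv_timeout` so the pattern is visible without a tokio runtime (`run_loop`, the channel of `String` messages, and the flush bookkeeping are all hypothetical):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-in for the consumer stream: a channel of messages.
// `recv_timeout` plays the role of `tokio::time::timeout(..., stream.next())`.
fn run_loop(rx: mpsc::Receiver<String>, allowed_latency: Duration) -> Vec<Vec<String>> {
    let mut buffer: Vec<String> = Vec::new();
    let mut flushes: Vec<Vec<String>> = Vec::new();
    let mut latency_timer = Instant::now();
    loop {
        match rx.recv_timeout(allowed_latency) {
            Ok(msg) => buffer.push(msg),
            // A timeout is not an error path: it just means "no message arrived
            // in time", and we fall through to the flush check below.
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                // Sender gone: flush whatever is buffered and stop.
                if !buffer.is_empty() {
                    flushes.push(std::mem::take(&mut buffer));
                }
                break;
            }
        }
        // The flush check now runs whether or not a message arrived.
        if latency_timer.elapsed() >= allowed_latency && !buffer.is_empty() {
            flushes.push(std::mem::take(&mut buffer));
            latency_timer = Instant::now();
        }
    }
    flushes
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        tx.send("a".to_string()).unwrap();
        thread::sleep(Duration::from_millis(150));
        tx.send("b".to_string()).unwrap();
        // Dropping the sender ends the loop.
    });
    // With a 50ms allowed latency, "a" is flushed by the timer well before "b" arrives,
    // even though no second message has shown up yet - which is the whole point.
    let flushes = run_loop(rx, Duration::from_millis(50));
    producer.join().unwrap();
    println!("{:?}", flushes);
}
```

The key change is that a timed-out receive falls through to the same flush check a received message does, so a quiet topic still flushes on schedule.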
The `consume_timeout_duration` method of `IngestProcessor` returns the appropriate timeout duration for each iteration of the run loop based on how much time has elapsed since the latency timer was started.
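A plausible reconstruction of that calculation (an assumption on my part, not the actual `IngestProcessor` implementation) is: the remaining slice of the allowed latency, clamped at zero so an overdue timer yields an immediate timeout on the next iteration:

```rust
use std::time::{Duration, Instant};

// Hypothetical free-function version of the method: how much of the allowed
// latency budget is left, given when the latency timer was started.
fn consume_timeout_duration(latency_timer: Instant, allowed_latency: Duration) -> Duration {
    allowed_latency
        .checked_sub(latency_timer.elapsed())
        // Overdue timer: zero timeout, so the loop proceeds straight to a flush check.
        .unwrap_or(Duration::ZERO)
}

fn main() {
    let allowed = Duration::from_millis(500);
    // A timer started 300ms ago leaves roughly 200ms of budget.
    let started_earlier = Instant::now() - Duration::from_millis(300);
    println!("{:?}", consume_timeout_duration(started_earlier, allowed));
}
```

Clamping to zero matters: without it, a timer that has already overrun the allowed latency would underflow rather than forcing an immediate flush check.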
I addressed another issue related to the latency timer (not documented in the original issue), where the first message received after startup may trigger a write of a single row. This happens because the latency timer is initialized before the Kafka stream is fully initialized. I've added logic to re-initialize the latency timer after consuming the first message, but before any flush checks.
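A minimal sketch of that re-initialization logic (the struct and field names here are assumptions for illustration, not the actual processor code):

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical holder for the latency timer state.
struct LatencyTimer {
    started_at: Instant,
    messages_consumed: u64,
}

impl LatencyTimer {
    fn new() -> Self {
        // Started at construction time - i.e. before the Kafka stream is fully
        // initialized, so it may already be stale when the first message arrives.
        LatencyTimer { started_at: Instant::now(), messages_consumed: 0 }
    }

    fn on_message_consumed(&mut self) {
        self.messages_consumed += 1;
        if self.messages_consumed == 1 {
            // First message after startup: restart the timer before any flush
            // checks, so slow stream initialization can't trip a one-row write.
            self.started_at = Instant::now();
        }
    }
}

fn main() {
    let mut timer = LatencyTimer::new();
    let initial = timer.started_at;
    thread::sleep(Duration::from_millis(20)); // stand-in for slow stream startup
    timer.on_message_consumed();
    assert!(timer.started_at > initial); // reset on the first message...
    let after_first = timer.started_at;
    timer.on_message_consumed();
    assert_eq!(timer.started_at, after_first); // ...but not on later ones
    println!("ok");
}
```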
Removed the dummy messages from feed_s3_tests and email_s3_tests, since we don't need to send those anymore.
After merge, I plan to start a branch to reverse integrate this into https://github.com/delta-io/kafka-delta-ingest/pull/114 and fix some flaky test problems @thovoll and I have seen on that PR. The latency timer fixes should make some of those flaky tests easier to troubleshoot since we won't be doing weird things like sending bad messages to trigger a flush anymore.
This PR addresses https://github.com/delta-io/kafka-delta-ingest/issues/74, but by using a timeout future instead of the approaches described in the issue. 🙇 to @houqp for pointing me to the Timeout struct.
The PR also contains a few drive-bys as well, including a change to the `app_id` in the playground test.