First, I apologize for submitting this PR without prior notice.
This PR includes the following changes:
Refactor buffer management into a separate BroadwayKafka.Producer.Buffer module.
After fetching messages from the broker, prepend them to the front of the queue, then dequeue items according to demand.
Here are my reasons for these changes:
1. Refactor buffer management into BroadwayKafka.Producer.Buffer
I've been trying to understand the buffer system in this library and found it a bit challenging to follow. On the main branch, dequeuing items also updates acks, which complicates the dequeue logic.
In addition, because buffer management lives deep inside the producer, we are not able to test it in isolation. If it were just simple queue pushes and pops I wouldn't worry about tests, but as you might know, there is quite a lot of logic in the buffer.
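To make the idea concrete, here is a minimal sketch of what an extracted buffer module could look like, built on Erlang's :queue. This is illustrative only; the actual module in the PR may have a different shape and more logic (acks, per-partition state, etc.):

```elixir
# Illustrative sketch of an extracted buffer module; not the PR's actual code.
defmodule BroadwayKafka.Producer.Buffer do
  @moduledoc "FIFO buffer for fetched Kafka messages (sketch)."

  def new, do: :queue.new()

  # Append a batch of newly fetched messages to the back of the queue.
  def enqueue(buffer, messages) when is_list(messages) do
    :queue.join(buffer, :queue.from_list(messages))
  end

  # Prepend a batch to the front, so earlier messages keep priority.
  def enqueue_front(buffer, messages) when is_list(messages) do
    :queue.join(:queue.from_list(messages), buffer)
  end

  # Take up to `demand` messages; returns {taken, remaining_buffer}.
  def dequeue(buffer, demand) when demand >= 0 do
    take(buffer, demand, [])
  end

  defp take(buffer, 0, acc), do: {Enum.reverse(acc), buffer}

  defp take(buffer, n, acc) do
    case :queue.out(buffer) do
      {{:value, msg}, rest} -> take(rest, n - 1, [msg | acc])
      {:empty, _} -> {Enum.reverse(acc), buffer}
    end
  end
end
```

Having the buffer as a plain data structure like this means each operation can be unit-tested directly, without spinning up a producer.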
2. Enqueue and dequeue messages after each pull
The first reason for this change is that it provides a simpler approach, as messages follow the same path: first enqueued, then dequeued.
The second reason is that even when there are no new messages to enqueue, the dequeue operation still picks up messages already in the queue, which improves throughput.
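The per-pull flow described above can be simulated with a plain list standing in for the buffer. All names here (`pull/3`, `PullFlowSketch`) are placeholders for illustration, not BroadwayKafka's real API:

```elixir
# Self-contained simulation of the "enqueue then dequeue on every pull"
# flow; the buffer is a plain list and pull/3 models one pull cycle.
defmodule PullFlowSketch do
  # One pull: enqueue whatever the broker returned (possibly an empty
  # list), then dequeue up to `demand` messages for downstream stages.
  def pull(buffer, fetched, demand) do
    buffer = buffer ++ fetched
    Enum.split(buffer, demand)
  end
end
```

The key property is visible in the second call below: the broker returns nothing, yet previously buffered messages still flow out, so demand is not wasted on empty pulls.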