Open cheungwsj opened 7 years ago
There are multiple issues requesting this. This solution and the one described by @hyperlink aren't as programmer-friendly as they could be and can introduce race conditions (e.g. when I call resume() there may be events not yet dispatched).
I'm going to try https://github.com/nodefluent/node-sinek, which seems to have backpressure out of the box. In my use case every message triggers a heavy transcoding process (a minute long), and a skipped message would be very hard to notice and later find in the haystack. This requires never accidentally marking an offset as committed, and it does not need any microbatching. This almost drove me to use the low-level consumer, but I want to use the new Kafka API (consumerGroup). It would be nice if kafka-node could solve this use case just as well.
Here's a list of issues that I believe reference the need to explicitly commit one message at a time.
https://github.com/SOHU-Co/kafka-node/issues/502 https://github.com/SOHU-Co/kafka-node/issues/509 https://github.com/SOHU-Co/kafka-node/issues/617 https://github.com/SOHU-Co/kafka-node/issues/668 https://github.com/SOHU-Co/kafka-node/issues/683
After experimenting with node-sinek, I found that it implements the solution kafka-node suggests: keep a queue of messages while the client is paused, and commit offsets when the queue is empty. So if you have many messages in that queue, you can't easily commit them one by one. I've opened this issue there in case the author is interested:
https://github.com/nodefluent/node-sinek/issues/7
I found I can have better control over commits through https://github.com/Blizzard/node-rdkafka for my use-case of a kafka task queue. If you google "kafka as a task queue" you'll find multiple complaints and testimonials against it. Kafka is not a good fit for a task queue.
The google results:
I'm going back to rabbitmq.
YES, and NO. Yes, if you write the consumer and design the data/destination to support it. No, because exactly-once semantics cannot be implemented entirely in a consumer in a distributed system.
Unless doing ALL work in a transactional database, this is a distributed computing problem where it is necessary to understand and deal with ALL the fallacies of distributed computing. The problem mentioned here relates to dealing with #1 reliability and failure recovery.
Using RabbitMQ or any other technology in any particular application does not solve the problem of failure recovery. Failure/recovery must at least in some part be embedded or systemic in the application implementation and destination and persistence design.
If an application reads a message from any queue (RabbitMQ, Kafka, an RDBMS, etc.) and then processes the message into a remote system, and the application or remote system fails before the processing is acknowledged back to the application or message queue, then after a restart neither the queue/topic nor the application implicitly knows whether the last message was processed by the remote system. The message must be processed idempotently at the destination(s), or the consumer must ask the destination(s) whether the message has already been successfully processed.
This could be done by looking up the message ID or some transaction ID from the message at the remote destination(s) to see if it has already been processed. It's generally easiest and most efficient to process queue/topic messages sequentially and track only the latest message and/or transaction ID. Processing only has to be sequential per queue, or per partition in the case of Kafka.
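As a rough sketch of the "track only the latest ID" idea, the destination below remembers the last offset it processed per partition and ignores anything at or below it. The `Destination` class, its in-memory storage, and the message shape are all assumptions made for illustration; a real destination would persist this state durably together with the processed data.

```javascript
// Hypothetical in-memory stand-in for a remote destination. A real one would
// persist lastOffsets durably, alongside the processed data.
class Destination {
  constructor() {
    this.lastOffsets = new Map(); // "topic:partition" -> last processed offset
    this.processed = [];
  }

  // Applies the message unless its offset was already processed.
  // Returns true when work was done, false when the message was a duplicate.
  apply(msg) {
    const key = `${msg.topic}:${msg.partition}`;
    const last = this.lastOffsets.has(key) ? this.lastOffsets.get(key) : -1;
    if (msg.offset <= last) return false; // redelivery after a crash or rebalance
    this.processed.push(msg.value);
    this.lastOffsets.set(key, msg.offset);
    return true;
  }
}

const destination = new Destination();
const results = [
  destination.apply({ topic: 'jobs', partition: 0, offset: 41, value: 'a' }),
  destination.apply({ topic: 'jobs', partition: 0, offset: 42, value: 'b' }),
  destination.apply({ topic: 'jobs', partition: 0, offset: 42, value: 'b' }), // redelivered
  destination.apply({ topic: 'jobs', partition: 1, offset: 7, value: 'c' }),
];
console.log(results, destination.processed);
```

Because messages within a partition arrive in order, a single number per partition is enough; there is no need to store every message ID ever seen.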
If idempotent processing is possible, just reprocess all messages from the last acknowledged message/committed offset. Idempotent processing is simpler than tracking the last message/transaction processed. Unfortunately it isn't always possible, e.g., for increment/decrement counter operations.
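To make the difference concrete, here is a small sketch with two made-up message types (the `applySet` and `applyIncrement` helpers and the message fields are illustrative assumptions). Replaying the "set" message leaves the state unchanged, while replaying the "increment" message applies it a second time.

```javascript
// Idempotent: "set balance to X" gives the same state however often it is replayed.
function applySet(state, msg) {
  return { ...state, [msg.account]: msg.balance };
}

// Not idempotent: "add X to balance" changes the state on every replay.
function applyIncrement(state, msg) {
  return { ...state, [msg.account]: (state[msg.account] || 0) + msg.amount };
}

const setMsg = { account: 'acct-1', balance: 100 };
const incMsg = { account: 'acct-1', amount: 10 };

const setOnce = applySet({}, setMsg);
const setTwice = applySet(setOnce, setMsg); // simulated redelivery
const incOnce = applyIncrement({}, incMsg);
const incTwice = applyIncrement(incOnce, incMsg); // simulated redelivery

console.log(setOnce, setTwice, incOnce, incTwice);
```

For the increment case, the consumer has to fall back on tracking which message or transaction was last applied.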
The reason there are multiple partitions in Kafka is to support concurrent consumption (higher throughput). The key is to use keyed messages to route messages of the same type to the same partition. Key here means whatever is appropriate for your application to properly sequence processing and still increase concurrent throughput, e.g., message type, customer ID, product ID, etc.
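The routing idea can be sketched as "hash the key, take it modulo the partition count". The hash below is a deliberately simple one chosen for illustration and is not the algorithm used by any particular Kafka client; `hashKey` and `partitionFor` are hypothetical names.

```javascript
// Simplified string hash (illustrative only, not the algorithm of any Kafka client).
function hashKey(key) {
  let hash = 0;
  for (const ch of String(key)) {
    hash = (hash * 31 + ch.codePointAt(0)) >>> 0; // keep it an unsigned 32-bit int
  }
  return hash;
}

// Maps a message key to a partition index in [0, partitionCount).
function partitionFor(key, partitionCount) {
  return hashKey(key) % partitionCount;
}

const partitions = 6;
const first = partitionFor('customer-42', partitions);
const second = partitionFor('customer-42', partitions);
console.log(first === second); // same key, same partition
```

Every message for a given customer lands on one partition and is therefore consumed in order, while different customers can be spread across partitions and processed concurrently.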
I solved the exact same problem using ConsumerGroupStream. Node.js is single-threaded so it makes things very easy. Just call pause as the first action in the on('data') event and call resume as the last action.
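kafkaConsumerGroupStream.on('data', async (msg) => {
  // Pause first so no further messages are delivered while this one is processed.
  kafkaConsumerGroupStream.pause()
  try {
    await myCustomMessageProcessor(msg)
    // Second argument is the force flag: commit now rather than waiting.
    await kafkaConsumerGroupStream.commitOffset(msg, true)
  } catch (err) {
    // error processing goes here
  } finally {
    kafkaConsumerGroupStream.resume()
  }
})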
The above uses await: kafka-node was wrapped to support async/await instead of callbacks. The handler is also either called from, or includes, a promise-retry wrapper to support configurable retries.
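One way such a wrapper might look is sketched below, using Node's `util.promisify`. It assumes the stream exposes a callback-style `commit(message, force, callback)`, which should be checked against the kafka-node version in use; `fakeStream` and `addCommitOffset` are hypothetical stand-ins so the sketch runs without a broker.

```javascript
const { promisify } = require('util');

// Stand-in for the real stream so the sketch runs without a broker.
// Assumption: the real object has a callback-style commit(message, force, cb).
const fakeStream = {
  committed: [],
  commit(message, force, callback) {
    this.committed.push({ offset: message.offset, force });
    setImmediate(() => callback(null));
  },
};

// Adds a promise-returning commitOffset(msg, force) to a stream object.
function addCommitOffset(stream) {
  stream.commitOffset = promisify(stream.commit).bind(stream);
  return stream;
}

addCommitOffset(fakeStream);
fakeStream.commitOffset({ offset: 5 }, true).then(() => {
  console.log('committed', fakeStream.committed);
});
```

With the wrapper in place, `await stream.commitOffset(msg, true)` resolves only after the commit callback fires, so resume() in the finally block runs after the offset has been committed or the commit has failed.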
The application still has to check with the remote destinations on startup, or on recovery from any error, to see whether the last message was already successfully processed by the destination(s). Alternatively the destination could do that check itself, if you own the destination and can modify the implementation. Effectively that's idempotent processing, though.
Unless I am reading things wrong, you need to enable a config setting on the producer: enable.idempotence = true. However, I don't think kafka-node supports this at the moment. https://github.com/SOHU-Co/kafka-node/blob/4a5a767328695558dc50fbf2d3370cf0cc0fe0a7/types/index.d.ts#L164-L168
@vasiliyb this guarantees producer deduplication but not an exactly-once guarantee on the consuming side. There is a nice approach (https://stackoverflow.com/a/42167873) that uses a transactional database to address this issue, although I don't think it can be implemented in this library.
To explain, let's say you are using an event-sourcing-modeled application. When you save the aggregate inside a transaction, you also insert into an events table a record that contains the processed offset. This way you avoid distributed transactions, and you perform one atomic operation on a relational/transactional database to ensure data consistency. Whenever your app crashes, you look up the latest offset processed and continue from the next one.
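A minimal sketch of that idea follows, with an in-memory object standing in for the transactional database. `FakeDb`, `handle`, the `failMidway` switch, and the table shapes are all hypothetical; in a real application the transaction would be a database transaction covering both writes.

```javascript
// Hypothetical in-memory stand-in for a transactional database.
class FakeDb {
  constructor() {
    this.state = { aggregates: {}, events: [] };
  }

  // Runs fn against a copy of the state and swaps it in only if fn succeeds.
  transaction(fn) {
    const draft = JSON.parse(JSON.stringify(this.state));
    fn(draft); // a throw here leaves this.state untouched ("rollback")
    this.state = draft; // "commit"
  }

  // Highest offset recorded in the events table for a partition, or -1.
  lastOffset(partition) {
    const offsets = this.state.events
      .filter((e) => e.partition === partition)
      .map((e) => e.offset);
    return offsets.length ? Math.max(...offsets) : -1;
  }
}

// Saves the aggregate and the processed offset in one transaction.
function handle(db, msg, failMidway = false) {
  if (msg.offset <= db.lastOffset(msg.partition)) return 'skipped';
  db.transaction((draft) => {
    draft.aggregates[msg.key] = msg.value;
    if (failMidway) throw new Error('simulated crash');
    draft.events.push({ partition: msg.partition, offset: msg.offset });
  });
  return 'processed';
}

const db = new FakeDb();
const msg = { partition: 0, offset: 10, key: 'order-1', value: 'paid' };
let first;
try {
  first = handle(db, msg, true); // crash before the offset is recorded
} catch (err) {
  first = 'rolled back';
}
const second = handle(db, msg); // redelivery after restart
const third = handle(db, msg); // duplicate delivery
console.log(first, second, third, db.lastOffset(0));
```

The crash leaves neither the aggregate nor the offset behind, so the redelivered message is processed once, and any later duplicate is skipped because its offset is already recorded.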
Hi,
Need some help.
I'm new to Kafka and read an article about implementing an 'Exactly Once' consumer. Can I use kafka-node to achieve the same thing? If yes, would you please provide some guidance?
Steps recommended from the article.
How to do step 3 and 4 in kafka-node? Please help.