Closed: dasch closed this issue 7 years ago.
I think I have a need for this.
What I need to ensure is that for a given single message - I will be doing a bunch of work. I need to ensure that work completes successfully before moving on. If it does succeed - I want to ensure the offset is updated and it will not get processed again.
So the options around time and number of messages for the offset to get updated are a little too coarse-grained.
To get across the idea:
```ruby
consumer.each_message do |message|
  # Process a rather large transaction related to this single message
  # (could take several minutes).
  successful = process(message)

  if successful
    # If I die on the next message before the commit is made, I could have to
    # process this transaction again, which would not be good.
  else
    # The commit should not be made.
    exit # or maybe break, to run some other code like logging what the hell happened
  end
end
```
I think my use case may be much more simplistic than what this issue was created for. Maybe there is a way I could do this now?
Or maybe I should have put this comment in #112. Though in my use case I will only have one consumer...
If you spend several minutes processing a single message, you can be sure that the offset is committed after finishing processing it – you configure the offset commit policy either on a time basis (every x seconds) or on a message basis (every x messages), or both. You could set it to commit after every single message.
That leaves the reprocessing. If the consumer has been kicked from the group or the offset commit fails for some other reason, the message will be reprocessed. I'm not sure you need a dedicated offset management API to handle that, though. Let's say you end up writing the result of your processing to a SQL server. When doing that, make sure to also save the partition/offset pair of the message in the same transaction. Then, before processing a message, check whether its offset is greater than the one you've stored – if it isn't, skip it.
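The dedup pattern described above can be sketched in plain Ruby. This is only an illustration: `ResultStore` is a hypothetical stand-in for the SQL table (a real implementation would save the result and the offset in one database transaction), and `Message` mimics the fields of a consumed Kafka message.

```ruby
# Simulates a SQL table that stores both the processing result and the
# partition/offset of the message that produced it, in one "transaction".
class ResultStore
  def initialize
    @results = []
    @offsets = {} # { [topic, partition] => last processed offset }
  end

  # True if this message's offset is newer than the last one we stored.
  def unprocessed?(message)
    last = @offsets[[message.topic, message.partition]]
    last.nil? || message.offset > last
  end

  # Save the result and the offset together. A real store would wrap
  # these two writes in a single database transaction.
  def save(message, result)
    @results << result
    @offsets[[message.topic, message.partition]] = message.offset
  end

  attr_reader :results
end

Message = Struct.new(:topic, :partition, :offset, :value)

def handle(store, message)
  return :skipped unless store.unprocessed?(message)

  store.save(message, message.value.upcase) # toy "processing"
  :processed
end
```

With this in place, a redelivery of the same message (e.g. after a failed offset commit) is detected by the offset check and skipped.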
Note though that you'll probably want to tune the session timeout parameter if you plan on spending that much time per message, as Kafka will assume that the process has died if it doesn't send a heartbeat with a frequency defined by that timeout, and heartbeats are only sent after the processing block has returned.
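For reference, a configuration along those lines might look like the sketch below. The parameter names follow ruby-kafka's consumer API, but the broker address, group id, and timeout value are placeholders; note also that the broker-side `group.max.session.timeout.ms` caps how high the session timeout can be set.

```ruby
require "kafka"

# Placeholder broker address.
kafka = Kafka.new(seed_brokers: ["kafka://localhost:9092"])

consumer = kafka.consumer(
  group_id: "slow-processor",   # placeholder group name
  # Commit the offset after every single processed message.
  offset_commit_threshold: 1,
  # Allow several minutes of per-message processing before the group
  # coordinator assumes the consumer has died (in seconds).
  session_timeout: 300,
)
```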
Hmmm… Maybe I should explain my use case in more detail.
I am developing a “batch” job that will run every 15 min or so. It will read all the changes in a SaaS system (Zuora, to be exact). Specifically, it gets all the table changes for a bunch of tables. The result could be large (right after a bill run) or empty (like when run in the middle of the night). I get an ID for the job. Processing of that job can be quick or take a long time – and indeed the processing of the job could fail if Zuora is down or there are networking problems (in which case I want to try again on the next run). So I want to save some state – namely this job ID.
Since most of processing is turning those table changes into Kafka messages – I was thinking maybe I’d save this state (job id) in Kafka as well.
I’ve been thinking along these lines – have a couple of questions in the pseudo code below…
Have a special `state_topic` that holds the job id. Kick off a query and save the job id on the special `state_topic`:

```ruby
state_consumer.each_message do |message|
  # Get the results for the found job id and write them out to all the various
  # Kafka topics representing those tables. (This could take a while, and could fail.)

  if error
    raise "failed" # the message should not be committed – right?
  end

  if message.empty?
    # Nothing to do – exit. But this doesn't work with the consumer API, and with
    # the fetch_messages API I can't save the offset – right?
  end

  # Next: if I go into the loop again, this is when this message gets committed?
  # Is that right? (Assuming, of course, I set offset_commit_threshold: 1 on the
  # consumer group.)
end
```
Now maybe this whole idea is stupid and it would be abusing Kafka for something it should not be used for… Or perhaps there is a different design others have used to solve this use case?
Assuming – I’m right on my questions above – I think I’m still stuck unless I have a way for consumer.each_message to return if no messages are available.
Ray
It sounds like you're trying to use consumer groups for something they're not intended for. The groups API expects consumers to periodically send heartbeats indicating that they're still active, and I don't think the interval between these can be that long. Furthermore, there's really no way around duplicates in the topics you'll be writing to, as your job may fail halfway through. You could maybe split the job into smaller parts – as a rule of thumb, you can only guarantee atomicity when writing to a single data store, in this case a single Kafka partition.
@dasch I was about to write up a new issue for my use case, but I think it may be solved by providing this feature.
We currently have consumers that use poseidon_cluster, so their coordination happens via zookeeper. I’d like to move the consumers to use ruby-kafka, without dropping any messages in the process. As I see it, my options are:

1. `start_from_beginning: true`. This will reprocess all messages in Kafka’s log. For our idempotent consumers, this is not a huge deal, but not all of our consumers are idempotent (some of them write messages in Slack, for example). Also, since our Kafka cluster retains messages for up to 15 days, this would introduce a large initial delay when the ruby-kafka-based consumers start, before they are caught up.
2. `start_from_beginning: false`. This will not reprocess the messages in Kafka, but will drop messages produced after my poseidon_cluster-based consumer process stops, and before the ruby-kafka-based consumer process starts.

If ruby-kafka offers an API to manually commit consumer offsets, migrating away from poseidon_cluster becomes much easier: stop the old consumer processes, and note the last-processed offset for each partition. Use those values to commit offsets to Kafka, and then start the ruby-kafka-based version of the consumers. They will pick up the very next message, and will not drop any messages regardless of how many messages were produced during the migration window.
In an ideal scenario, I’d love to be able to do something like this:
```ruby
consumer.subscribe("things", start_from_offsets: {
  0 => 123,
  1 => 456,
  2 => 789,
})
```
Where the `start_from_offsets` hash is of the format `partition_number => offset`. Alternatively, being able to commit the offsets manually in a separate call (or calls) would also solve my problem.
Does the above make sense, or do you perhaps have another suggestion on how to do this?
I've been thinking about providing something like a `seek` method that allows you to set the position for each partition; however, I still haven't found a use case that absolutely requires this.
In your case, I think a better solution would be to stop the old consumers and do something like this:
```ruby
consumer = kafka.consumer(...)
consumer.subscribe("things", start_from_beginning: true)

zk_offsets = MyZookeeper.get_them_offsets

consumer.each_message do |message|
  if message.offset < zk_offsets[message.topic][message.partition]
    next # skip this message, it's already been processed
  end

  # ...
end
```
That is, consume but skip messages with an offset less than the one in zookeeper. Would that solve your problem?
Yeah, I see that as a variation of number 1 above and it is probably what I will end up doing.
It feels a bit weird/hacky to have the same consumer re-consume a topic just because its internal implementation changes, but since it is one-time code maybe it is okay :)
@dasch I was about to open an issue for this. Currently we consume each message and store it in a buffer until we receive a message telling us to stop buffering. At that point, we process the buffer and commit the offset. Because something could happen before we receive the "stop" message, we want to commit only if and after the buffer was processed.
I would be happy to help on adding support to this.
@ale7714 what sort of API are you proposing?
@dasch thanks for replying. I was thinking something like:
- An option on `each_batch` to be able to avoid offset commits.
- A `process_and_commit(message)` method that would mark the message as processed and then commit the offset.
- Adding `buffered_offsets` to OffsetManager, so we could get the next offset from the buffer.

Hmm. How about we expose `Consumer#mark_message_as_processed` and `Consumer#commit_offsets` – those should allow a user of the batch API to control when offsets are committed. We'd need an option for disabling auto-commit.

Why do you need `buffered_offsets`?
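To illustrate the semantics of the two proposed methods, here is a minimal stand-in (not ruby-kafka's actual implementation): marking a message as processed only stages its offset, and nothing becomes durable until offsets are explicitly committed.

```ruby
# A toy offset tracker illustrating manual-commit semantics: staged
# (processed) offsets are kept separate from committed offsets.
class ManualOffsetTracker
  def initialize
    @processed = {} # staged offsets, not yet committed
    @committed = {} # offsets that have been "sent to the broker"
  end

  def mark_message_as_processed(topic, partition, offset)
    # Kafka commits the *next* offset to consume, hence offset + 1.
    @processed[[topic, partition]] = offset + 1
  end

  def commit_offsets
    @committed.merge!(@processed)
  end

  def committed_offset(topic, partition)
    @committed[[topic, partition]]
  end
end
```

A consumer using such an API could process a whole buffered batch, mark each message, and only then call `commit_offsets` once, so a crash mid-batch never advances the committed position.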
@dasch Yeah, that's right. Exposing those methods would be more flexible. `buffered_offsets` would allow us to get the next offset from the buffer instead of from `processed_offsets`, which hasn't been updated.
I'll submit an initial PR today and we can discuss more.
Why would you need the next offset?
@dasch you're right, no need for it. I was too focused on my specific case without realizing the simple approach in your previous comment is more than enough. I opened a PR with what will give us the flexibility to achieve what we need: https://github.com/zendesk/ruby-kafka/pull/342
:+1:
@dasch hi :). I've been having this issue where `each_batch` fetches multiple times from the same partition/offset until I process the messages. I think it's related to this: https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/offset_manager.rb#L44-L46 – WDYT?
Yes – it seems that I was wrong when I said that the offset tracking differentiated between that which had been returned to the user and that which had been processed.
It would make sense to have a separate tracker, e.g. `@next_offsets`, that was updated after a message/batch had been passed to the user code.
@dasch no worries. I think another way of approaching this is marking messages as processed but avoiding the commit. The `@next_offsets` idea is something I was thinking of as well (like a buffer for offsets). But from reading offset_manager, it looks like I would need to change `next_offset_for` and maybe `clear`.

Do you think going for `@next_offsets` is better than having an option to avoid the automatic commit?
I do – we already tried avoiding commits, but that turned out to be slightly messy.
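A sketch of that approach: a separate `@next_offsets` map advances as soon as a batch is handed to user code, so the fetch loop never re-requests the same offsets, while the processed offsets still only advance when the user marks messages as processed. (This is an illustration of the idea, not the actual offset_manager code; the method names here are invented for the example.)

```ruby
# Toy tracker separating "where to fetch next" from "what has been processed".
class FetchPositionTracker
  def initialize(default_offset)
    @default_offset = default_offset
    @next_offsets = {}      # advance when data is handed to user code
    @processed_offsets = {} # advance only when the user marks messages processed
  end

  # Called by the fetch loop: which offset to fetch from next.
  def next_offset_for(topic, partition)
    @next_offsets[[topic, partition]] ||
      @processed_offsets[[topic, partition]] ||
      @default_offset
  end

  # Called when a batch ending at last_offset is returned to user code.
  def handing_over(topic, partition, last_offset)
    @next_offsets[[topic, partition]] = last_offset + 1
  end

  def mark_as_processed(topic, partition, offset)
    @processed_offsets[[topic, partition]] = offset + 1
  end
end
```

The key property is that `next_offset_for` moves forward on hand-over, even though nothing has been marked as processed yet, which is exactly the refetch behavior being discussed.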
Makes sense, but just checking in. I will open a PR with `@next_offsets`.
Hi @dasch, just wanted to chime in to provide my use case, where I think `#seek` would be helpful. Looks like @ale7714 already has #369, but I wanted to check in to see whether you think there's a workaround while `#seek` is being tested. I could even help test it if my use case is fitting.
I have an analytics use case, and I'm trying to enhance our Rails API to double as a Kafka consumer (messages are fed to the browser over ActionCable/websockets). We've set up topics per real-time visualization, such that a topic's log represents a graph's timeseries data, e.g. a user's download rate over the last week/day/hr. I've set up consumer groups per topic and subscribed to the topics with `start_from_beginning: true`.
While `start_from_beginning: true` works, after a restart the consumer group will have already checkpointed its progress and will only get new data starting from the checkpointed offsets. Therefore, I won't be able to have a graph with buffered data. To counter this, I've thought of possibly 2 ways around this:

1. Throw-away consumer groups: `<user>.downloadrate.<uniq_tag>` will be unique whenever the visualization needs to be reloaded, therefore guaranteeing starting from the beginning of the log. Cons: This doesn't seem like it'll scale, and I'm not sure consumer groups are meant to be used this way. It also feels like I'm creating unnecessary overhead.
2. Use `start_from_beginning: false` and cache results for the last week. Merge the cached data with the latest offsets to provide the full picture. Cons: Data duplication. I also feel like this defeats the purpose of using Kafka, since Kafka itself can be treated as a temporary store.

`#seek` would help in that I could use Kafka directly to seek out my data for the last week/day/hr.
@Conanza I have to say, I don't think using Kafka the way you're proposing will work out well – it's neither scalable nor fast. First of all, each consumer group comes at a cost – having a very large number of these will place undue load on the brokers. Especially since they're, as far as I can see, throw-away. Second, reading a partition from start won't be guaranteed to be as fast as reading from a more conventional data store. Kafka wasn't optimized for this use case, but rather for long-running connections. There's a fair bit of setup involved in even figuring out where a given partition is stored.
If I were you, I'd consider a hybrid approach instead: use a single, long-running consumer group to materialize the visualization data into an indexed data store that provides fast scans, e.g. Cassandra or BigTable; use a different consumer group to push data over ActionCable / websockets; when rendering a visualization, start by buffering the websocket data, then fetch e.g. the last week's worth of data from the indexed store, merge with the buffered data (in order to avoid rendering duplicates) and visualize – use the websocket data to keep the visualization up-to-date.
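The merge step of that hybrid approach (combine historical rows from the indexed store with buffered live rows, avoiding duplicate datapoints) can be sketched generically. The record shape here is hypothetical; any field that uniquely identifies a datapoint, such as the message offset, works as the dedup key.

```ruby
require "set"

# Merge historical rows from the indexed store with buffered live rows,
# dropping live rows whose offsets already appear in the historical data.
def merge_timeseries(historical, buffered)
  seen = historical.map { |row| row[:offset] }.to_set
  historical + buffered.reject { |row| seen.include?(row[:offset]) }
end
```

The websocket buffer then keeps the merged series up to date: new live rows are appended as they arrive, since their offsets are past the historical range.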
Done! 🎉
We should maybe have an API for manually committing offsets. I'd like to see some concrete use cases first, though.