zendesk / ruby-kafka

A Ruby client library for Apache Kafka
http://www.rubydoc.info/gems/ruby-kafka
Apache License 2.0

manual reconnect #135

Closed mrkamel closed 8 years ago

mrkamel commented 8 years ago

hi,

is it possible to manually reconnect? I have a consumer that "delays" messages by sleeping for a while if the message's timestamp is too "low". But after waking up I always get a "Connection error: EOFError" during the following fetch_messages call. So, is it possible to proactively reconnect?

Thx for your help

dasch commented 8 years ago

I'm not quite sure why, but it seems like sockets opened by a consumer are sometimes killed by the broker when the consumer is kicked out of its group. Make sure not to sleep for longer than the session timeout – you can also increase this timeout when calling kafka.consumer:

consumer = kafka.consumer(group_id: "x", session_timeout: 45)

Note that there are broker-defined limits to the timeout value, so you may get an error if you set it too high or too low.

dasch commented 8 years ago

Actually, are you even using the new consumer code or are you calling kafka.fetch_messages directly? Note that you may have to implement your own retry logic if you want to go down this path. I would recommend using the new consumer if you're running Kafka 0.9.
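If you do stay on fetch_messages, a minimal retry sketch could look like this (the broker list, topic, partition, offset and retry limit are placeholders, not values from this thread):

require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092"])

# Hedged sketch: retry fetch_messages after a connection error. As noted later
# in this thread, ruby-kafka appears to reconnect on the next call, so a plain
# retry with a small backoff is usually enough.
def fetch_with_retry(kafka, topic:, partition:, offset:, retries: 5)
  attempts = 0
  begin
    kafka.fetch_messages(topic: topic, partition: partition, offset: offset, max_wait_time: 10)
  rescue Kafka::ConnectionError
    attempts += 1
    raise if attempts > retries
    sleep(2 ** attempts)  # back off a little before the next attempt
    retry
  end
end

messages = fetch_with_retry(kafka, topic: "my-topic", partition: 0, offset: 0)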

mrkamel commented 8 years ago

For the use case I'm talking about, I'm using fetch_messages directly, because I need to do batch processing, so consumer.each_message isn't appropriate. Having access to the batch (fetch_batch) would be a valuable extension for me as well.

dasch commented 8 years ago

I'm considering adding a batch option to Consumer as well. Under the hood, a set of message batches is fetched and each message is yielded in turn by each_message. What kind of processing do you need to do?
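Roughly the shape being described, as a hedged sketch rather than the actual library internals (fetch_batches here is only illustrative):

# Illustrative only – not ruby-kafka's real implementation. Batches are
# fetched per topic/partition and their messages are yielded one by one.
def each_message
  loop do
    fetch_batches.each do |batch|
      batch.messages.each do |message|
        yield message
      end
    end
  end
end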

mrkamel commented 8 years ago

... I'm indexing messages/records into Elasticsearch. Not using bulk/batch requests would kill performance.
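For reference, a hedged sketch of bulk-indexing one fetched batch, assuming the elasticsearch gem and a JSON payload carrying an "id" field; the broker list, index name and offset handling are placeholders:

require "kafka"
require "elasticsearch"
require "json"

kafka = Kafka.new(seed_brokers: ["kafka1:9092"])
es = Elasticsearch::Client.new(url: "http://localhost:9200")

offset = 0  # placeholder – in practice the next offset is tracked by the caller

messages = kafka.fetch_messages(topic: "records", partition: 0, offset: offset, max_wait_time: 10)

unless messages.empty?
  # Build one bulk body for the whole batch: an action line followed by the document.
  body = messages.flat_map do |message|
    doc = JSON.parse(message.value)
    [{ index: { _index: "records", _id: doc.fetch("id") } }, doc]
  end

  es.bulk(body: body)  # a single round trip instead of one request per message
end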

And for ES <-> DB consistency, I have a second process which additionally "delays" the messages in order to re-check (after some time has passed) whether everything got indexed correctly, and to re-index if not. That's why I'm sleeping and getting the EOFError.

dasch commented 8 years ago

Hmm. Would it still make sense to use a balanced consumer rather than a single process, for fault tolerance? And are you tracking offsets in ES as well, or are you using idempotent writes and at-least-once processing?

I've also been thinking about how an uncoordinated, single-process consumer would work. Would this API work for you? https://gist.github.com/dasch/34f40482e242abbcf896

dasch commented 8 years ago

By the way, does it work if you rescue ConnectionError and do a retry?

mrkamel commented 8 years ago

Currently, I'm fetching a batch (fetch_messages) and indexing that batch. If, and only if, it finishes successfully, I store the current offset in ZooKeeper. If an exception gets raised, I rescue it, log and notify, then sleep for a few seconds and retry (so: yes, when I retry after the EOFError, ruby-kafka seems to reconnect automatically and everything continues to work again). For fault tolerance, I'm doing leader election via ZooKeeper myself. Sure, I'd appreciate having this implemented for me instead of doing it on my own, but I need the batch processing.
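A condensed sketch of that loop; the topic name is a placeholder, and read_offset, store_offset, index_batch and log_and_notify are hypothetical helpers standing in for the ZooKeeper offset store, the ES bulk indexing and the notification code – they are not ruby-kafka APIs:

require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092"])

loop do
  begin
    offset = read_offset  # hypothetical: last committed offset + 1, loaded from ZooKeeper

    messages = kafka.fetch_messages(topic: "records", partition: 0, offset: offset, max_wait_time: 10)

    unless messages.empty?
      index_batch(messages)                   # hypothetical: bulk-index the batch into ES
      store_offset(messages.last.offset + 1)  # commit the offset only after the batch succeeded
    end
  rescue Kafka::ConnectionError => e
    log_and_notify(e)  # hypothetical
    sleep 5            # ruby-kafka reconnects on the next fetch_messages call
  end
end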

Moreover, for the "delay message for re-check" use case, I'm not quite sure if Kafka's consumer group feature is even applicable, because the delaying process won't get the same messages the indexing process gets. Thus, I have to use the low-level fetch_messages method within the delaying process, right? So please don't remove it from the API, I need it :-)

mrkamel commented 8 years ago

Well, it's applicable by using a separate delay topic. So, I'd be pretty happy if the new consumer feature gets a batch option, so that I can then remove all my manual offset handling and leader election :-)
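A hedged sketch of that delay-topic consumer: the topic name, delay, payload layout, offset handling and the recheck helper are placeholders, not an existing API:

require "kafka"
require "json"

kafka = Kafka.new(seed_brokers: ["kafka1:9092"])

DELAY = 300  # seconds a change must age before it is re-checked

offset = 0  # placeholder – restore this from your own offset store

loop do
  messages = kafka.fetch_messages(topic: "recheck", partition: 0, offset: offset, max_wait_time: 10)

  messages.each do |message|
    event = JSON.parse(message.value)
    age = Time.now.to_i - event.fetch("timestamp")

    sleep(DELAY - age) if age < DELAY  # long sleeps like this are what triggered the EOFError above
    recheck(event)                     # hypothetical: verify the document in ES, re-index if needed

    offset = message.offset + 1
  end
end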

dasch commented 8 years ago

Regarding the first part, the indexer: I think you should use the new consumer group feature with the built-in offset commit handling. I want to expand the API to allow for batch processing. The main difficulties are:

  1. A nice API for manually committing offsets (i.e., after the ES transaction returns)
  2. Heartbeats need to be sent with a somewhat high frequency – roughly every 30 seconds.
  3. The API should be simple to use but still expose enough information.

There are basically two ways to go: have a batch-oriented loop that sends heartbeats and commits offsets automatically after yielding a batch, or a more manual poll-based API where the user must poll with some frequency in order to stay in the group, and offset commits would perhaps be triggered manually.

The first case could look like this:

consumer.each_batch do |batch|
  # Pseudo-ES-code:
  txn = ES::Transaction.new
  batch.messages.each {|message| txn << message.value }
  txn.commit
end

The second case could look like this:

# Do this in a loop.

# A "batch" is a set of messages from the same topic/partition.
batches = consumer.fetch_batches

batches.each do |batch|
  txn = ES::Transaction.new
  batch.messages.each do |message|
    txn << message.value

    # Marks the message as processed; the next offset commit will include the offset.
    consumer.mark_message_as_processed(message)
  end
  txn.commit
end

consumer.commit_offsets
dasch commented 8 years ago

Somewhat related to https://github.com/zendesk/ruby-kafka/pull/132

mrkamel commented 8 years ago

Am I right that a heartbeat frequency of 30 seconds means a crashed consumer (currently the leader) gets removed after 30 seconds? If yes, then that seems quite infrequent to me, since I'm trying to use ES for realtime purposes. 5 seconds seems reasonable to me, which afaik is ZooKeeper's heartbeat frequency and thus corresponds to my current (manual leader election) setup.

dasch commented 8 years ago

The session timeout basically means that a consumer group member must send a heartbeat every n seconds, otherwise it gets kicked from the group and the group needs to re-balance (which is costly in terms of time). Setting it low increases the risk of instability; setting it high means that a consumer can be dead for a while before the group notices. With a high value there's also the risk of a group rebalance not being noticed by a member, causing it to keep processing a partition that the group has now re-assigned to a different member. By setting the heartbeat interval to a lower value, that risk can be mitigated, but there will never be any hard guarantee that two processes won't simultaneously process the same message.

UPDATE: The heartbeat interval is not currently configurable (it's set to half the session timeout) but that could easily be changed.

dasch commented 8 years ago

Take a look at https://github.com/zendesk/ruby-kafka/pull/132 and see if that would be helpful.

mrkamel commented 8 years ago

Thx. Very fast failover is important to me, and it's good to have full control over everything for this use case, so I'm fine managing the low-level details (leader election, offset handling, fetch_messages) myself here, but I'll for sure experiment with it when the batch feature is available. And for less important use cases, I'll of course use the consumer groups feature. Regarding the heartbeats: wouldn't it make sense to push this into a separate Ruby thread?

dasch commented 8 years ago

> Thx. Very fast failover is important to me, and it's good to have full control over everything for this use case, so I'm fine managing the low-level details (leader election, offset handling, fetch_messages) myself here, but I'll for sure experiment with it when the batch feature is available. And for less important use cases, I'll of course use the consumer groups feature.

I'm still actively developing the consumer groups API, and I think your use case is somewhat typical, so I want to make this work for you.

> Regarding the heartbeats: wouldn't it make sense to push this into a separate Ruby thread?

Not really – the heartbeat should indicate that the consumer isn't dead or stuck. If a separate thread just continues to send heartbeats while the consumer thread is deadlocked there would be no point.

dasch commented 8 years ago

Regarding the "checker" consumer – is there no way to consistently write to ES? What kind of consistency guarantees are made, if any?

mrkamel commented 8 years ago

Huh, long story; I'll try to keep it short. It's not the consistency of ES I'm worried about (writes to ES should be consistent, and ES supports optimistic concurrency control via incrementing versions, etc.). It's about achieving DB <-> ES consistency.

class Model
  # Enqueue for a delayed re-check; these run inside the DB transaction.
  after_save :queue_for_recheck
  after_destroy :queue_for_recheck

  # Enqueue for immediate indexing; this runs only once the transaction has committed.
  after_commit :queue_for_immediate_index
end

Within the DB transaction (after_save, after_destroy) I queue for recheck, because the background indexer can't yet see the changes to the DB record (we're still inside a DB transaction), and the process could crash right before after_commit. So, if it crashes, the recheck will correct the DB <-> ES inconsistency.

dasch commented 8 years ago

Hmm... that's essentially a two-phase commit, right? That's very hard to get right. Have you considered using something like Maxwell, which is based on the SQL database binlog?

Anyway, the consumer group API should cover your use case. I think "write this batch of messages atomically to this remote store" is a common consumer use case. When writes are idempotent, it shouldn't be difficult, and performance should be fine. Combining a batch API with a configurable heartbeat interval should make this a good fit for your use case.

mrkamel commented 8 years ago

Yes, I have considered listening to the binlog, thx for pointing me to Maxwell. The re-check is still necessary, though, because deletions are an issue unless you do "soft deletes" (only mark a record as deleted): a long GC pause, virtual machine pause or whatever just before an update operation could undo a delete operation:

  1. Leader A is paused right before an update operation.
  2. Leader B takes over, updates the record, then deletes the record.
  3. Leader A wakes up again, thinks it is still the leader, updates the record, and is then killed because it is no longer the leader -> inconsistency between DB and ES.

So, since the binlog doesn't provide an immense benefit here, I don't want to integrate another dependency into the stack.

Thanks for your support! Really appreciate it, and looking forward to the consumer group batch support.

dasch commented 8 years ago

Maxwell will include deletions in its topic, as far as I know, so you'll get a serialized stream of operations – as long as you process them in-order, you should be fine.

mrkamel commented 8 years ago

In a distributed environment it's not possible to guarantee that you process them in order at all times, unless the datastore you're replicating to provides features to support it (ES does not; no datastore I know of does). That's what I tried to say with:

  1. Leader A is paused right before an update operation.
  2. Leader B takes over, updates the record, then deletes the record.
  3. Leader A wakes up again, thinks it is still the leader, updates the record, and is then killed because it is no longer the leader -> inconsistency between DB and ES.
dasch commented 8 years ago

Yes and no – if your datastore has transactions, you can use optimistic locking to ensure no other agent has modified the data set since last time you saw it. Let's say your input Kafka topic is partitioned based on the ES document id, and that ES provides transactions (replace ES with Postgres for a more realistic scenario...). If the Kafka client was able to seek to a specific offset (which is certainly possible to implement) you could store the offset of the last message processed for a given partition within the data store itself. On startup, you'd query the data store; continue from that offset; and start fetching batches. When storing a batch in the store, you'd fail if the partition/offset has been changed since you did the lookup.

This of course only works for data stores that provide consistency and CAS semantics.

dasch commented 8 years ago

A concrete example could be a page view counter (all page views for a given page must of course be written to the same partition for this to work):

consumer = kafka.consumer(group_id: "page_view_counter")
consumer.subscribe("page-views")

kafka.partitions_for("page-views").each do |partition|
  last_processed_offset = db.query("SELECT offset FROM kafka_commits WHERE partition = ?", partition)

  # This doesn't exist yet.
  consumer.seek("page-views", partition: partition, offset: last_processed_offset)
end

consumer.each_batch do |batch|
  partition = batch.partition
  last_offset = db.query("SELECT offset FROM kafka_commits WHERE partition = ?", partition)
  new_offset = batch.messages.last.offset

  db.transaction do |txn|
    batch.messages.each do |message|
      url = JSON.parse(message.value)
      txn.execute("UPDATE page_view_counts SET count = count + 1 WHERE url = ?", url)
    end

    txn.execute("UPDATE kafka_commits SET offset = ? WHERE partition = ? AND offset = ?", new_offset, partition, last_offset)
  end
end

If two consumers process the same partition concurrently, only one would succeed in committing the transaction.

mrkamel commented 8 years ago

Right, transactions are a means to get this done. Unfortunately, ES and the other stores I want to replicate to don't support transactions.

dasch commented 8 years ago

Yeah :-/

mrkamel commented 8 years ago

Wondering... did you ever encounter something similar? Fetching offset 73580 works, 73581 ... 73641 raise EOFError, and fetching offset 73642 works again.

irb(main):011:0> client.fetch_messages(:topic => "my-topic", :offset => 73580, :partition => 0, :max_wait_time => 10).last
=> #<Kafka::FetchedMessage:0x000000077a0b28 ...>

irb(main):003:0> client.fetch_messages(:topic => "my-topic", :offset => 73581, :partition => 0, :max_wait_time => 10).last
Kafka::ConnectionError: Connection error: EOFError

irb(main):004:0> client.fetch_messages(:topic => "my-topic", :offset => 73582, :partition => 0, :max_wait_time => 10).last
Kafka::ConnectionError: Connection error: EOFError

...

irb(main):007:0> client.fetch_messages(:topic => "my-topic", :offset => 73641, :partition => 0, :max_wait_time => 10).first
Kafka::ConnectionError: Connection error: EOFError

irb(main):005:0> client.fetch_messages(:topic => "my-topic", :offset => 73642, :partition => 0, :max_wait_time => 10).last
=> #<Kafka::FetchedMessage:0x000000072a2a40 ...>

the respective kafka logfiles:

kafka kafka    12256 Mär 15 07:54 00000000000000000000.index
kafka kafka  6392949 Mär 15 07:54 00000000000000000000.log
kafka kafka 10485760 Mär 15 07:54 00000000000000073581.index
kafka kafka  1520909 Mär 15 09:02 00000000000000073581.log

Seems to be a Kafka bug? Or am I doing something wrong?

dasch commented 8 years ago

@mrkamel can you try that with the latest ruby-kafka master? There was an issue regarding partial messages at the end of message sets causing EOFError (#134).

mrkamel commented 8 years ago

Yeaah, that fixed it. Thx a lot!

dasch commented 8 years ago

Hmm, I should really cut a new release...