confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Re-subscribe to a topic after disconnection/reconnection #59

Closed patrickviet closed 7 years ago

patrickviet commented 7 years ago

Here is my test scenario. It's limited to the bare basics to isolate my problem.

I have a single Kafka broker and a single topic with a single partition.

The producer writes random messages to topic patricktest; the consumer reads the messages from the same topic.

Producer

from confluent_kafka import Producer
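from time import asctime, gmtime

p = Producer({'bootstrap.servers':'localhost'})
# asctime() turns the struct_time into a single string; formatting the
# struct_time directly with % would treat it as a tuple of arguments.
p.produce('patricktest','Kwak kwak I got data on %s' % asctime(gmtime()))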
p.flush()

Consumer

from confluent_kafka import Consumer, KafkaError, KafkaException
import sys
import time

c = Consumer({
  'bootstrap.servers':'localhost',
  'group.id':'patricklaptop',
  'default.topic.config': {'auto.offset.reset':'smallest'}
})

c.subscribe(['patricktest'])

while True:

  msg = c.poll(timeout=1.0)

  if msg is None:
    print("msg is None")
    #time.sleep(1)
    continue

  if msg.error():
    if msg.error().code() == KafkaError._PARTITION_EOF:
      # Not an error really
      sys.stderr.write('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset()))
    else:
      print("raising exception")
      raise KafkaException(msg.error())
  else:
    # Good stuff here.
    print('Received message: %s' % msg.value().decode('utf-8'))

  print("end of while cycle")

c.close()

So as you can see, it's pretty basic. In my test scenario, I run the consumer, and it will read the messages, then print "msg is None" quite a bit, until I produce more messages by running the producer code.

Now here is the problem: if I stop Kafka while the consumer is running and then restart it, the consumer reconnects BUT does not re-subscribe to the topic. I have not found ANY WAY to detect that I was disconnected. All I know is that poll returns None. That's it.

For now the workaround that I have found is to add this at the beginning of the loop.

last_resubscribe = 0
while True:
  # resub every 10sec just in case I got disconnected
  if time.time() - last_resubscribe > 10:
    print("resubscribing - just in case")
    c.subscribe(['patricktest'])
    last_resubscribe = time.time()

[...]

But sadly I also get this kind of thing every 10 seconds on the Kafka server:

kafka/server.out

[2016-11-04 22:26:27,851] INFO [GroupCoordinator 1]: Preparing to restabilize group patricklaptop with old generation 7 (kafka.coordinator.GroupCoordinator)
[2016-11-04 22:26:27,852] INFO [GroupCoordinator 1]: Stabilized group patricklaptop generation 8 (kafka.coordinator.GroupCoordinator)
[2016-11-04 22:26:28,114] INFO [GroupCoordinator 1]: Assignment received from leader for group patricklaptop for generation 8 (kafka.coordinator.GroupCoordinator)

Is there any way to catch the reconnect so that I can re-subscribe to a topic? Or maybe have a way to query the consumer object to know my current status? Or maybe have the client automatically resubscribe to the topics itself?
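One possible way to notice the disconnection from application code is the client's `error_cb` configuration property, which is served from `poll()` and receives a `KafkaError`. The following is only a minimal sketch, assuming a client version that supports `error_cb`; the `BrokerStateTracker` class and the `make_tracked_consumer` helper are hypothetical names introduced for illustration, and the broker and group settings are copied from the consumer script above.

```python
class BrokerStateTracker(object):
    """Hypothetical helper: remembers whether all brokers were reported down."""

    def __init__(self, down_code):
        # down_code is the error code that means "all brokers are down",
        # e.g. KafkaError._ALL_BROKERS_DOWN (passed in by the caller).
        self.down_code = down_code
        self.all_brokers_down = False
        self.seen = []  # every error code reported so far, in order

    def on_error(self, err):
        # err is expected to be a KafkaError; only its code() is used here.
        code = err.code()
        self.seen.append(code)
        if code == self.down_code:
            self.all_brokers_down = True

    def mark_alive(self):
        # Call this when a real message arrives: the connection is evidently back.
        self.all_brokers_down = False


def make_tracked_consumer():
    # Imported lazily so the tracker can be exercised without the client installed.
    from confluent_kafka import Consumer, KafkaError

    tracker = BrokerStateTracker(KafkaError._ALL_BROKERS_DOWN)
    consumer = Consumer({
        'bootstrap.servers': 'localhost',
        'group.id': 'patricklaptop',
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'error_cb': tracker.on_error,  # invoked from within poll()
    })
    return consumer, tracker
```

In the poll loop, `tracker.all_brokers_down` could then be checked when `poll()` returns None, and `tracker.mark_alive()` called whenever a message is received. This only reports the outage; it does not by itself tell you whether the subscription was restored.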

While my workaround is functional, it's pretty awful AND produces extra load/logs on the broker. I suspect that it would even trigger a full rebalance and other things if I had several consumers / partitions rather than this minimal one consumer / partition / topic scenario...
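To see whether a re-subscribe really causes a rebalance, the `on_assign` and `on_revoke` callbacks accepted by `subscribe()` can be used to record what the group hands to this consumer. This is a rough sketch rather than a recommended design; `AssignmentLog` and `subscribe_with_log` are hypothetical names, and the callbacks are assumed to receive `(consumer, partitions)` with each partition exposing `topic` and `partition` attributes.

```python
class AssignmentLog(object):
    """Hypothetical helper: records partitions assigned to / revoked from a consumer."""

    def __init__(self):
        self.current = set()   # (topic, partition) pairs currently assigned
        self.rebalances = 0    # number of assignments received so far

    def on_assign(self, consumer, partitions):
        self.rebalances += 1
        self.current = set((p.topic, p.partition) for p in partitions)

    def on_revoke(self, consumer, partitions):
        self.current -= set((p.topic, p.partition) for p in partitions)


def subscribe_with_log(consumer, topics, log):
    # The callbacks are invoked from poll() when the group rebalances.
    consumer.subscribe(topics, on_assign=log.on_assign, on_revoke=log.on_revoke)
```

With the every-10-seconds workaround in place, each forced re-subscribe should show up as another `on_assign` call, so `log.rebalances` gives a cheap way to measure how much churn the workaround causes. An empty `log.current` after a broker restart would indicate that the assignment was not restored.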

Thanks! --Patrick

edenhill commented 7 years ago

The underlying library should automatically reconnect and re-subscribe once the group coordinator broker comes back up, but there were some corner case bugs around this in librdkafka 0.9.1 which have been fixed.

We'll be releasing librdkafka 0.9.2 within a week, but if you want to try out the fix before that please download and build librdkafka master or the v0.9.2-RC1 tag: https://github.com/edenhill/librdkafka/releases/tag/v0.9.2-RC1
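Before testing, it can help to confirm which librdkafka build the Python client is actually linked against. The sketch below assumes a client version that exposes `confluent_kafka.libversion()` (returning a version string and an integer); `parse_version`, `has_fix` and `linked_librdkafka_version` are hypothetical helper names, and the 0.9.2 threshold is taken from the comment above.

```python
import re


def parse_version(text):
    """Return the leading numeric part of a version string as a tuple of ints."""
    m = re.match(r'v?(\d+)\.(\d+)\.(\d+)', text)
    if m is None:
        raise ValueError('unrecognised version string: %r' % (text,))
    return tuple(int(g) for g in m.groups())


def has_fix(version_text, minimum=(0, 9, 2)):
    # Release candidates such as "0.9.2-RC1" count as 0.9.2 here.
    return parse_version(version_text) >= minimum


def linked_librdkafka_version():
    # Imported lazily so the helpers above work without the client installed.
    import confluent_kafka
    return confluent_kafka.libversion()[0]
```

For example, `has_fix(linked_librdkafka_version())` would be expected to be false on a 0.9.1 build and true on the v0.9.2-RC1 tag.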

patrickviet commented 7 years ago

So I compiled 0.9.2-RC1 and I'm getting a way worse problem. With 0.9.1, at least if I re-subscribed manually I would get the messages. This time, not only am I losing my subscription - it doesn't even re-subscribe even if I force it. Messages only start being consumed if I completely interrupt the consumer (ctrl+C) and relaunch it...

edenhill commented 7 years ago

@patrickviet And that's without the re-subscribe-every-10s thing, right?

Can you reproduce this with "debug": "cgrp,protocol,topic" set (config property) and provide logs from when this happens?
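For reference, the requested debug setting is just another key in the consumer's configuration dict. A small sketch, where `with_debug` is a hypothetical helper and the base settings are copied from the consumer script earlier in this thread:

```python
def with_debug(config, contexts=('cgrp', 'protocol', 'topic')):
    """Return a copy of config with the 'debug' property set."""
    merged = dict(config)
    merged['debug'] = ','.join(contexts)
    return merged


base = {
    'bootstrap.servers': 'localhost',
    'group.id': 'patricklaptop',
    'default.topic.config': {'auto.offset.reset': 'smallest'},
}

# Pass debug_config to Consumer(...) in place of the original dict.
debug_config = with_debug(base)
```

The debug output is written by the underlying library to stderr, so it interleaves with the script's own prints.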

patrickviet commented 7 years ago

Output without the re-subscribe every 10 sec, from when I restart the kafka broker

%7|1478542790.864|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542790.936|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Resource temporarily unavailable)
%3|1478542790.936|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Receive failed: Disconnected
%3|1478542790.936|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Receive failed: Disconnected
%3|1478542790.936|ERROR|rdkafka#consumer-1| 1/1 brokers are down
%7|1478542790.936|RETRY|rdkafka#consumer-1| mykafkahost:9092/1: Retrying FetchRequest (v1, 70 bytes, retry 1/2)
%7|1478542790.965|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state up -> query-coord (v4)
%7|1478542790.965|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542791.068|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478542791.068|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478542791.068|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
msg is None
%7|1478542791.468|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542792.065|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542792.107|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478542792.107|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478542792.107|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
msg is None
%7|1478542792.567|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542793.149|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478542793.149|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478542793.149|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478542793.166|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478542793.669|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542794.191|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478542794.191|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478542794.191|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478542794.267|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478542794.771|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542795.231|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478542795.231|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478542795.231|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
msg is None
%7|1478542795.369|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542795.863|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1128, committed offset 1095
%7|1478542795.863|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting stored offset 1128 for commit
%7|1478542795.863|COMMIT|rdkafka#consumer-1| Group "patricklaptop": unable to OffsetCommit in state query-coord: coordinator (mykafkahost:9092/1) is unavailable: retrying later
%7|1478542795.965|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542796.262|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478542796.262|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478542796.262|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
msg is None
%7|1478542796.467|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542797.066|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542797.299|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478542797.299|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478542797.299|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
msg is None
%7|1478542797.569|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478542798.167|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478542798.334|CONNECTED|rdkafka#consumer-1| mykafkahost:9092/1: Connected (#2)
%7|1478542798.334|APIVERSION|rdkafka#consumer-1| mykafkahost:9092/1: Using (configuration fallback) 0.9.0 protocol features
%7|1478542798.335|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent MetadataRequest (v0, 38 bytes @ 0, CorrId 24)
%7|1478542798.335|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent OffsetCommitRequest (v1, 129 bytes @ 0, CorrId 25)
%7|1478542798.335|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 26)
%7|1478542798.335|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 27)
%7|1478542798.671|CGRPQUERY|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop": querying for coordinator: intervaled in state query-coord
%7|1478542798.671|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state query-coord -> wait-coord (v4)
%7|1478542798.737|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 28)
%7|1478542798.852|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received MetadataResponse (v0, 27 bytes, CorrId 24, rtt 516.97ms)
%7|1478542798.852|METADATA|rdkafka#consumer-1| mykafkahost:9092/1: Error in metadata reply for topic patricktest (PartCnt 0): Err-38?
%7|1478542798.852|BRKDELGT|rdkafka#consumer-1| patricktest [0]: delegate to broker (none) (rktp 0x7fafa0002ff0, term 0, ref 7, remove 0)
%7|1478542798.852|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker mykafkahost:9092/1 no longer leader
%7|1478542798.852|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker :0/internal is now leader for partition with 0 messages (0 bytes) queued
%7|1478542798.852|BRKMIGR|rdkafka#consumer-1| Migrating topic patricktest [0] 0x7fafa0002ff0 from mykafkahost:9092/1 to :0/internal (sending PARTITION_LEAVE to mykafkahost:9092/1)
%7|1478542798.852|FETCHADD|rdkafka#consumer-1| mykafkahost:9092/1: Removed patricktest [0] from fetch list (0 entries, opv 2)
%7|1478542798.852|TOPBRK|rdkafka#consumer-1| mykafkahost:9092/1: Topic patricktest [0]: leaving broker (0 messages in xmitq, next leader :0/internal, rktp 0x7fafa0002ff0)
%7|1478542798.852|TOPBRK|rdkafka#consumer-1| :0/internal: Topic patricktest [0]: joining broker (rktp 0x7fafa0002ff0)
%7|1478542798.860|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received OffsetCommitResponse (v1, 27 bytes, CorrId 25, rtt 524.93ms)
%7|1478542798.862|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 26, rtt 527.28ms)
%7|1478542798.862|FETCHADD|rdkafka#consumer-1| :0/internal: Added patricktest [0] to fetch list (1 entries, opv 2)
%7|1478542798.865|OFFSET|rdkafka#consumer-1| OffsetCommit failed for patricktest [0] at offset 1128: Broker: Unknown topic or partition
%7|1478542798.865|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op COORD_QUERY (v0) in state wait-coord (join state started, v4)
%7|1478542798.865|CGRPQUERY|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop": querying for coordinator: Broker: Not coordinator for group
%7|1478542798.899|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 27, rtt 564.64ms)
%7|1478542798.900|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 29)
%7|1478542798.905|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received GroupCoordinatorResponse (v0, 12 bytes, CorrId 28, rtt 167.21ms)
%7|1478542798.934|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received GroupCoordinatorResponse (v0, 52 bytes, CorrId 29, rtt 34.05ms)
%7|1478542798.967|CGRPCOORD|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop" GroupCoordinator response error: Broker: Group coordinator not available
%7|1478542798.967|CGRPCOORD|rdkafka#consumer-1| Group "patricklaptop" changing coordinator 1 -> -1
%7|1478542798.967|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-coord -> wait-broker (v4)
%7|1478542798.967|CGRPCOORD|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop" coordinator is mykafkahost:9092 id 1
%7|1478542798.967|CGRPCOORD|rdkafka#consumer-1| Group "patricklaptop" changing coordinator -1 -> 1
%7|1478542798.967|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker -> wait-broker-transport (v4)
%7|1478542798.967|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker-transport -> up (v4)
%7|1478542799.068|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542799.136|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 30)
%7|1478542799.164|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 30, rtt 28.60ms)
%7|1478542799.168|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op OFFSET_COMMIT (v0) in state up (join state started, v4)
%7|1478542799.168|OFFSET|rdkafka#consumer-1| mykafkahost:9092/1: OffsetCommitRequest(v1, 1/1 partition(s)))
%7|1478542799.168|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op COORD_QUERY (v0) in state up (join state started, v4)
%7|1478542799.168|CGRPQUERY|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop": querying for coordinator: Broker: Not coordinator for group
%7|1478542799.265|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent OffsetCommitRequest (v1, 129 bytes @ 0, CorrId 31)
%7|1478542799.265|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 32)
%7|1478542799.295|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received OffsetCommitResponse (v1, 27 bytes, CorrId 31, rtt 30.28ms)
msg is None
%7|1478542799.325|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received GroupCoordinatorResponse (v0, 52 bytes, CorrId 32, rtt 60.26ms)
%7|1478542799.369|OFFSET|rdkafka#consumer-1| OffsetCommit failed for patricktest [0] at offset 1128: Broker: Not coordinator for group
%7|1478542799.369|CGRPCOORD|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop" coordinator is mykafkahost:9092 id 1
%7|1478542800.168|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542800.231|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 33)
%7|1478542800.264|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 33, rtt 32.77ms)
msg is None
%7|1478542800.864|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1128, committed offset 1095
%7|1478542800.864|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting stored offset 1128 for commit
%7|1478542800.864|OFFSET|rdkafka#consumer-1| mykafkahost:9092/1: OffsetCommitRequest(v1, 1/1 partition(s)))
%7|1478542800.868|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent OffsetCommitRequest (v1, 129 bytes @ 0, CorrId 34)
%7|1478542800.912|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received OffsetCommitResponse (v1, 27 bytes, CorrId 34, rtt 43.52ms)
%7|1478542801.169|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542801.216|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 35)
%7|1478542801.245|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 35, rtt 29.31ms)
msg is None
%7|1478542802.169|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542802.253|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 36)
%7|1478542802.282|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 36, rtt 29.32ms)
msg is None
%7|1478542803.170|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542803.189|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 37)
%7|1478542803.219|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 37, rtt 30.12ms)
msg is None
%7|1478542804.270|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
msg is None
%7|1478542804.326|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 38)
%7|1478542804.355|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 38, rtt 29.06ms)
%7|1478542805.271|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
msg is None
%7|1478542805.362|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 39)
%7|1478542805.393|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 39, rtt 30.33ms)
%7|1478542805.864|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1128, committed offset 1128
%7|1478542805.864|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
%7|1478542806.271|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542806.300|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 40)
msg is None
%7|1478542806.330|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 40, rtt 30.49ms)
%7|1478542807.272|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
msg is None
%7|1478542807.338|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 41)
%7|1478542807.367|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 41, rtt 29.14ms)
%7|1478542808.273|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542808.274|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 42)
%7|1478542808.304|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 42, rtt 29.91ms)
msg is None
%7|1478542809.274|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542809.315|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 43)
msg is None
%7|1478542809.344|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 43, rtt 29.41ms)
msg is None
%7|1478542810.374|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 20
%7|1478542810.451|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 44)
%7|1478542810.480|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 44, rtt 29.30ms)
%7|1478542810.864|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1128, committed offset 1128
%7|1478542810.864|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
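
Logs like the one above are easier to scan once the consumer group state changes are pulled out. The following sketch assumes the line layout visible in the log, `%<level>|<timestamp>|<facility>|<client>| <message>`; `parse_line` and `state_transitions` are hypothetical helper names.

```python
import re

# %<level>|<unix timestamp>|<facility>|<client name>| <message>
LOG_LINE = re.compile(r'^%(\d)\|(\d+\.\d+)\|([^|]+)\|([^|]+)\| ?(.*)$')
STATE_CHANGE = re.compile(r'changed state (\S+) -> (\S+)')


def parse_line(line):
    """Split one debug log line into fields, or return None if it is not one."""
    m = LOG_LINE.match(line.rstrip('\n'))
    if m is None:
        return None  # e.g. the script's own "msg is None" output
    level, ts, facility, client, message = m.groups()
    return {
        'level': int(level),
        'time': float(ts),
        'facility': facility,
        'client': client,
        'message': message,
    }


def state_transitions(lines):
    """Return (old_state, new_state) pairs from CGRPSTATE lines, in order."""
    transitions = []
    for line in lines:
        record = parse_line(line)
        if record is None or record['facility'] != 'CGRPSTATE':
            continue
        m = STATE_CHANGE.search(record['message'])
        if m:
            transitions.append((m.group(1), m.group(2)))
    return transitions
```

Applied to the log above, this picks out the path up -> query-coord -> wait-coord -> wait-broker -> wait-broker-transport -> up, after which only heartbeats for generation id 20 and offset commits appear.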

patrickviet commented 7 years ago

And output with the re-subscribe because I find it interesting too

[vagrant@localhost vagrant]$ /opt/python/py276/bin/python pipkafka.py
%7|1478543086.813|MEMBERID|rdkafka#consumer-1| Group "patricklaptop": updating member id "(not-set)" -> ""
%7|1478543086.814|BRKREASSIGN|rdkafka#consumer-1| Group "patricklaptop" management reassigned from broker (none) to :0/internal
%7|1478543086.814|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state init -> wait-broker (v1)
%7|1478543086.814|BRKASSIGN|rdkafka#consumer-1| Group "patricklaptop" management assigned to broker :0/internal
%7|1478543086.845|CONNECTED|rdkafka#consumer-1| mykafkahost:9092/bootstrap: Connected (#1)
%7|1478543086.845|APIVERSION|rdkafka#consumer-1| mykafkahost:9092/bootstrap: Using (configuration fallback) 0.9.0 protocol features
%7|1478543086.855|SEND|rdkafka#consumer-1| mykafkahost:9092/bootstrap: Sent MetadataRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1478543086.885|RECV|rdkafka#consumer-1| mykafkahost:9092/bootstrap: Received MetadataResponse (v0, 1482 bytes, CorrId 1, rtt 29.44ms)
%7|1478543086.885|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op SUBSCRIBE (v0) in state wait-broker (join state init, v1)
%7|1478543086.885|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": subscribe to new subscription of 1 topics (join state init)
%7|1478543086.885|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": unsubscribe from current unset subscription of 0 topics (leave group=no, join state init, v1)
%7|1478543086.885|UNASSIGN|rdkafka#consumer-1| Group "patricklaptop": unassign done in state wait-broker (join state init): without new assignment
resubscribing - just in case
%7|1478543086.885|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op SUBSCRIBE (v0) in state wait-broker (join state init, v1)
%7|1478543086.885|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": subscribe to new subscription of 1 topics (join state init)
%7|1478543086.885|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": unsubscribe from current subscription of 1 topics (leave group=no, join state init, v1)
%7|1478543086.885|UNASSIGN|rdkafka#consumer-1| Group "patricklaptop": unassign done in state wait-broker (join state init): without new assignment
%7|1478543086.885|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state wait-broker
%7|1478543086.895|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent MetadataRequest (v0, 25 bytes @ 0, CorrId 2)
%7|1478543086.925|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received MetadataResponse (v0, 1482 bytes, CorrId 2, rtt 29.18ms)
%7|1478543086.925|BRKREASSIGN|rdkafka#consumer-1| Group "patricklaptop" management reassigned from broker :0/internal to mykafkahost:9092/1
%7|1478543086.925|BRKUNASSIGN|rdkafka#consumer-1| Group "patricklaptop" management unassigned from broker handle :0/internal
%7|1478543086.925|BRKASSIGN|rdkafka#consumer-1| Group "patricklaptop" management assigned to broker mykafkahost:9092/1
%7|1478543087.025|CGRPQUERY|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop": querying for coordinator: intervaled in state wait-broker
%7|1478543087.126|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 3)
%7|1478543087.154|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received GroupCoordinatorResponse (v0, 52 bytes, CorrId 3, rtt 28.45ms)
%7|1478543087.226|CGRPCOORD|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop" coordinator is mykafkahost:9092 id 1
%7|1478543087.226|CGRPCOORD|rdkafka#consumer-1| Group "patricklaptop" changing coordinator -1 -> 1
%7|1478543087.226|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker -> wait-broker-transport (v1)
%7|1478543087.226|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker-transport -> up (v1)
%7|1478543087.327|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 0 (1) subscribed topic(s)
%7|1478543087.327|JOIN|rdkafka#consumer-1| Group "patricklaptop": postponing join until full metadata is available (current metadata age -1ms > metadata.max.age.ms 300000ms)
%7|1478543087.356|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent MetadataRequest (v0, 25 bytes @ 0, CorrId 4)
%7|1478543087.385|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received MetadataResponse (v0, 1482 bytes, CorrId 4, rtt 29.06ms)
%7|1478543087.385|SUBSCRIPTION|rdkafka#consumer-1| Group "patricklaptop": effective subscription list changed from 0 to 1 topic(s):
%7|1478543087.385|SUBSCRIPTION|rdkafka#consumer-1|  Topic patricktest with 1 partition(s)
%7|1478543087.385|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 1 (1) subscribed topic(s)
%7|1478543087.385|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state init -> wait-join (v1)
%7|1478543087.486|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent JoinGroupRequest (v0, 129 bytes @ 0, CorrId 5)
%7|1478543087.523|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received JoinGroupResponse (v0, 182 bytes, CorrId 5, rtt 36.24ms)
%7|1478543087.587|JOINGROUP|rdkafka#consumer-1| JoinGroup response: GenerationId 24, Protocol range, LeaderId rdkafka-5f6040fc-655a-49b8-9eb6-15aaf0d30871 (me), my MemberId rdkafka-5f6040fc-655a-49b8-9eb6-15aaf0d30871, 1 members in group: (no error)
%7|1478543087.587|MEMBERID|rdkafka#consumer-1| Group "patricklaptop": updating member id "" -> "rdkafka-5f6040fc-655a-49b8-9eb6-15aaf0d30871"
%7|1478543087.588|JOINGROUP|rdkafka#consumer-1| Elected leader for group "patricklaptop" with 1 member(s)
%7|1478543087.588|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-join -> wait-metadata (v1)
%7|1478543087.623|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent MetadataRequest (v0, 25 bytes @ 0, CorrId 6)
%7|1478543087.653|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received MetadataResponse (v0, 1482 bytes, CorrId 6, rtt 29.66ms)
%7|1478543087.688|ASSIGN|rdkafka#consumer-1| Group "patricklaptop" running range assignment for 1 member(s):
%7|1478543087.688|ASSIGN|rdkafka#consumer-1|  Member "rdkafka-5f6040fc-655a-49b8-9eb6-15aaf0d30871" (me) with 1 subscription(s):
%7|1478543087.688|ASSIGN|rdkafka#consumer-1|   patricktest [-1]
%7|1478543087.688|ASSIGN|rdkafka#consumer-1| range: Topic patricktest with 1 partition(s) and 1 subscribing member(s)
%7|1478543087.688|ASSIGN|rdkafka#consumer-1| range: Member "rdkafka-5f6040fc-655a-49b8-9eb6-15aaf0d30871": assigned topic patricktest partitions 0..0
%7|1478543087.688|ASSIGN|rdkafka#consumer-1| Group "patricklaptop" range assignment for 1 member(s) finished in 0.077ms:
%7|1478543087.688|ASSIGN|rdkafka#consumer-1|  Member "rdkafka-5f6040fc-655a-49b8-9eb6-15aaf0d30871" (me) assigned 1 partition(s):
%7|1478543087.688|ASSIGN|rdkafka#consumer-1|   patricktest [0]
%7|1478543087.688|ASSIGNOR|rdkafka#consumer-1| Group "patricklaptop": "range" assignor run for 1 member(s)
%7|1478543087.688|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-metadata -> wait-sync (v1)
%7|1478543087.754|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent SyncGroupRequest (v0, 171 bytes @ 0, CorrId 7)
%7|1478543087.793|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received SyncGroupResponse (v0, 37 bytes, CorrId 7, rtt 38.82ms)
%7|1478543087.814|SYNCGROUP|rdkafka#consumer-1| SyncGroup response: Success (31 bytes of MemberState data)
%7|1478543087.814|ASSIGN|rdkafka#consumer-1| Group "patricklaptop": delegating assign of 1 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
%7|1478543087.814|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-sync -> wait-assign-rebalance_cb (v1)
%7|1478543087.814|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 24
%7|1478543087.814|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v1)
%7|1478543087.814|TOPIC|rdkafka#consumer-1| New local topic: patricktest
%7|1478543087.814|TOPPARNEW|rdkafka#consumer-1| NEW patricktest [-1] 0x7f27e4002c30 (at rd_kafka_topic_new0:261)
%7|1478543087.814|TOPPARNEW|rdkafka#consumer-1| NEW patricktest [0] 0x7f27e4002ff0 (at rd_kafka_toppar_desired_add:515)
%7|1478543087.814|DESP|rdkafka#consumer-1| Adding desired topic patricktest [0]
%7|1478543087.814|BARRIER|rdkafka#consumer-1| Group "patricklaptop": rd_kafka_cgrp_assign:1482: new version barrier v2
%7|1478543087.814|ASSIGN|rdkafka#consumer-1| Group "patricklaptop": assigning 1 partition(s) in join state wait-assign-rebalance_cb
%7|1478543087.814|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-assign-rebalance_cb -> assigned (v2)
%7|1478543087.814|BARRIER|rdkafka#consumer-1| Group "patricklaptop": rd_kafka_cgrp_partitions_fetch_start0:989: new version barrier v3
%7|1478543087.814|FETCHSTART|rdkafka#consumer-1| Group "patricklaptop": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 1506)
%7|1478543087.814|FETCHSTART|rdkafka#consumer-1| List with 1 partition(s):
%7|1478543087.814|FETCHSTART|rdkafka#consumer-1|  patricktest [0] offset INVALID
%7|1478543087.814|OFFSET|rdkafka#consumer-1| Fetch 1 offsets with v3
%7|1478543087.814|OFFSET|rdkafka#consumer-1| mykafkahost:9092/1: OffsetFetchRequest(v1) for 1/1 partition(s)
msg is None
%7|1478543087.893|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent MetadataRequest (v0, 38 bytes @ 0, CorrId 8)
%7|1478543087.893|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 9)
%7|1478543087.893|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent OffsetFetchRequest (v1, 61 bytes @ 0, CorrId 10)
%7|1478543087.923|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received MetadataResponse (v0, 103 bytes, CorrId 8, rtt 29.28ms)
%7|1478543087.923|STATE|rdkafka#consumer-1| Topic patricktest changed state unknown -> exists
%7|1478543087.923|PARTCNT|rdkafka#consumer-1| Topic patricktest partition count changed from 0 to 1
%7|1478543087.923|BRKDELGT|rdkafka#consumer-1| patricktest [0]: delegate to broker mykafkahost:9092/1 (rktp 0x7f27e4002ff0, term 0, ref 4, remove 0)
%7|1478543087.923|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker mykafkahost:9092/1 is now leader for partition with 0 messages (0 bytes) queued
%7|1478543087.923|BRKMIGR|rdkafka#consumer-1| Migrating topic patricktest [0] 0x7f27e4002ff0 from (none) to mykafkahost:9092/1 (sending PARTITION_JOIN to mykafkahost:9092/1)
%7|1478543087.923|METADATA|rdkafka#consumer-1| mykafkahost:9092/1: Requested topic patricktest seen in metadata
%7|1478543087.923|TOPBRK|rdkafka#consumer-1| mykafkahost:9092/1: Topic patricktest [0]: joining broker (rktp 0x7f27e4002ff0)
%7|1478543087.953|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 9, rtt 59.48ms)
%7|1478543087.956|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received OffsetFetchResponse (v1, 37 bytes, CorrId 10, rtt 62.86ms)
%7|1478543088.023|OFFSETFETCH|rdkafka#consumer-1| List with 1 partition(s):
%7|1478543088.023|OFFSETFETCH|rdkafka#consumer-1|  patricktest [0] offset INVALID
%7|1478543088.023|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting default offset INVALID
%7|1478543088.023|OFFSETFETCH|rdkafka#consumer-1| mykafkahost:9092/1: OffsetFetchResponse: patricktest [0] offset 1165
%7|1478543088.023|OFFFETCH|rdkafka#consumer-1| mykafkahost:9092/1: OffsetFetch for 1/1 partition(s) returned Success
%7|1478543088.023|BARRIER|rdkafka#consumer-1| Group "patricklaptop": rd_kafka_cgrp_partitions_fetch_start0:989: new version barrier v4
%7|1478543088.023|FETCHSTART|rdkafka#consumer-1| Group "patricklaptop": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=yes, v4, line 918)
%7|1478543088.023|FETCHSTART|rdkafka#consumer-1| List with 1 partition(s):
%7|1478543088.023|FETCHSTART|rdkafka#consumer-1|  patricktest [0] offset 1165
%7|1478543088.023|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state assigned -> started (v4)
%7|1478543088.023|BARRIER|rdkafka#consumer-1| patricktest [0]: rd_kafka_toppar_op_fetch_start:1935: new version barrier v2
%7|1478543088.023|CONSUMER|rdkafka#consumer-1| Start consuming patricktest [0] at offset 1165 (v2)
%7|1478543088.023|OP|rdkafka#consumer-1| patricktest [0] received op FETCH_START (v2) in fetch-state none (opv1)
%7|1478543088.023|FETCH|rdkafka#consumer-1| Start fetch for patricktest [0] in state none at offset 1165 (v2)
%7|1478543088.023|PARTSTATE|rdkafka#consumer-1| Partition patricktest [0] changed fetch state none -> active
%7|1478543088.023|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op PARTITION_JOIN in state up (join state started, v4) for patricktest [0]
%7|1478543088.023|PARTADD|rdkafka#consumer-1| Group "patricklaptop": add patricktest [0]
%7|1478543088.057|FETCHDEC|rdkafka#consumer-1| Topic patricktest [0]: fetch decide: updating to version 2 (was 0) at offset 1165 (was 0)
%7|1478543088.057|FETCHADD|rdkafka#consumer-1| mykafkahost:9092/1: Added patricktest [0] to fetch list (1 entries, opv 2)
%7|1478543088.057|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 11)
%7|1478543088.130|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 574 bytes, CorrId 11, rtt 73.09ms)
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=38, tm_sec=36, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=38, tm_sec=47, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=38, tm_sec=52, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=38, tm_sec=58, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=8, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=13, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=19, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=24, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=29, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=35, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=40, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=45, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=50, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=39, tm_sec=56, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=40, tm_sec=1, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=18, tm_min=40, tm_sec=6, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
%7|1478543088.131|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 12)
%7|1478543088.269|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 12, rtt 137.95ms)
% patricktest [0] reached end at offset 1181
end of while cycle
%7|1478543088.774|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 13)
%7|1478543088.814|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 24
%7|1478543088.875|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 14)
%7|1478543088.907|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 13, rtt 133.24ms)
%7|1478543088.907|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 15)
%7|1478543088.908|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 14, rtt 33.71ms)
%7|1478543089.037|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 15, rtt 129.51ms)
%7|1478543089.037|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 16)
%7|1478543089.168|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 16, rtt 130.91ms)
%7|1478543089.168|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 17)
msg is None
%7|1478543089.298|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 17, rtt 130.08ms)
%7|1478543089.298|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 18)
%7|1478543089.428|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 18, rtt 130.16ms)
%7|1478543089.429|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 19)
%7|1478543089.560|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 19, rtt 131.85ms)
%7|1478543089.561|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 20)
%7|1478543089.690|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 20, rtt 129.45ms)
%7|1478543089.690|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 21)
%7|1478543089.814|HEARTBEAT|rdkafka#consumer-1| mykafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 24
%7|1478543089.820|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 21, rtt 129.88ms)
%7|1478543089.820|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 22)
%7|1478543089.820|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 23)
%7|1478543089.951|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 22, rtt 130.18ms)
%7|1478543089.951|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 24)
%7|1478543089.952|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 23, rtt 131.60ms)
%7|1478543090.037|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Resource temporarily unavailable)
%3|1478543090.037|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Receive failed: Disconnected
%3|1478543090.037|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Receive failed: Disconnected
%3|1478543090.037|ERROR|rdkafka#consumer-1| 1/1 brokers are down
%7|1478543090.037|RETRY|rdkafka#consumer-1| mykafkahost:9092/1: Retrying FetchRequest (v1, 70 bytes, retry 1/2)
%7|1478543090.116|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state up -> query-coord (v4)
%7|1478543090.116|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478543090.169|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478543090.169|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478543090.169|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
msg is None
%7|1478543090.619|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478543091.208|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478543091.208|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478543091.208|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478543091.216|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478543091.718|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478543091.813|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1181, committed offset 1165
%7|1478543091.813|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting stored offset 1181 for commit
%7|1478543091.813|COMMIT|rdkafka#consumer-1| Group "patricklaptop": unable to OffsetCommit in state query-coord: coordinator (mykafkahost:9092/1) is unavailable: retrying later
%7|1478543092.250|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478543092.251|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478543092.251|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
msg is None
%7|1478543092.317|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478543092.915|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478543093.293|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478543093.293|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478543093.293|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478543093.417|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478543094.015|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478543094.336|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478543094.336|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478543094.336|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478543094.517|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478543095.116|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478543095.371|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478543095.371|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478543095.371|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478543095.619|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478543096.218|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478543096.406|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478543096.406|FAIL|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478543096.406|ERROR|rdkafka#consumer-1| mykafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478543096.721|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478543096.814|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1181, committed offset 1165
%7|1478543096.814|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting stored offset 1181 for commit
%7|1478543096.814|COMMIT|rdkafka#consumer-1| Group "patricklaptop": unable to OffsetCommit in state query-coord: coordinator (mykafkahost:9092/1) is unavailable: retrying later
msg is None
resubscribing - just in case
%7|1478543097.318|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op SUBSCRIBE (v0) in state query-coord (join state started, v4)
%7|1478543097.318|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": subscribe to new subscription of 1 topics (join state started)
%7|1478543097.318|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": unsubscribe from current subscription of 1 topics (leave group=no, join state started, v4)
%7|1478543097.318|SUBSCRIPTION|rdkafka#consumer-1| Group "patricklaptop": clearing subscribed topics list (1)
%7|1478543097.318|SUBSCRIPTION|rdkafka#consumer-1| Group "patricklaptop": effective subscription list changed from 1 to 0 topic(s):
%7|1478543097.318|PAUSE|rdkafka#consumer-1| Library pausing 1 partition(s)
%7|1478543097.318|BARRIER|rdkafka#consumer-1| patricktest [0]: rd_kafka_toppar_op_pause_resume:2020: new version barrier v3
%7|1478543097.318|PAUSE|rdkafka#consumer-1| Pause patricktest [0] (v3)
%7|1478543097.318|ASSIGN|rdkafka#consumer-1| Group "patricklaptop": delegating revoke of 1 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: unsubscribe
%7|1478543097.318|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state started -> wait-revoke-rebalance_cb (v4)
%7|1478543097.318|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478543097.318|OP|rdkafka#consumer-1| patricktest [0] received op PAUSE (v3) in fetch-state active (opv2)
%7|1478543097.318|PAUSE|rdkafka#consumer-1| Pause patricktest [0]: at offset 1181 (state active, v3)
%7|1478543097.318|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op ASSIGN (v0) in state query-coord (join state wait-revoke-rebalance_cb, v4)
%7|1478543097.318|BARRIER|rdkafka#consumer-1| Group "patricklaptop": rd_kafka_cgrp_assign:1482: new version barrier v5
%7|1478543097.318|BARRIER|rdkafka#consumer-1| Group "patricklaptop": rd_kafka_cgrp_unassign:1399: new version barrier v6
%7|1478543097.318|UNASSIGN|rdkafka#consumer-1| Group "patricklaptop": unassigning 1 partition(s) (v6)
%7|1478543097.318|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-revoke-rebalance_cb -> wait-unassign (v6)
%7|1478543097.318|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1181, committed offset 1165
%7|1478543097.318|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting stored offset 1181 for commit
%7|1478543097.318|COMMIT|rdkafka#consumer-1| Group "patricklaptop": unable to OffsetCommit in state query-coord: coordinator (mykafkahost:9092/1) is unavailable: retrying later
%7|1478543097.318|BARRIER|rdkafka#consumer-1| patricktest [0]: rd_kafka_toppar_op_fetch_stop:1962: new version barrier v4
%7|1478543097.318|CONSUMER|rdkafka#consumer-1| Stop consuming patricktest [0] (v4)
%7|1478543097.318|DESP|rdkafka#consumer-1| Removing (un)desired topic patricktest [0]
%7|1478543097.318|RESUME|rdkafka#consumer-1| Library resuming 1 partition(s)
%7|1478543097.318|BARRIER|rdkafka#consumer-1| patricktest [0]: rd_kafka_toppar_op_pause_resume:2020: new version barrier v5
%7|1478543097.318|RESUME|rdkafka#consumer-1| Resume patricktest [0] (v5)
%7|1478543097.318|ASSIGN|rdkafka#consumer-1| Group "patricklaptop": assigning 0 partition(s) in join state wait-unassign
%7|1478543097.318|OP|rdkafka#consumer-1| patricktest [0] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1478543097.318|FETCH|rdkafka#consumer-1| Stopping fetch for patricktest [0] in state active (v4)
%7|1478543097.318|PARTSTATE|rdkafka#consumer-1| Partition patricktest [0] changed fetch state active -> stopping
%7|1478543097.318|STORETERM|rdkafka#consumer-1| patricktest [0]: offset store terminating
%7|1478543097.318|PARTSTATE|rdkafka#consumer-1| Partition patricktest [0] changed fetch state stopping -> stopped
%7|1478543097.318|OP|rdkafka#consumer-1| patricktest [0] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1478543097.318|RESUME|rdkafka#consumer-1| Not resuming stopped patricktest [0]: at offset 1181 (state stopped, v5)
%7|1478543097.318|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op PARTITION_LEAVE in state query-coord (join state wait-unassign, v6) for patricktest [0]
%7|1478543097.318|PARTDEL|rdkafka#consumer-1| Group "patricklaptop": delete patricktest [0]
%7|1478543097.318|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op REPLY:FETCH_STOP in state query-coord (join state wait-unassign, v6) for patricktest [0]
%7|1478543097.320|FETCHADD|rdkafka#consumer-1| mykafkahost:9092/1: Removed patricktest [0] from fetch list (0 entries, opv 2)
%7|1478543097.450|CONNECTED|rdkafka#consumer-1| mykafkahost:9092/1: Connected (#2)
%7|1478543097.450|APIVERSION|rdkafka#consumer-1| mykafkahost:9092/1: Using (configuration fallback) 0.9.0 protocol features
%7|1478543097.450|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent MetadataRequest (v0, 38 bytes @ 0, CorrId 25)
%7|1478543097.450|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 26)
%7|1478543097.916|CGRPQUERY|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop": querying for coordinator: intervaled in state query-coord
%7|1478543097.917|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state query-coord -> wait-coord (v6)
%7|1478543097.934|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received MetadataResponse (v0, 27 bytes, CorrId 25, rtt 484.13ms)
%7|1478543097.934|METADATA|rdkafka#consumer-1| mykafkahost:9092/1: Error in metadata reply for topic patricktest (PartCnt 0): Err-38?
%7|1478543097.934|BRKDELGT|rdkafka#consumer-1| patricktest [0]: delegate to broker (none) (rktp 0x7f27e4002ff0, term 0, ref 6, remove 0)
%7|1478543097.934|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker mykafkahost:9092/1 no longer leader
%7|1478543097.934|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker :0/internal is now leader for partition with 0 messages (0 bytes) queued
%7|1478543097.934|BRKMIGR|rdkafka#consumer-1| Migrating topic patricktest [0] 0x7f27e4002ff0 from mykafkahost:9092/1 to :0/internal (sending PARTITION_LEAVE to mykafkahost:9092/1)
%7|1478543097.935|TOPBRK|rdkafka#consumer-1| mykafkahost:9092/1: Topic patricktest [0]: leaving broker (0 messages in xmitq, next leader :0/internal, rktp 0x7f27e4002ff0)
%7|1478543097.935|TOPBRK|rdkafka#consumer-1| :0/internal: Topic patricktest [0]: joining broker (rktp 0x7f27e4002ff0)
%7|1478543097.935|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 27)
%7|1478543097.972|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 26, rtt 522.10ms)
%7|1478543097.979|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received GroupCoordinatorResponse (v0, 12 bytes, CorrId 27, rtt 43.92ms)
%7|1478543098.035|CGRPCOORD|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop" GroupCoordinator response error: Broker: Group coordinator not available
%7|1478543098.035|CGRPCOORD|rdkafka#consumer-1| Group "patricklaptop" changing coordinator 1 -> -1
%7|1478543098.035|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-coord -> wait-broker (v6)
msg is None
%7|1478543099.017|CGRPQUERY|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop": querying for coordinator: intervaled in state wait-broker
%7|1478543099.086|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 28)
%7|1478543099.127|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received GroupCoordinatorResponse (v0, 52 bytes, CorrId 28, rtt 40.70ms)
%7|1478543099.219|CGRPCOORD|rdkafka#consumer-1| mykafkahost:9092/1: Group "patricklaptop" coordinator is mykafkahost:9092 id 1
%7|1478543099.219|CGRPCOORD|rdkafka#consumer-1| Group "patricklaptop" changing coordinator -1 -> 1
%7|1478543099.219|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker -> wait-broker-transport (v6)
%7|1478543099.219|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker-transport -> up (v6)
msg is None
%7|1478543099.419|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op OFFSET_COMMIT (v0) in state up (join state wait-unassign, v6)
%7|1478543099.419|OFFSET|rdkafka#consumer-1| mykafkahost:9092/1: OffsetCommitRequest(v1, 1/1 partition(s)))
%7|1478543099.419|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op OFFSET_COMMIT (v0) in state up (join state wait-unassign, v6)
%7|1478543099.419|OFFSET|rdkafka#consumer-1| mykafkahost:9092/1: OffsetCommitRequest(v1, 1/1 partition(s)))
%7|1478543099.419|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op OFFSET_COMMIT (v0) in state up (join state wait-unassign, v6)
%7|1478543099.419|OFFSET|rdkafka#consumer-1| mykafkahost:9092/1: OffsetCommitRequest(v1, 1/1 partition(s)))
%7|1478543099.429|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent OffsetCommitRequest (v1, 129 bytes @ 0, CorrId 29)
%7|1478543099.429|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent OffsetCommitRequest (v1, 129 bytes @ 0, CorrId 30)
%7|1478543099.429|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent OffsetCommitRequest (v1, 129 bytes @ 0, CorrId 31)
%7|1478543099.486|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received OffsetCommitResponse (v1, 27 bytes, CorrId 29, rtt 57.31ms)
%7|1478543099.516|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received OffsetCommitResponse (v1, 27 bytes, CorrId 30, rtt 86.93ms)
%7|1478543099.517|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received OffsetCommitResponse (v1, 27 bytes, CorrId 31, rtt 87.57ms)
%7|1478543099.520|UNASSIGN|rdkafka#consumer-1| Group "patricklaptop": unassign done in state up (join state wait-unassign): without new assignment
%7|1478543099.520|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-unassign -> init (v6)
%7|1478543099.520|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 0 (1) subscribed topic(s)
%7|1478543099.520|JOIN|rdkafka#consumer-1| Group "patricklaptop": no matching topics based on 11831ms old metadata: next metadata refresh in 288169ms
msg is None
%7|1478543100.522|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 0 (1) subscribed topic(s)
%7|1478543100.522|JOIN|rdkafka#consumer-1| Group "patricklaptop": no matching topics based on 12833ms old metadata: next metadata refresh in 287167ms
msg is None
%7|1478543101.621|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 0 (1) subscribed topic(s)
%7|1478543101.621|JOIN|rdkafka#consumer-1| Group "patricklaptop": no matching topics based on 13932ms old metadata: next metadata refresh in 286068ms
msg is None
%7|1478543102.621|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 0 (1) subscribed topic(s)
%7|1478543102.621|JOIN|rdkafka#consumer-1| Group "patricklaptop": no matching topics based on 14933ms old metadata: next metadata refresh in 285067ms
msg is None
%7|1478543103.721|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 0 (1) subscribed topic(s)
%7|1478543103.721|JOIN|rdkafka#consumer-1| Group "patricklaptop": no matching topics based on 16033ms old metadata: next metadata refresh in 283967ms
msg is None
%7|1478543104.722|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 0 (1) subscribed topic(s)
%7|1478543104.722|JOIN|rdkafka#consumer-1| Group "patricklaptop": no matching topics based on 17034ms old metadata: next metadata refresh in 282966ms
msg is None
^C%7|1478543105.816|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 0 (1) subscribed topic(s)
%7|1478543105.816|JOIN|rdkafka#consumer-1| Group "patricklaptop": no matching topics based on 18128ms old metadata: next metadata refresh in 281872ms
Traceback (most recent call last):
  File "pipkafka.py", line 22, in <module>
    msg = c.poll(timeout=1.0)
KeyboardInterrupt
%7|1478543106.322|DESTROY|rdkafka#consumer-1| Terminating instance
%7|1478543106.420|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op TERMINATE (v0) in state up (join state init, v6)
%7|1478543106.420|CGRPTERM|rdkafka#consumer-1| Terminating group "patricklaptop" in state up with 0 partition(s)
%7|1478543106.420|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": unsubscribe from current subscription of 1 topics (leave group=yes, join state init, v6)
%7|1478543106.420|UNASSIGN|rdkafka#consumer-1| Group "patricklaptop": unassign done in state up (join state init): without new assignment
%7|1478543106.420|LEAVE|rdkafka#consumer-1| Group "patricklaptop": leave
%7|1478543106.420|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state up -> term (v6)
%7|1478543106.420|BRKUNASSIGN|rdkafka#consumer-1| Group "patricklaptop" management unassigned from broker handle mykafkahost:9092/1
%7|1478543106.420|DESTROY|rdkafka#consumer-1| Destroy internal
%7|1478543106.420|DESTROY|rdkafka#consumer-1| Remove all topics
%7|1478543106.420|PARTCNT|rdkafka#consumer-1| Topic patricktest partition count changed from 1 to 0
%7|1478543106.420|REMOVE|rdkafka#consumer-1| patricktest [0] no longer reported in metadata
%7|1478543106.420|BRKMIGR|rdkafka#consumer-1| patricktest [0] 0x7f27e4002ff0 sending final LEAVE for removal by :0/internal
%7|1478543106.420|TOPPARREMOVE|rdkafka#consumer-1| Removing toppar patricktest [-1] 0x7f27e4002c30
%7|1478543106.420|DESTROY|rdkafka#consumer-1| patricktest [-1]: 0x7f27e4002c30 DESTROY_FINAL
%7|1478543106.420|TOPBRK|rdkafka#consumer-1| :0/internal: Topic patricktest [0]: leaving broker (0 messages in xmitq, next leader (none), rktp 0x7f27e4002ff0)
%7|1478543106.420|TOPBRK|rdkafka#consumer-1| :0/internal: Topic patricktest [0]: no next leader, failing 0 message(s) in partition queue
%7|1478543106.420|TOPPARREMOVE|rdkafka#consumer-1| Removing toppar patricktest [0] 0x7f27e4002ff0
%7|1478543106.420|DESTROY|rdkafka#consumer-1| patricktest [0]: 0x7f27e4002ff0 DESTROY_FINAL
%7|1478543106.431|BROKERFAIL|rdkafka#consumer-1| :0/internal: failed: err: Local: Broker handle destroyed: (errno: Success)
%7|1478543106.467|SEND|rdkafka#consumer-1| mykafkahost:9092/1: Sent LeaveGroupRequest (v0, 82 bytes @ 0, CorrId 32)
%7|1478543106.508|RECV|rdkafka#consumer-1| mykafkahost:9092/1: Received LeaveGroupResponse (v0, 2 bytes, CorrId 32, rtt 41.38ms)
%7|1478543106.508|BROKERFAIL|rdkafka#consumer-1| mykafkahost:9092/1: failed: err: Local: Broker handle destroyed: (errno: Resource temporarily unavailable)
%7|1478543106.509|MEMBERID|rdkafka#consumer-1| Group "patricklaptop": updating member id "rdkafka-5f6040fc-655a-49b8-9eb6-15aaf0d30871" -> "(not-set)"
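
A note on the JOIN lines near the end of this output: each one reports how old the cached metadata is and how long until the next refresh. The sketch below adds the two numbers for each line. The helper name `refresh_intervals` is hypothetical, and the reading that the sum is the metadata refresh period is an inference from these log lines, not something the log states. A constant 300000 ms would be consistent with librdkafka's `topic.metadata.refresh.interval.ms` if it is left at its default, which would mean the group cannot match the topic again until that refresh happens.

```python
import re

# Matches the JOIN message seen in the output above.
JOIN_LINE = re.compile(
    r'no matching topics based on (\d+)ms old metadata: '
    r'next metadata refresh in (\d+)ms'
)


def refresh_intervals(lines):
    """Return (metadata age + time to next refresh) in ms for each JOIN line."""
    totals = []
    for line in lines:
        match = JOIN_LINE.search(line)
        if match:
            age_ms, remaining_ms = int(match.group(1)), int(match.group(2))
            totals.append(age_ms + remaining_ms)
    return totals


# Two lines copied from the output above (first and last JOIN report).
sample = [
    '%7|1478543099.520|JOIN|rdkafka#consumer-1| Group "patricklaptop": no matching topics '
    'based on 11831ms old metadata: next metadata refresh in 288169ms',
    '%7|1478543105.816|JOIN|rdkafka#consumer-1| Group "patricklaptop": no matching topics '
    'based on 18128ms old metadata: next metadata refresh in 281872ms',
]

print(refresh_intervals(sample))  # -> [300000, 300000]
```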
patrickviet commented 7 years ago

For the second one, the Kafka broker restart happens right in the middle; you can see the lost connection. My producer is a simple loop that publishes the 'kwak' message every 5 seconds.
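
The producer loop itself is not shown in this thread. A minimal sketch of what such a loop might look like follows. The function name `publish_periodically` and the `count` and `sleep` parameters are hypothetical additions so the loop can be bounded and exercised without a broker; `producer` is assumed to be any object with `produce(topic, value)` and `flush()`, such as a `confluent_kafka.Producer`.

```python
import time


def publish_periodically(producer, topic, interval_s=5.0, count=None, sleep=time.sleep):
    """Produce one 'Kwak' message every interval_s seconds.

    Runs forever when count is None, otherwise stops after count messages.
    Returns the number of messages sent.
    """
    sent = 0
    while count is None or sent < count:
        # The struct_time is wrapped in a 1-tuple so %-formatting treats it
        # as a single value on both Python 2 and Python 3.
        producer.produce(topic, 'Kwak I got data on %s' % (time.gmtime(),))
        producer.flush()
        sent += 1
        sleep(interval_s)
    return sent


# Assumed usage against a real broker (not executed here):
#   from confluent_kafka import Producer
#   publish_periodically(Producer({'bootstrap.servers': 'localhost'}), 'patricktest')
```

Flushing after every message keeps the sketch close to the one-shot producer from the original report, at the cost of throughput, which does not matter at one message every 5 seconds.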

edenhill commented 7 years ago

Thank you, one last request; can you try latest master (instead of 0.9.2-RC1)? (and repro without the re-subscribe)

edenhill commented 7 years ago

When reproducing, please set debug to "cgrp,topic,protocol,broker". Thanks.
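
Output produced with these debug contexts is verbose, so filtering on the facility field can make group state changes easier to follow. The sketch below assumes the line shape seen in the logs pasted in this thread (`%<level>|<timestamp>|<FACILITY>|<client>| <text>`). The names `parse_debug_line` and `only_facilities` are hypothetical helpers, not part of confluent_kafka or librdkafka.

```python
import re

# Line shape observed in the output pasted in this thread:
#   %<level>|<unix timestamp>|<FACILITY>|<client name>| <text>
DEBUG_LINE = re.compile(r'^%(\d)\|(\d+\.\d+)\|([A-Z]+)\|([^|]+)\| (.*)$')


def parse_debug_line(line):
    """Return a dict for a debug line of the shape above, or None for other output."""
    match = DEBUG_LINE.match(line.rstrip('\n'))
    if match is None:
        return None
    level, timestamp, facility, client, text = match.groups()
    return {
        'level': int(level),
        'timestamp': float(timestamp),
        'facility': facility,
        'client': client,
        'text': text,
    }


def only_facilities(lines, wanted):
    """Parse lines and keep those whose facility is in the `wanted` set."""
    parsed = (parse_debug_line(line) for line in lines)
    return [entry for entry in parsed if entry and entry['facility'] in wanted]
```

For example, calling `only_facilities(open('consumer.log'), {'CGRPSTATE', 'CGRPJOINSTATE'})` on a saved log (the file name is an assumption) would keep only the group state transitions and drop application output such as "msg is None".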

patrickviet commented 7 years ago

Code

from confluent_kafka import Consumer, KafkaError, KafkaException
import sys
import time

c = Consumer({
  'bootstrap.servers':'kafkahost',
  'group.id':'patricklaptop',
  'default.topic.config': {'auto.offset.reset':'smallest'},
  'debug':'cgrp,protocol,topic,broker',
})

c.subscribe(['patricktest'])

last_resubscribe = 0
while True:

  #if time.time() - last_resubscribe > 10:
  #  print "resubscribing - just in case"
  #  c.subscribe(['patricktest'])
  #  last_resubscribe = time.time()

  msg = c.poll(timeout=1.0)

  if msg is None:
    # poll() timed out without a message or an event
    print("msg is None")
    #time.sleep(1)
    continue

  if msg.error():
    if msg.error().code() == KafkaError._PARTITION_EOF:
      # Not an error really
      sys.stderr.write('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset()))
    else:
      print("raising exception")
      raise KafkaException(msg.error())
  else:
    # Good stuff here.
    print('Received message: %s' % msg.value().decode('utf-8'))

  print("end of while cycle")

c.close()

full output

[vagrant@localhost vagrant]$ sudo /opt/python/py276/bin/python pipkafka.py
%7|1478548253.432|MEMBERID|rdkafka#consumer-1| Group "patricklaptop": updating member id "(not-set)" -> ""
%7|1478548253.432|BROKER|rdkafka#consumer-1| kafkahost:9092/bootstrap: Added new broker with NodeId -1
%7|1478548253.432|BRKMAIN|rdkafka#consumer-1| kafkahost:9092/bootstrap: Enter main broker thread
%7|1478548253.432|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: broker in state INIT connecting
%7|1478548253.433|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 3
%7|1478548253.433|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state INIT -> CONNECT
%7|1478548253.433|BRKMAIN|rdkafka#consumer-1| :0/internal: Enter main broker thread
%7|1478548253.433|STATE|rdkafka#consumer-1| :0/internal: Broker changed state INIT -> UP
%7|1478548253.433|BRKREASSIGN|rdkafka#consumer-1| Group "patricklaptop" management reassigned from broker (none) to :0/internal
%7|1478548253.433|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state init -> wait-broker (v1, join-state init)
%7|1478548253.433|BRKASSIGN|rdkafka#consumer-1| Group "patricklaptop" management assigned to broker :0/internal
%7|1478548253.461|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connected to ipv4#10.28.100.74:9092
%7|1478548253.461|CONNECTED|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connected (#1)
%7|1478548253.461|APIVERSION|rdkafka#consumer-1| kafkahost:9092/bootstrap: Using (configuration fallback) 0.9.0 protocol features
%7|1478548253.461|FEATURE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Updated enabled protocol features to BrokerBalancedConsumer,ThrottleTime,Sasl,BrokerGroupCoordinator,LZ4
%7|1478548253.461|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state CONNECT -> UP
%7|1478548253.472|SEND|rdkafka#consumer-1| kafkahost:9092/bootstrap: Sent MetadataRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1478548253.502|RECV|rdkafka#consumer-1| kafkahost:9092/bootstrap: Received MetadataResponse (v0, 1482 bytes, CorrId 1, rtt 29.58ms)
%7|1478548253.502|BROKER|rdkafka#consumer-1| kafkahost:9092/1: Added new broker with NodeId 1
%7|1478548253.502|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op SUBSCRIBE (v0) in state wait-broker (join state init, v1 vs 0)
%7|1478548253.502|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": subscribe to new subscription of 1 topics (join state init)
%7|1478548253.502|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": unsubscribe from current unset subscription of 0 topics (leave group=no, join state init, v1)
%7|1478548253.502|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state init -> wait-unassign (v1, state wait-broker)
%7|1478548253.502|UNASSIGN|rdkafka#consumer-1| Group "patricklaptop": unassign done in state wait-broker (join state wait-unassign): without new assignment: unassign (no previous assignment)
%7|1478548253.502|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-unassign -> init (v1, state wait-broker)
%7|1478548253.502|BRKMAIN|rdkafka#consumer-1| kafkahost:9092/1: Enter main broker thread
%7|1478548253.502|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: broker in state INIT connecting
%7|1478548253.503|BRKREASSIGN|rdkafka#consumer-1| Group "patricklaptop" management reassigned from broker :0/internal to kafkahost:9092/bootstrap
%7|1478548253.503|BRKUNASSIGN|rdkafka#consumer-1| Group "patricklaptop" management unassigned from broker handle :0/internal
%7|1478548253.503|BRKASSIGN|rdkafka#consumer-1| Group "patricklaptop" management assigned to broker kafkahost:9092/bootstrap
%7|1478548253.506|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 4
%7|1478548253.506|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state INIT -> CONNECT
%7|1478548253.534|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connected to ipv4#10.28.100.74:9092
%7|1478548253.534|CONNECTED|rdkafka#consumer-1| kafkahost:9092/1: Connected (#1)
%7|1478548253.534|APIVERSION|rdkafka#consumer-1| kafkahost:9092/1: Using (configuration fallback) 0.9.0 protocol features
%7|1478548253.534|FEATURE|rdkafka#consumer-1| kafkahost:9092/1: Updated enabled protocol features to BrokerBalancedConsumer,ThrottleTime,Sasl,BrokerGroupCoordinator,LZ4
%7|1478548253.534|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state CONNECT -> UP
%7|1478548253.534|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent MetadataRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1478548253.564|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received MetadataResponse (v0, 1482 bytes, CorrId 1, rtt 29.28ms)
%7|1478548253.564|CGRPQUERY|rdkafka#consumer-1| kafkahost:9092/bootstrap: Group "patricklaptop": querying for coordinator: intervaled in state wait-broker
%7|1478548253.623|SEND|rdkafka#consumer-1| kafkahost:9092/bootstrap: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 2)
%7|1478548253.653|RECV|rdkafka#consumer-1| kafkahost:9092/bootstrap: Received GroupCoordinatorResponse (v0, 52 bytes, CorrId 2, rtt 29.24ms)
%7|1478548253.664|CGRPCOORD|rdkafka#consumer-1| kafkahost:9092/bootstrap: Group "patricklaptop" coordinator is kafkahost:9092 id 1
%7|1478548253.664|CGRPCOORD|rdkafka#consumer-1| Group "patricklaptop" changing coordinator -1 -> 1
%7|1478548253.664|BRKREASSIGN|rdkafka#consumer-1| Group "patricklaptop" management reassigned from broker kafkahost:9092/bootstrap to kafkahost:9092/1
%7|1478548253.664|BRKUNASSIGN|rdkafka#consumer-1| Group "patricklaptop" management unassigned from broker handle kafkahost:9092/bootstrap
%7|1478548253.664|BRKASSIGN|rdkafka#consumer-1| Group "patricklaptop" management assigned to broker kafkahost:9092/1
%7|1478548253.664|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker -> wait-broker-transport (v1, join-state init)
%7|1478548253.664|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker-transport -> up (v1, join-state init)
%7|1478548253.765|CGRPQUERY|rdkafka#consumer-1| kafkahost:9092/bootstrap: Group "patricklaptop": querying for coordinator: intervaled in state up
%7|1478548253.765|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 0 (1) subscribed topic(s)
%7|1478548253.765|JOIN|rdkafka#consumer-1| Group "patricklaptop": postponing join until full metadata is available (current metadata age -1ms > metadata.max.age.ms 300000ms)
%7|1478548253.775|SEND|rdkafka#consumer-1| kafkahost:9092/bootstrap: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 3)
%7|1478548253.804|RECV|rdkafka#consumer-1| kafkahost:9092/bootstrap: Received GroupCoordinatorResponse (v0, 52 bytes, CorrId 3, rtt 28.69ms)
%7|1478548253.866|CGRPCOORD|rdkafka#consumer-1| kafkahost:9092/bootstrap: Group "patricklaptop" coordinator is kafkahost:9092 id 1
%7|1478548253.866|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent MetadataRequest (v0, 25 bytes @ 0, CorrId 2)
%7|1478548253.895|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received MetadataResponse (v0, 1482 bytes, CorrId 2, rtt 29.13ms)
%7|1478548253.895|SUBSCRIPTION|rdkafka#consumer-1| Group "patricklaptop": effective subscription list changed from 0 to 1 topic(s):
%7|1478548253.895|SUBSCRIPTION|rdkafka#consumer-1|  Topic patricktest with 1 partition(s)
%7|1478548253.895|JOIN|rdkafka#consumer-1| Group "patricklaptop": join with 1 (1) subscribed topic(s)
%7|1478548253.895|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state init -> wait-join (v1, state up)
%7|1478548253.895|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent JoinGroupRequest (v0, 129 bytes @ 0, CorrId 3)
%7|1478548253.932|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received JoinGroupResponse (v0, 182 bytes, CorrId 3, rtt 36.79ms)
%7|1478548253.996|JOINGROUP|rdkafka#consumer-1| JoinGroup response: GenerationId 42, Protocol range, LeaderId rdkafka-68a7c69f-4b0d-46c4-a3c6-956c0818871b (me), my MemberId rdkafka-68a7c69f-4b0d-46c4-a3c6-956c0818871b, 1 members in group: (no error)
%7|1478548253.996|MEMBERID|rdkafka#consumer-1| Group "patricklaptop": updating member id "" -> "rdkafka-68a7c69f-4b0d-46c4-a3c6-956c0818871b"
%7|1478548253.996|JOINGROUP|rdkafka#consumer-1| Elected leader for group "patricklaptop" with 1 member(s)
%7|1478548253.996|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-join -> wait-metadata (v1, state up)
%7|1478548254.033|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent MetadataRequest (v0, 25 bytes @ 0, CorrId 4)
%7|1478548254.062|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received MetadataResponse (v0, 1482 bytes, CorrId 4, rtt 29.49ms)
%7|1478548254.096|ASSIGN|rdkafka#consumer-1| Group "patricklaptop" running range assignment for 1 member(s):
%7|1478548254.096|ASSIGN|rdkafka#consumer-1|  Member "rdkafka-68a7c69f-4b0d-46c4-a3c6-956c0818871b" (me) with 1 subscription(s):
%7|1478548254.096|ASSIGN|rdkafka#consumer-1|   patricktest [-1]
%7|1478548254.096|ASSIGN|rdkafka#consumer-1| range: Topic patricktest with 1 partition(s) and 1 subscribing member(s)
%7|1478548254.096|ASSIGN|rdkafka#consumer-1| range: Member "rdkafka-68a7c69f-4b0d-46c4-a3c6-956c0818871b": assigned topic patricktest partitions 0..0
%7|1478548254.096|ASSIGN|rdkafka#consumer-1| Group "patricklaptop" range assignment for 1 member(s) finished in 0.049ms:
%7|1478548254.096|ASSIGN|rdkafka#consumer-1|  Member "rdkafka-68a7c69f-4b0d-46c4-a3c6-956c0818871b" (me) assigned 1 partition(s):
%7|1478548254.096|ASSIGN|rdkafka#consumer-1|   patricktest [0]
%7|1478548254.096|ASSIGNOR|rdkafka#consumer-1| Group "patricklaptop": "range" assignor run for 1 member(s)
%7|1478548254.096|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-metadata -> wait-sync (v1, state up)
%7|1478548254.163|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent SyncGroupRequest (v0, 171 bytes @ 0, CorrId 5)
%7|1478548254.197|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received SyncGroupResponse (v0, 37 bytes, CorrId 5, rtt 34.14ms)
%7|1478548254.298|SYNCGROUP|rdkafka#consumer-1| SyncGroup response: Success (31 bytes of MemberState data)
%7|1478548254.298|ASSIGN|rdkafka#consumer-1| Group "patricklaptop": delegating assign of 1 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
%7|1478548254.298|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-sync -> wait-assign-rebalance_cb (v1, state up)
%7|1478548254.298|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548254.298|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 6)
%7|1478548254.327|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 6, rtt 29.05ms)
%7|1478548254.399|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v1 vs 0)
%7|1478548254.399|ASSIGN|rdkafka#consumer-1| Group "patricklaptop": new assignment of 1 partition(s) in join state wait-assign-rebalance_cb
%7|1478548254.399|TOPIC|rdkafka#consumer-1| New local topic: patricktest
%7|1478548254.399|TOPPARNEW|rdkafka#consumer-1| NEW patricktest [-1] 0x7fe384003c90 (at rd_kafka_topic_new0:261)
%7|1478548254.399|TOPPARNEW|rdkafka#consumer-1| NEW patricktest [0] 0x7fe384004050 (at rd_kafka_toppar_desired_add:534)
%7|1478548254.399|DESIRED|rdkafka#consumer-1| patricktest [0]: adding to DESIRED list
%7|1478548254.399|DESP|rdkafka#consumer-1| Adding desired topic patricktest [0]
%7|1478548254.399|BARRIER|rdkafka#consumer-1| Group "patricklaptop": rd_kafka_cgrp_assign:1549: new version barrier v2
%7|1478548254.399|ASSIGN|rdkafka#consumer-1| Group "patricklaptop": assigning 1 partition(s) in join state wait-assign-rebalance_cb
%7|1478548254.399|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
%7|1478548254.399|BARRIER|rdkafka#consumer-1| Group "patricklaptop": rd_kafka_cgrp_partitions_fetch_start0:1011: new version barrier v3
%7|1478548254.399|FETCHSTART|rdkafka#consumer-1| Group "patricklaptop": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 1588)
%7|1478548254.399|FETCHSTART|rdkafka#consumer-1| List with 1 partition(s):
%7|1478548254.399|FETCHSTART|rdkafka#consumer-1|  patricktest [0] offset INVALID
%7|1478548254.399|OFFSET|rdkafka#consumer-1| Fetch 1 offsets with v3
%7|1478548254.399|OFFSET|rdkafka#consumer-1| kafkahost:9092/1: OffsetFetchRequest(v1) for 1/1 partition(s)
%7|1478548254.428|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent OffsetFetchRequest (v1, 61 bytes @ 0, CorrId 7)
%7|1478548254.462|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received OffsetFetchResponse (v1, 37 bytes, CorrId 7, rtt 33.22ms)
%7|1478548254.462|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent MetadataRequest (v0, 38 bytes @ 0, CorrId 8)
%7|1478548254.490|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received MetadataResponse (v0, 103 bytes, CorrId 8, rtt 28.42ms)
%7|1478548254.490|STATE|rdkafka#consumer-1| Topic patricktest changed state unknown -> exists
%7|1478548254.490|PARTCNT|rdkafka#consumer-1| Topic patricktest partition count changed from 0 to 1
%7|1478548254.490|BRKDELGT|rdkafka#consumer-1| patricktest [0]: delegate to broker kafkahost:9092/1 (rktp 0x7fe384004050, term 0, ref 4, remove 0)
%7|1478548254.490|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker kafkahost:9092/1 is now leader for partition with 0 messages (0 bytes) queued
%7|1478548254.490|BRKMIGR|rdkafka#consumer-1| Migrating topic patricktest [0] 0x7fe384004050 from (none) to kafkahost:9092/1 (sending PARTITION_JOIN to kafkahost:9092/1)
%7|1478548254.490|METADATA|rdkafka#consumer-1| kafkahost:9092/1: Requested topic patricktest seen in metadata
%7|1478548254.490|OFFSETFETCH|rdkafka#consumer-1| List with 1 partition(s):
%7|1478548254.490|OFFSETFETCH|rdkafka#consumer-1|  patricktest [0] offset INVALID
%7|1478548254.490|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting default offset INVALID
%7|1478548254.490|OFFSETFETCH|rdkafka#consumer-1| kafkahost:9092/1: OffsetFetchResponse: patricktest [0] offset 1908
%7|1478548254.490|OFFFETCH|rdkafka#consumer-1| kafkahost:9092/1: OffsetFetch for 1/1 partition(s) returned Success
%7|1478548254.490|BARRIER|rdkafka#consumer-1| Group "patricklaptop": rd_kafka_cgrp_partitions_fetch_start0:1011: new version barrier v4
%7|1478548254.490|FETCHSTART|rdkafka#consumer-1| Group "patricklaptop": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=yes, v4, line 940)
%7|1478548254.490|FETCHSTART|rdkafka#consumer-1| List with 1 partition(s):
%7|1478548254.490|FETCHSTART|rdkafka#consumer-1|  patricktest [0] offset 1908
%7|1478548254.490|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state assigned -> started (v4, state up)
%7|1478548254.490|BARRIER|rdkafka#consumer-1| patricktest [0]: rd_kafka_toppar_op_fetch_start:1922: new version barrier v2
%7|1478548254.490|CONSUMER|rdkafka#consumer-1| Start consuming patricktest [0] at offset 1908 (v2)
%7|1478548254.490|TOPBRK|rdkafka#consumer-1| kafkahost:9092/1: Topic patricktest [0]: joining broker (rktp 0x7fe384004050)
%7|1478548254.490|OP|rdkafka#consumer-1| patricktest [0] received op FETCH_START (v2) in fetch-state none (opv1)
%7|1478548254.490|FETCH|rdkafka#consumer-1| Start fetch for patricktest [0] in state none at offset 1908 (v2)
%7|1478548254.490|PARTSTATE|rdkafka#consumer-1| Partition patricktest [0] changed fetch state none -> active
%7|1478548254.490|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op PARTITION_JOIN in state up (join state started, v4) for patricktest [0]
%7|1478548254.490|PARTADD|rdkafka#consumer-1| Group "patricklaptop": add patricktest [0]
msg is None
%7|1478548254.591|FETCHDEC|rdkafka#consumer-1| Topic patricktest [0]: fetch decide: updating to version 2 (was 0) at offset 1908 (was 0)
%7|1478548254.591|FETCHADD|rdkafka#consumer-1| kafkahost:9092/1: Added patricktest [0] to fetch list (1 entries, opv 2)
%7|1478548254.591|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 9)
%7|1478548254.634|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 777 bytes, CorrId 9, rtt 42.37ms)
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=48, tm_sec=32, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=48, tm_sec=42, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=48, tm_sec=47, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=48, tm_sec=53, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=48, tm_sec=58, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=3, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=8, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=13, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=18, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=24, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=29, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=34, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=40, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=45, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=50, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=49, tm_sec=55, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=0, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=6, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=11, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=16, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=21, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=26, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=32, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=37, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=42, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=48, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=53, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
%7|1478548254.635|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 10)
%7|1478548254.771|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 10, rtt 135.85ms)
% patricktest [0] reached end at offset 1935
end of while cycle
%7|1478548255.274|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 11)
%7|1478548255.395|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548255.409|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 11, rtt 134.18ms)
%7|1478548255.409|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 12)
%7|1478548255.409|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 13)
%7|1478548255.540|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 12, rtt 130.39ms)
%7|1478548255.540|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 14)
%7|1478548255.542|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 13, rtt 132.58ms)
%7|1478548255.670|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 14, rtt 129.77ms)
%7|1478548255.670|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 15)
msg is None
%7|1478548255.801|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 15, rtt 130.76ms)
%7|1478548255.801|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 16)
%7|1478548255.931|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 16, rtt 130.51ms)
%7|1478548255.932|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 17)
%7|1478548256.064|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 17, rtt 132.37ms)
%7|1478548256.064|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 18)
%7|1478548256.197|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 18, rtt 133.16ms)
%7|1478548256.198|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 19)
%7|1478548256.329|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 19, rtt 131.69ms)
%7|1478548256.329|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 20)
%7|1478548256.434|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548256.462|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 20, rtt 132.24ms)
%7|1478548256.462|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 21)
%7|1478548256.462|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 22)
%7|1478548256.591|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 21, rtt 128.95ms)
%7|1478548256.591|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 23)
%7|1478548256.592|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 22, rtt 130.28ms)
%7|1478548256.721|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 23, rtt 129.63ms)
%7|1478548256.721|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 24)
msg is None
%7|1478548256.852|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 24, rtt 130.26ms)
%7|1478548256.852|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 25)
%7|1478548256.982|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 25, rtt 129.40ms)
%7|1478548256.982|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 26)
%7|1478548257.112|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 26, rtt 129.95ms)
%7|1478548257.112|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 27)
%7|1478548257.241|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 27, rtt 129.06ms)
%7|1478548257.241|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 28)
%7|1478548257.372|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 28, rtt 130.23ms)
%7|1478548257.372|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 29)
%7|1478548257.434|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548257.473|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 30)
%7|1478548257.502|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 29, rtt 129.91ms)
%7|1478548257.502|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 31)
%7|1478548257.504|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 30, rtt 30.29ms)
%7|1478548257.613|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 241 bytes, CorrId 31, rtt 110.90ms)
Received message: Kwak I got data on time.struct_time(tm_year=2016, tm_mon=11, tm_mday=7, tm_hour=19, tm_min=50, tm_sec=58, tm_wday=0, tm_yday=312, tm_isdst=0)
end of while cycle
%7|1478548257.614|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 32)
%7|1478548257.744|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 32, rtt 130.73ms)
% patricktest [0] reached end at offset 1936
end of while cycle
%7|1478548258.249|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 33)
%7|1478548258.379|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 33, rtt 130.09ms)
%7|1478548258.379|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 34)
%7|1478548258.432|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1936, committed offset 1908
%7|1478548258.432|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting stored offset 1936 for commit
%7|1478548258.432|OFFSET|rdkafka#consumer-1| kafkahost:9092/1: Enqueue OffsetCommitRequest(v1, 1/1 partition(s)))
%7|1478548258.434|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548258.480|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent OffsetCommitRequest (v1, 129 bytes @ 0, CorrId 35)
%7|1478548258.480|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 36)
%7|1478548258.509|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 34, rtt 129.30ms)
%7|1478548258.509|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 37)
%7|1478548258.511|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received OffsetCommitResponse (v1, 27 bytes, CorrId 35, rtt 30.59ms)
%7|1478548258.535|COMMIT|rdkafka#consumer-1| OffsetCommit for 1 partition(s) returned: Success
%7|1478548258.537|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 36, rtt 56.47ms)
%7|1478548258.640|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 37, rtt 130.84ms)
%7|1478548258.640|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 38)
msg is None
%7|1478548258.770|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 38, rtt 130.07ms)
%7|1478548258.770|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 39)
%7|1478548258.901|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 39, rtt 130.99ms)
%7|1478548258.901|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 40)
%7|1478548259.031|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 40, rtt 129.78ms)
%7|1478548259.031|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 41)
%7|1478548259.161|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 41, rtt 129.53ms)
%7|1478548259.161|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 42)
%7|1478548259.291|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 42, rtt 129.39ms)
%7|1478548259.291|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 43)
%7|1478548259.421|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 43, rtt 129.94ms)
%7|1478548259.421|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 44)
%7|1478548259.435|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548259.523|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 45)
%7|1478548259.551|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 44, rtt 129.40ms)
%7|1478548259.551|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 46)
%7|1478548259.552|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 45, rtt 29.04ms)
%7|1478548259.680|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 46, rtt 129.08ms)
%7|1478548259.680|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 47)
msg is None
%7|1478548259.811|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 47, rtt 130.12ms)
%7|1478548259.811|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 48)
%7|1478548259.941|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 48, rtt 130.33ms)
%7|1478548259.941|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 49)
%7|1478548260.073|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 49, rtt 131.24ms)
%7|1478548260.073|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 50)
%7|1478548260.097|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Resource temporarily unavailable)
%3|1478548260.097|FAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: Receive failed: Disconnected
%3|1478548260.097|ERROR|rdkafka#consumer-1| kafkahost:9092/bootstrap: Receive failed: Disconnected
%7|1478548260.097|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state UP -> DOWN
%7|1478548260.105|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Resource temporarily unavailable)
%3|1478548260.105|FAIL|rdkafka#consumer-1| kafkahost:9092/1: Receive failed: Disconnected
%3|1478548260.105|ERROR|rdkafka#consumer-1| kafkahost:9092/1: Receive failed: Disconnected
%7|1478548260.105|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state UP -> DOWN
%3|1478548260.105|ERROR|rdkafka#consumer-1| 2/2 brokers are down
%7|1478548260.105|RETRY|rdkafka#consumer-1| kafkahost:9092/1: Retrying FetchRequest (v1, 70 bytes, retry 1/2)
%7|1478548260.138|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state up -> query-coord (v4, join-state started)
%7|1478548260.138|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478548260.199|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: broker in state DOWN connecting
%7|1478548260.199|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 3
%7|1478548260.199|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state DOWN -> CONNECT
%7|1478548260.206|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: broker in state DOWN connecting
%7|1478548260.210|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 4
%7|1478548260.210|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state DOWN -> CONNECT
%7|1478548260.231|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548260.231|FAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548260.231|ERROR|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548260.231|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1478548260.242|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548260.242|FAIL|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548260.242|ERROR|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548260.242|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state CONNECT -> DOWN
%7|1478548260.737|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478548261.239|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: broker in state DOWN connecting
%7|1478548261.239|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 3
%7|1478548261.239|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state DOWN -> CONNECT
%7|1478548261.241|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478548261.250|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: broker in state DOWN connecting
%7|1478548261.253|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 4
%7|1478548261.253|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state DOWN -> CONNECT
%7|1478548261.267|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548261.267|FAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548261.267|ERROR|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548261.267|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1478548261.282|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548261.282|FAIL|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548261.282|ERROR|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548261.282|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state CONNECT -> DOWN
msg is None
%7|1478548261.838|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478548262.270|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: broker in state DOWN connecting
%7|1478548262.271|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 3
%7|1478548262.271|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state DOWN -> CONNECT
%7|1478548262.282|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: broker in state DOWN connecting
%7|1478548262.285|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 4
%7|1478548262.285|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state DOWN -> CONNECT
%7|1478548262.300|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548262.300|FAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548262.300|ERROR|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548262.300|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1478548262.314|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548262.315|FAIL|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548262.315|ERROR|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548262.315|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state CONNECT -> DOWN
%7|1478548262.340|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478548262.938|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478548263.308|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: broker in state DOWN connecting
%7|1478548263.308|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 3
%7|1478548263.308|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state DOWN -> CONNECT
%7|1478548263.319|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: broker in state DOWN connecting
%7|1478548263.323|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 4
%7|1478548263.323|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state DOWN -> CONNECT
%7|1478548263.337|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548263.337|FAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548263.337|ERROR|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548263.337|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1478548263.352|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548263.352|FAIL|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548263.352|ERROR|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548263.352|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state CONNECT -> DOWN
%7|1478548263.433|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1936, committed offset 1936
%7|1478548263.433|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
%7|1478548263.433|COMMIT|rdkafka#consumer-1| OffsetCommit internal error: Local: No offset stored
%7|1478548263.433|COMMIT|rdkafka#consumer-1| OffsetCommit for 1 partition(s) returned: Local: No offset stored
%7|1478548263.433|UNASSIGN|rdkafka#consumer-1| Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1478548263.536|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478548264.038|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478548264.337|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: broker in state DOWN connecting
%7|1478548264.338|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 3
%7|1478548264.338|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state DOWN -> CONNECT
%7|1478548264.359|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: broker in state DOWN connecting
%7|1478548264.362|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 4
%7|1478548264.362|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state DOWN -> CONNECT
%7|1478548264.366|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548264.366|FAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548264.366|ERROR|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548264.366|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1478548264.391|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548264.391|FAIL|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548264.391|ERROR|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548264.391|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state CONNECT -> DOWN
%7|1478548264.636|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478548265.140|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478548265.373|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: broker in state DOWN connecting
%7|1478548265.373|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 3
%7|1478548265.373|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state DOWN -> CONNECT
%7|1478548265.394|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: broker in state DOWN connecting
%7|1478548265.399|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 4
%7|1478548265.399|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state DOWN -> CONNECT
%7|1478548265.402|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548265.402|FAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548265.402|ERROR|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548265.402|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1478548265.428|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548265.428|FAIL|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548265.428|ERROR|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548265.428|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state CONNECT -> DOWN
%7|1478548265.737|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
msg is None
%7|1478548266.241|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478548266.411|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: broker in state DOWN connecting
%7|1478548266.412|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 3
%7|1478548266.412|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state DOWN -> CONNECT
%7|1478548266.433|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: broker in state DOWN connecting
%7|1478548266.439|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 4
%7|1478548266.439|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state DOWN -> CONNECT
%7|1478548266.440|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548266.440|FAIL|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548266.440|ERROR|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548266.440|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1478548266.468|BROKERFAIL|rdkafka#consumer-1| kafkahost:9092/1: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1478548266.468|FAIL|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%3|1478548266.468|ERROR|rdkafka#consumer-1| kafkahost:9092/1: Connect to ipv4#10.28.100.74:9092 failed: Connection refused
%7|1478548266.468|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state CONNECT -> DOWN
msg is None
%7|1478548266.837|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478548267.342|CGRPQUERY|rdkafka#consumer-1| Group "patricklaptop": no broker available for coordinator query: intervaled in state query-coord
%7|1478548267.446|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: broker in state DOWN connecting
%7|1478548267.446|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 3
%7|1478548267.446|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state DOWN -> CONNECT
%7|1478548267.475|CONNECT|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connected to ipv4#10.28.100.74:9092
%7|1478548267.475|CONNECTED|rdkafka#consumer-1| kafkahost:9092/bootstrap: Connected (#2)
%7|1478548267.475|APIVERSION|rdkafka#consumer-1| kafkahost:9092/bootstrap: Using (configuration fallback) 0.9.0 protocol features
%7|1478548267.475|STATE|rdkafka#consumer-1| kafkahost:9092/bootstrap: Broker changed state CONNECT -> UP
%7|1478548267.479|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: broker in state DOWN connecting
%7|1478548267.481|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connecting to ipv4#10.28.100.74:9092 (plaintext) with socket 4
%7|1478548267.481|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state DOWN -> CONNECT
%7|1478548267.485|SEND|rdkafka#consumer-1| kafkahost:9092/bootstrap: Sent MetadataRequest (v0, 38 bytes @ 0, CorrId 4)
%7|1478548267.510|CONNECT|rdkafka#consumer-1| kafkahost:9092/1: Connected to ipv4#10.28.100.74:9092
%7|1478548267.510|CONNECTED|rdkafka#consumer-1| kafkahost:9092/1: Connected (#2)
%7|1478548267.510|APIVERSION|rdkafka#consumer-1| kafkahost:9092/1: Using (configuration fallback) 0.9.0 protocol features
%7|1478548267.510|STATE|rdkafka#consumer-1| kafkahost:9092/1: Broker changed state CONNECT -> UP
%7|1478548267.511|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent MetadataRequest (v0, 38 bytes @ 0, CorrId 51)
%7|1478548267.511|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent MetadataRequest (v0, 38 bytes @ 0, CorrId 52)
%7|1478548267.511|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent FetchRequest (v1, 70 bytes @ 0, CorrId 53)
msg is None
%7|1478548267.939|CGRPQUERY|rdkafka#consumer-1| kafkahost:9092/1: Group "patricklaptop": querying for coordinator: intervaled in state query-coord
%7|1478548267.939|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state query-coord -> wait-coord (v4, join-state started)
%7|1478548268.014|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 54)
%7|1478548268.170|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received MetadataResponse (v0, 27 bytes, CorrId 51, rtt 659.03ms)
%7|1478548268.170|METADATA|rdkafka#consumer-1| kafkahost:9092/1: Error in metadata reply for topic patricktest (PartCnt 0): Err-38?
%7|1478548268.170|BRKDELGT|rdkafka#consumer-1| patricktest [0]: delegate to broker (none) (rktp 0x7fe384004050, term 0, ref 5, remove 0)
%7|1478548268.170|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker kafkahost:9092/1 no longer leader
%7|1478548268.170|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker :0/internal is now leader for partition with 0 messages (0 bytes) queued
%7|1478548268.170|BRKMIGR|rdkafka#consumer-1| Migrating topic patricktest [0] 0x7fe384004050 from kafkahost:9092/1 to :0/internal (sending PARTITION_LEAVE to kafkahost:9092/1)
%7|1478548268.170|FETCHADD|rdkafka#consumer-1| kafkahost:9092/1: Removed patricktest [0] from fetch list (0 entries, opv 2)
%7|1478548268.170|TOPBRK|rdkafka#consumer-1| kafkahost:9092/1: Topic patricktest [0]: leaving broker (0 messages in xmitq, next leader :0/internal, rktp 0x7fe384004050)
%7|1478548268.170|TOPBRK|rdkafka#consumer-1| :0/internal: Topic patricktest [0]: joining broker (rktp 0x7fe384004050)
%7|1478548268.170|RECV|rdkafka#consumer-1| kafkahost:9092/bootstrap: Received MetadataResponse (v0, 27 bytes, CorrId 4, rtt 685.00ms)
%7|1478548268.170|METADATA|rdkafka#consumer-1| kafkahost:9092/bootstrap: Error in metadata reply for topic patricktest (PartCnt 0): Err-38?
%7|1478548268.170|BRKDELGT|rdkafka#consumer-1| patricktest [0]: delegate to broker (none) (rktp 0x7fe384004050, term 0, ref 5, remove 0)
%7|1478548268.170|BRKDELGT|rdkafka#consumer-1| patricktest [0]: not updating broker: already on correct broker :0/internal
%7|1478548268.171|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received MetadataResponse (v0, 27 bytes, CorrId 52, rtt 660.69ms)
%7|1478548268.171|METADATA|rdkafka#consumer-1| kafkahost:9092/1: Error in metadata reply for topic patricktest (PartCnt 0): Err-38?
%7|1478548268.171|BRKDELGT|rdkafka#consumer-1| patricktest [0]: delegate to broker (none) (rktp 0x7fe384004050, term 0, ref 5, remove 0)
%7|1478548268.171|BRKDELGT|rdkafka#consumer-1| patricktest [0]: not updating broker: already on correct broker :0/internal
%7|1478548268.181|FETCHADD|rdkafka#consumer-1| :0/internal: Added patricktest [0] to fetch list (1 entries, opv 2)
%7|1478548268.214|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received FetchResponse (v1, 43 bytes, CorrId 53, rtt 703.48ms)
%7|1478548268.221|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received GroupCoordinatorResponse (v0, 12 bytes, CorrId 54, rtt 207.48ms)
%7|1478548268.272|CGRPCOORD|rdkafka#consumer-1| kafkahost:9092/1: Group "patricklaptop" GroupCoordinator response error: Broker: Group coordinator not available
%7|1478548268.273|CGRPCOORD|rdkafka#consumer-1| Group "patricklaptop" changing coordinator 1 -> -1
%7|1478548268.273|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-coord -> wait-broker (v4, join-state started)
%7|1478548268.434|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1936, committed offset 1936
%7|1478548268.434|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
%7|1478548268.434|COMMIT|rdkafka#consumer-1| OffsetCommit internal error: Local: No offset stored
%7|1478548268.434|COMMIT|rdkafka#consumer-1| OffsetCommit for 1 partition(s) returned: Local: No offset stored
%7|1478548268.434|UNASSIGN|rdkafka#consumer-1| Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
msg is None
%7|1478548269.039|CGRPQUERY|rdkafka#consumer-1| kafkahost:9092/bootstrap: Group "patricklaptop": querying for coordinator: intervaled in state wait-broker
%7|1478548269.070|SEND|rdkafka#consumer-1| kafkahost:9092/bootstrap: Sent GroupCoordinatorRequest (v0, 36 bytes @ 0, CorrId 5)
%7|1478548269.102|RECV|rdkafka#consumer-1| kafkahost:9092/bootstrap: Received GroupCoordinatorResponse (v0, 52 bytes, CorrId 5, rtt 31.34ms)
%7|1478548269.139|CGRPCOORD|rdkafka#consumer-1| kafkahost:9092/bootstrap: Group "patricklaptop" coordinator is kafkahost:9092 id 1
%7|1478548269.139|CGRPCOORD|rdkafka#consumer-1| Group "patricklaptop" changing coordinator -1 -> 1
%7|1478548269.139|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker -> wait-broker-transport (v4, join-state started)
%7|1478548269.140|CGRPSTATE|rdkafka#consumer-1| Group "patricklaptop" changed state wait-broker-transport -> up (v4, join-state started)
%7|1478548269.240|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548269.328|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 55)
%7|1478548269.361|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 55, rtt 33.00ms)
msg is None
%7|1478548270.242|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548270.267|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 56)
%7|1478548270.296|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 56, rtt 28.72ms)
msg is None
%7|1478548271.342|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548271.405|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 57)
%7|1478548271.435|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 57, rtt 29.00ms)
msg is None
%7|1478548272.437|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548272.443|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 58)
%7|1478548272.472|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 58, rtt 29.33ms)
msg is None
%7|1478548273.434|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1936, committed offset 1936
%7|1478548273.434|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
%7|1478548273.434|COMMIT|rdkafka#consumer-1| OffsetCommit internal error: Local: No offset stored
%7|1478548273.434|COMMIT|rdkafka#consumer-1| OffsetCommit for 1 partition(s) returned: Local: No offset stored
%7|1478548273.434|UNASSIGN|rdkafka#consumer-1| Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1478548273.437|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548273.479|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 59)
%7|1478548273.508|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 59, rtt 29.20ms)
msg is None
%7|1478548274.437|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548274.516|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 60)
%7|1478548274.546|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 60, rtt 29.35ms)
msg is None
%7|1478548275.437|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548275.454|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 61)
%7|1478548275.485|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 61, rtt 30.12ms)
msg is None
%7|1478548276.437|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548276.492|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 62)
%7|1478548276.521|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 62, rtt 29.00ms)
msg is None
%7|1478548277.437|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548277.528|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 63)
%7|1478548277.557|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 63, rtt 29.04ms)
msg is None
%7|1478548278.434|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1936, committed offset 1936
%7|1478548278.434|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
%7|1478548278.434|COMMIT|rdkafka#consumer-1| OffsetCommit internal error: Local: No offset stored
%7|1478548278.434|COMMIT|rdkafka#consumer-1| OffsetCommit for 1 partition(s) returned: Local: No offset stored
%7|1478548278.434|UNASSIGN|rdkafka#consumer-1| Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1478548278.438|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548278.465|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 64)
%7|1478548278.494|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 64, rtt 29.14ms)
msg is None
%7|1478548279.438|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548279.501|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 65)
%7|1478548279.532|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 65, rtt 31.10ms)
msg is None
%7|1478548280.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548280.540|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 66)
%7|1478548280.569|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 66, rtt 29.66ms)
msg is None
%7|1478548281.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548281.477|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 67)
%7|1478548281.506|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 67, rtt 29.15ms)
msg is None
%7|1478548282.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548282.513|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 68)
%7|1478548282.543|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 68, rtt 29.66ms)
msg is None
%7|1478548283.434|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1936, committed offset 1936
%7|1478548283.434|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
%7|1478548283.434|COMMIT|rdkafka#consumer-1| OffsetCommit internal error: Local: No offset stored
%7|1478548283.434|COMMIT|rdkafka#consumer-1| OffsetCommit for 1 partition(s) returned: Local: No offset stored
%7|1478548283.434|UNASSIGN|rdkafka#consumer-1| Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1478548283.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548283.447|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 69)
%7|1478548283.477|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 69, rtt 29.66ms)
msg is None
%7|1478548284.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548284.484|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 70)
%7|1478548284.514|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 70, rtt 29.26ms)
msg is None
%7|1478548285.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548285.523|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 71)
%7|1478548285.553|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 71, rtt 29.50ms)
msg is None
%7|1478548286.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548286.460|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 72)
%7|1478548286.489|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 72, rtt 29.61ms)
msg is None
%7|1478548287.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548287.496|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 73)
%7|1478548287.526|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 73, rtt 29.53ms)
msg is None
%7|1478548288.435|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1936, committed offset 1936
%7|1478548288.435|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
%7|1478548288.435|COMMIT|rdkafka#consumer-1| OffsetCommit internal error: Local: No offset stored
%7|1478548288.435|COMMIT|rdkafka#consumer-1| OffsetCommit for 1 partition(s) returned: Local: No offset stored
%7|1478548288.435|UNASSIGN|rdkafka#consumer-1| Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1478548288.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548288.532|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 74)
%7|1478548288.562|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 74, rtt 29.95ms)
msg is None
%7|1478548289.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548289.468|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 75)
%7|1478548289.498|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 75, rtt 29.70ms)
msg is None
%7|1478548290.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548290.505|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 76)
%7|1478548290.535|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 76, rtt 29.40ms)
msg is None
%7|1478548291.439|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548291.442|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 77)
%7|1478548291.472|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 77, rtt 29.82ms)
msg is None
%7|1478548292.440|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548292.479|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 78)
%7|1478548292.508|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 78, rtt 29.32ms)
msg is None
%7|1478548293.435|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1936, committed offset 1936
%7|1478548293.435|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
%7|1478548293.435|COMMIT|rdkafka#consumer-1| OffsetCommit internal error: Local: No offset stored
%7|1478548293.435|COMMIT|rdkafka#consumer-1| OffsetCommit for 1 partition(s) returned: Local: No offset stored
%7|1478548293.435|UNASSIGN|rdkafka#consumer-1| Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1478548293.440|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548293.515|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 79)
%7|1478548293.544|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 79, rtt 29.19ms)
msg is None
%7|1478548294.440|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548294.451|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 80)
%7|1478548294.480|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 80, rtt 29.39ms)
msg is None
%7|1478548295.440|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548295.487|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 81)
%7|1478548295.516|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 81, rtt 29.39ms)
msg is None
%7|1478548296.440|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548296.523|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 82)
%7|1478548296.552|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 82, rtt 28.92ms)
msg is None
%7|1478548297.440|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548297.456|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 83)
%7|1478548297.486|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 83, rtt 30.00ms)
msg is None
%7|1478548298.435|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: stored offset 1936, committed offset 1936
%7|1478548298.435|OFFSET|rdkafka#consumer-1| Topic patricktest [0]: setting offset INVALID for commit
%7|1478548298.435|COMMIT|rdkafka#consumer-1| OffsetCommit internal error: Local: No offset stored
%7|1478548298.435|COMMIT|rdkafka#consumer-1| OffsetCommit for 1 partition(s) returned: Local: No offset stored
%7|1478548298.435|UNASSIGN|rdkafka#consumer-1| Unassign not done yet (0 wait_unassign, 1 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1478548298.440|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548298.493|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 84)
%7|1478548298.523|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 84, rtt 29.55ms)
msg is None
^C%7|1478548299.441|HEARTBEAT|rdkafka#consumer-1| kafkahost:9092/1: Heartbeat for group "patricklaptop" generation id 42
%7|1478548299.530|SEND|rdkafka#consumer-1| kafkahost:9092/1: Sent HeartbeatRequest (v0, 86 bytes @ 0, CorrId 85)
%7|1478548299.559|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received HeartbeatResponse (v0, 2 bytes, CorrId 85, rtt 29.26ms)
Traceback (most recent call last):
  File "pipkafka.py", line 22, in <module>
    msg = c.poll(timeout=1.0)
KeyboardInterrupt
%7|1478548299.745|DESTROY|rdkafka#consumer-1| Terminating instance
%7|1478548299.845|CGRPOP|rdkafka#consumer-1| Group "patricklaptop" received op TERMINATE (v0) in state up (join state started, v4 vs 0)
%7|1478548299.845|CGRPTERM|rdkafka#consumer-1| Terminating group "patricklaptop" in state up with 1 partition(s)
%7|1478548299.845|UNSUBSCRIBE|rdkafka#consumer-1| Group "patricklaptop": unsubscribe from current subscription of 1 topics (leave group=yes, join state started, v4)
%7|1478548299.845|SUBSCRIPTION|rdkafka#consumer-1| Group "patricklaptop": clearing subscribed topics list (1)
%7|1478548299.845|SUBSCRIPTION|rdkafka#consumer-1| Group "patricklaptop": effective subscription list changed from 1 to 0 topic(s):
%7|1478548299.845|PAUSE|rdkafka#consumer-1| Library pausing 1 partition(s)
%7|1478548299.845|BARRIER|rdkafka#consumer-1| patricktest [0]: rd_kafka_toppar_op_pause_resume:2007: new version barrier v3
%7|1478548299.845|PAUSE|rdkafka#consumer-1| Pause patricktest [0] (v3)
%7|1478548299.845|ASSIGN|rdkafka#consumer-1| Group "patricklaptop": delegating revoke of 1 partition(s) to application rebalance callback on queue rd_kafka_consumer_close: unsubscribe
%7|1478548299.845|CGRPJOINSTATE|rdkafka#consumer-1| Group "patricklaptop" changed join state started -> wait-revoke-rebalance_cb (v4, state up)
%7|1478548299.845|CGRPTERM|rdkafka#consumer-1| Group "patricklaptop": waiting for 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
Fatal Python error: PyEval_RestoreThread: NULL tstate
[vagrant@localhost vagrant]$
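In the traceback above, Ctrl-C interrupts `poll()`, so the `c.close()` placed after the infinite loop in the original script is never reached and the consumer is torn down at interpreter exit instead. A minimal sketch of a loop that always closes the consumer follows. The helper name `consume_until_interrupted` and the `handle` callback are hypothetical, and the thread does not confirm whether this avoids the `PyEval_RestoreThread` abort on this client version.

```python
def consume_until_interrupted(consumer, handle, poll_timeout=1.0):
    """Poll until Ctrl-C, then always call consumer.close().

    `consumer` is assumed to expose poll(timeout=...) and close(), as
    confluent_kafka.Consumer does. `handle` is a hypothetical callback
    that receives each successfully fetched message.
    Returns the number of messages passed to `handle`.
    """
    handled = 0
    try:
        while True:
            msg = consumer.poll(timeout=poll_timeout)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                continue  # real code would inspect msg.error().code() here
            handle(msg)
            handled += 1
    except KeyboardInterrupt:
        pass  # fall through to the orderly shutdown below
    finally:
        consumer.close()  # close explicitly rather than at interpreter exit
    return handled
```

With a real client this would be called as `consume_until_interrupted(c, process)` after `c.subscribe([...])`, where `process` is whatever the application does with a message.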
patrickviet commented 7 years ago

Extra info, no idea if it's relevant:

Running the latest Kafka release as of this date: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz

On Centos 6.8 with Oracle JVM 1.8.0u92

edenhill commented 7 years ago

You just have the one broker, right? What is your server.properties default.replication.factor?

patrickviet commented 7 years ago

Yes, just one broker for this test. default.replication.factor is at 3 since it's the default server.properties file. However, this topic was explicitly created with a replication factor of 1:

# /opt/kafka/bin/kafka-topics.sh --describe --topic patricktest --zookeeper localhost:2181
Topic:patricktest   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: patricktest  Partition: 0    Leader: 1   Replicas: 1 Isr: 1
edenhill commented 7 years ago

I think this is what messes it up:

%7|1478548268.170|RECV|rdkafka#consumer-1| kafkahost:9092/1: Received MetadataResponse (v0, 27 bytes, CorrId 51, rtt 659.03ms)
%7|1478548268.170|METADATA|rdkafka#consumer-1| kafkahost:9092/1: Error in metadata reply for topic patricktest (PartCnt 0): Err-38?
%7|1478548268.170|BRKDELGT|rdkafka#consumer-1| patricktest [0]: delegate to broker (none) (rktp 0x7fe384004050, term 0, ref 5, remove 0)
%7|1478548268.170|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker kafkahost:9092/1 no longer leader
%7|1478548268.170|BRKDELGT|rdkafka#consumer-1| patricktest [0]: broker :0/internal is now leader for partition with 0 messages (0 bytes) queued

MetadataResponse returns error 38 (INVALID_REPLICATION_FACTOR) in some transient state after the broker comes back up. Since the partition count in the response is set to 0, there can't be a leader broker for the partition that librdkafka is consuming, so librdkafka undelegates the current leader for that partition and stops consuming. I'm quite sure that if you waited 5 minutes (topic.metadata.refresh.interval.ms) for the next Metadata request, the metadata would be correct, the partition would get a leader again, and consumption would resume. You can try this with your example by configuring: .. , "default.topic.config": {"topic.metadata.refresh.interval.ms": 20000} ..

The situation should then be remedied within half a minute.

Can you give it a shot?
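As a rough sketch of the workaround, the consumer configuration from the original report could be built like this. The helper names `build_consumer_config` and `make_consumer` are hypothetical. librdkafka's configuration reference lists `topic.metadata.refresh.interval.ms` as a global property, so this sketch sets it at the top level rather than inside `default.topic.config`; treat that placement as an assumption to check against your client version.

```python
def build_consumer_config(bootstrap_servers, group_id, metadata_refresh_ms=20000):
    """Return a consumer config dict with a shortened metadata refresh interval."""
    return {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        # The librdkafka default is 300000 ms (5 minutes); 20000 ms is the
        # value suggested in this thread.
        'topic.metadata.refresh.interval.ms': metadata_refresh_ms,
        # Kept from the original report.
        'default.topic.config': {'auto.offset.reset': 'smallest'},
    }


def make_consumer(config):
    """Create a Consumer; imported lazily so the config helper runs without the client."""
    from confluent_kafka import Consumer
    return Consumer(config)
```

For the scenario in this issue, that would be `make_consumer(build_consumer_config('localhost', 'patricklaptop'))` followed by `subscribe(['patricktest'])`.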

patrickviet commented 7 years ago

Thanks so much! It works! It starts reading again after 20sec. I didn't think of waiting a full 5min... And it also works on 0.9.1

Note: I managed to crash master, after a few kafka restarts

* rdkafka_cgrp.c:2420:rd_kafka_cgrp_metadata_update_check: assert: thrd_is_current(rkcg->rkcg_rk->rk_thread) *

Would it make sense to use a lower default refresh interval for the topic metadata when the replication factor is zero? A sort of "retry faster if it all failed" option?

edenhill commented 7 years ago

Great!

Yeah, it needs to handle this somehow and there is a fast metadata refresher that kicks in for various other errors, but not for this code path. I'll look into it.

You wouldn't happen to have a core-file from that crash? If so, please do:

$ gdb python core
(gdb) bt
patrickviet commented 7 years ago

Sadly, I searched the full filesystem of this virtual machine and didn't find any core file. I didn't manage to reproduce the issue either; it was a one-time thing.

Thanks again

edenhill commented 7 years ago

Created upstream issue, closing this one since a workaround is provided (set topic.metadata.refresh.interval.ms)

ghost commented 3 years ago

@edenhill I'm using version 1.6.0 and facing this issue. Our entire cluster went down recently, which took down connectivity with all Kafka brokers. The connection was re-established, but the consumer didn't start pulling messages, even after 5 minutes. It only resumed several hours later, when I restarted my service in all the environments. Does the Kafka consumer automatically re-subscribe to the topics once the brokers come back online? This issue showed up on the front page when I searched for a query similar to its title.

This StackOverflow answer says that it connects automatically but doesn't resubscribe after reconnecting.

edenhill commented 3 years ago

The consumer will resume fetching (and/or rejoin the group if it has expired) when the cluster is up again and the client has refreshed its metadata, which may take a minute or so depending on configuration.

If you have any logs (preferably debug logs) from this situation please create a new issue and provide config, logs, etc.
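Since `poll()` returning `None` looks the same for an idle topic and for a consumer that has not resumed, one way to decide when to start collecting such logs is an application-side idle timer. The sketch below is only an illustration: `StallWatchdog` is a hypothetical helper, not part of confluent-kafka-python, and the threshold has to be chosen with the topic's normal traffic in mind.

```python
import time


class StallWatchdog(object):
    """Track how long it has been since the last message was received."""

    def __init__(self, max_idle_seconds, clock=time.monotonic):
        self._max_idle = max_idle_seconds
        self._clock = clock  # injectable so the timer can be tested
        self._last_activity = clock()

    def record_message(self):
        """Call this whenever poll() returns a real message."""
        self._last_activity = self._clock()

    def idle_seconds(self):
        return self._clock() - self._last_activity

    def is_stalled(self):
        """True once no message has been seen for longer than the threshold."""
        return self.idle_seconds() > self._max_idle
```

In a poll loop, `record_message()` would be called for each message, and `is_stalled()` checked whenever `poll()` returns `None`, for example to emit a warning or raise an alert. It cannot tell a quiet topic from a broken consumer on its own.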

nikita-yatchenko commented 2 years ago

Hello,

I am using almost identical infrastructure to the one in the original question. Only my logs are somewhat different, and I do not understand them. I do know that the Kafka broker was unavailable and, as a result, the consumer was logging errors. Could you kindly clarify these error messages?

%4|1636441172.909|SESSTMOUT|rdkafka#consumer-2| [thrd:main]: Consumer group session timed out (in join-state steady) after 45497 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group
%4|1636441173.561|SESSTMOUT|rdkafka#consumer-3| [thrd:main]: Consumer group session timed out (in join-state steady) after 45497 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group
%5|1636441175.500|REQTMOUT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out HeartbeatRequest in flight (after 45090ms, timeout #0)
%4|1636441175.500|REQTMOUT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441175.500|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: cluster-dev-0-oms-dev-kafka.[ address ] :443: 1 request(s) timed out: disconnect (after 73335170ms in state UP)
%5|1636441177.012|REQTMOUT|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out HeartbeatRequest in flight (after 45950ms, timeout #0)
%4|1636441177.012|REQTMOUT|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441177.012|FAIL|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator: cluster-dev-0-oms-dev-kafka.[ address ] :443: 1 request(s) timed out: disconnect (after 73336680ms in state UP)
%5|1636441189.744|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out FetchRequest in flight (after 61284ms, timeout #0)
%4|1636441189.744|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441189.745|FAIL|rdkafka#consumer-3| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: 1 request(s) timed out: disconnect (after 73304396ms in state UP)
%5|1636441189.864|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out FetchRequest in flight (after 61442ms, timeout #0)
%4|1636441189.864|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441189.865|FAIL|rdkafka#consumer-2| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: 1 request(s) timed out: disconnect (after 73304551ms in state UP)
%4|1636441250.791|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 0 in-flight, 0 retry-queued, 1 out-queue, 0 partially-sent requests
%4|1636441250.891|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 0 in-flight, 0 retry-queued, 1 out-queue, 0 partially-sent requests
%5|1636441253.870|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Timed out MetadataRequest in flight (after 60217ms, timeout #0)
%4|1636441253.870|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441253.870|FAIL|rdkafka#consumer-3| [thrd:ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: 1 request(s) timed out: disconnect (after 64062ms in state UP)
%5|1636441290.193|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-1-oms-dev-kafka.[ address ] :443/1]: ssl://cluster-dev-1-oms-dev-kafka.[ address ] :443/1: Timed out MetadataRequest in flight (after 60897ms, timeout #0)
%5|1636441290.194|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-1-oms-dev-kafka.[ address ] :443/1]: ssl://cluster-dev-1-oms-dev-kafka.[ address ] :443/1: Timed out FindCoordinatorRequest in flight (after 60897ms, timeout #1)
%4|1636441290.194|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-1-oms-dev-kafka.[ address ] :443/1]: ssl://cluster-dev-1-oms-dev-kafka.[ address ] :443/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441290.194|FAIL|rdkafka#consumer-2| [thrd:ssl://cluster-dev-1-oms-dev-kafka.[ address ] :443/1]: ssl://cluster-dev-1-oms-dev-kafka.[ address ] :443/1: 2 request(s) timed out: disconnect (after 100281ms in state UP)
%4|1636441311.291|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 0 in-flight, 0 retry-queued, 1 out-queue, 0 partially-sent requests
%4|1636441311.391|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 0 in-flight, 0 retry-queued, 1 out-queue, 0 partially-sent requests
%4|1636441314.097|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Timed out 0 in-flight, 0 retry-queued, 61 out-queue, 0 partially-sent requests
%4|1636441371.907|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 0 in-flight, 0 retry-queued, 1 out-queue, 0 partially-sent requests
%4|1636441371.991|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 0 in-flight, 0 retry-queued, 1 out-queue, 0 partially-sent requests
%4|1636441374.794|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Timed out 0 in-flight, 0 retry-queued, 61 out-queue, 0 partially-sent requests
%5|1636441383.103|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap]: ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap: Timed out ApiVersionRequest in flight (after 10011ms, timeout #0)
%5|1636441383.103|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap]: ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap: Timed out ApiVersionRequest in flight (after 10011ms, timeout #0)
%4|1636441383.104|FAIL|rdkafka#consumer-2| [thrd:ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap]: ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap: ApiVersionRequest failed: Local: Timed out: probably due to broker version < 0.10 (see api.version.request configuration) (after 10012ms in state APIVERSION_QUERY)
%4|1636441383.104|FAIL|rdkafka#consumer-3| [thrd:ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap]: ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap: ApiVersionRequest failed: Local: Timed out: probably due to broker version < 0.10 (see api.version.request configuration) (after 10012ms in state APIVERSION_QUERY)
%4|1636441383.104|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap]: ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%4|1636441383.104|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap]: ssl://oms-dev-kafka-bootstrap.[ address ] :443/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441395.809|FAIL|rdkafka#consumer-3| [thrd:ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 140938ms in state CONNECT)
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.809|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%4|1636441395.810|METADATA|rdkafka#consumer-3| [thrd:main]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: Metadata request failed: refresh unavailable topics: Local: SSL error (0ms): Permanent
%3|1636441395.811|FAIL|rdkafka#consumer-2| [thrd:ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 105615ms in state CONNECT)
%3|1636441398.936|FAIL|rdkafka#consumer-3| [thrd:ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 3024ms in state CONNECT, 1 identical error(s) suppressed)
%3|1636441424.919|FAIL|nlp-learning-client-6588dc6778-cnh47#producer-1| [thrd:ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2]: ssl://cluster-dev-2-oms-dev-kafka.[ address ] :443/2: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 103720ms in state CONNECT)
%4|1636441431.833|SESSTMOUT|rdkafka#consumer-3| [thrd:main]: Consumer group session timed out (in join-state steady) after 45498 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group
%4|1636441431.834|SESSTMOUT|rdkafka#consumer-2| [thrd:main]: Consumer group session timed out (in join-state steady) after 45498 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group
%5|1636441435.309|REQTMOUT|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out HeartbeatRequest in flight (after 45975ms, timeout #0)
%4|1636441435.309|REQTMOUT|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441435.310|FAIL|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator: cluster-dev-0-oms-dev-kafka.[ address ] :443: 1 request(s) timed out: disconnect (after 52052ms in state UP, 1 identical error(s) suppressed)
%5|1636441435.314|REQTMOUT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out HeartbeatRequest in flight (after 45979ms, timeout #0)
%4|1636441435.314|REQTMOUT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441435.315|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: cluster-dev-0-oms-dev-kafka.[ address ] :443: 1 request(s) timed out: disconnect (after 52056ms in state UP, 1 identical error(s) suppressed)
%5|1636441448.358|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out FetchRequest in flight (after 60513ms, timeout #0)
%4|1636441448.358|REQTMOUT|rdkafka#consumer-2| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441448.358|FAIL|rdkafka#consumer-2| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: 4 request(s) timed out: disconnect (after 65067ms in state UP)
%5|1636441449.323|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out FetchRequest in flight (after 61426ms, timeout #0)
%4|1636441449.324|REQTMOUT|rdkafka#consumer-3| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1636441449.324|FAIL|rdkafka#consumer-3| [thrd:ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0]: ssl://cluster-dev-0-oms-dev-kafka.[ address ] :443/0: 4 request(s) timed out: disconnect (after 66066ms in state UP)