Closed zoltan-fedor closed 8 years ago
This can happen to a kafka system if there is a message larger than your consumer's max_partition_fetch_bytes . Have you tried tuning that?
Thanks. The max_partition_fetch_bytes is already set to pretty high (10*1048576), so I don't think that would be the issue, but I will try to increase it even further to verify.
Unfortunately, as expected, changing max_partition_fetch_bytes did not make any difference; even increasing it 1000-fold did not make the consumer read the message from the partition.
So with having:
max_partition_fetch_bytes=1000*1048576
The issue is the same as above.
Can you get debug logs for a consumer in this "stuck" state?
Sure, here is the debug log of the consumer while it is in this "stuck" state. I am only trying to read one partition (partition 11), which I know is currently "stuck".
It seems it cannot find the message it is looking for (message # 812899910).
DEBUG:kafka.client:Attempting to bootstrap via node at 10.1.1.2:9092 DEBUG:kafka.conn:
: creating new socket DEBUG:kafka.conn: : established TCP connection DEBUG:kafka.client:Node bootstrap connected DEBUG:kafka.conn: Request 1: MetadataRequest_v0(topics=[]) DEBUG:kafka.conn: Response 1: MetadataResponse_v0(brokers=[(node_id=0, host='fr.amers1.cloud', port=9092), (node_id=1, host='fr2.amers1.cloud', port=9092)], topics=[(error_code=0, topic='opsonsole', partitions=[(error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='test', partitions=[(error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='_schemas', partitions=[(error_code=0, partition=0, leader=0, replicas=[0, 1], isr=[0, 1])]), (error_code=0, topic='opsconsole', partitions=[(error_code=0, partition=8, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=11, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=5, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=4, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=13, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=7, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=10, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=9, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=6, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=15, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=0, leader=1, 
replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='opsconsole_test', partitions=[(error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=23, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=41, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=32, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=8, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=44, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=17, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=35, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=26, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=11, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=38, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=29, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=47, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=20, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=5, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=46, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=40, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=49, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=13, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=4, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=22, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=31, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=16, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=7, leader=0, replicas=[0, 1], isr=[0, 1]), 
(error_code=0, partition=43, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=25, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=34, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=10, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=37, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=28, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=19, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=45, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=27, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=9, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=21, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=30, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=39, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=15, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=33, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=6, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])]), (error_code=0, topic='_schemas2', partitions=[(error_code=0, partition=0, leader=0, replicas=[0, 1], isr=[0, 1])]), (error_code=0, topic='apphits_test', partitions=[(error_code=0, partition=1, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=0, leader=0, 
replicas=[0, 1], isr=[0, 1])]), (error_code=0, topic='apphits', partitions=[(error_code=0, partition=8, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=11, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=5, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=13, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=4, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=7, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=10, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=9, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=15, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])])]) DEBUG:kafka.cluster:Updated cluster metadata to Cluster(brokers: 2, topics: 9, groups: 0) DEBUG:kafka.client:No connected nodes found. Trying disconnected nodes. DEBUG:kafka.client:No luck. 
Trying all broker metadata DEBUG:kafka.client:Initiating connection to node 0 at fr.amers1.cloud:9092 DEBUG:kafka.conn: : creating new socket DEBUG:kafka.conn: : established TCP connection DEBUG:kafka.client:Node 0 connected INFO:kafka.conn:Broker is not v0.10 -- it did not recognize ApiVersionRequest_v0 DEBUG:kafka.conn: : creating new socket DEBUG:kafka.conn: : established TCP connection DEBUG:kafka.client:Node 0 connected DEBUG:kafka.conn: Response 3: ListGroupsResponse_v0(error_code=0, groups=[]) INFO:kafka.conn:Broker version identifed as 0.9 INFO:kafka.conn:Set configuration api_version='0.9' to skip auto check_version requests on startup DEBUG:kafka.metrics.metrics:Added sensor with name bytes-fetched DEBUG:kafka.metrics.metrics:Added sensor with name records-fetched DEBUG:kafka.metrics.metrics:Added sensor with name fetch-latency DEBUG:kafka.metrics.metrics:Added sensor with name records-lag DEBUG:kafka.metrics.metrics:Added sensor with name fetch-throttle-time DEBUG:kafka.metrics.metrics:Added sensor with name commit-latency 2016-06-03 14:09:28,682 (5605) [0:00:13.194239] - INFO - Uploaded the uuid-hashed uuid dict with 808642 records. - [module:opsconsole, line:334, functionname: ] INFO:Opsconsole ETL: Uploaded the uuid-hashed uuid dict with 808642 records. 2016-06-03 14:09:28,722 (5605) [0:00:00.040302] - INFO - Uploaded the statcodes set with 658 records. - [module:opsconsole, line:348, functionname: ] INFO:Opsconsole ETL: Uploaded the statcodes set with 658 records. DEBUG:kafka.coordinator:Sending group coordinator request for group biapp_opsconsole_etl to broker 0 DEBUG:kafka.conn: Request 5: GroupCoordinatorRequest_v0(consumer_group='biapp_opsconsole_etl') DEBUG:kafka.client:Sending metadata request MetadataRequest_v0(topics=['opsconsole']) to node 0 DEBUG:kafka.conn: Request 6: MetadataRequest_v0(topics=['opsconsole']) WARNING:kafka.conn: timed out after 10000 ms. Closing connection. 
WARNING:kafka.client:Node 0 connection failed -- refreshing metadata ERROR:kafka.coordinator:Error sending GroupCoordinatorRequest_v0 to node 0 [RequestTimedOutError - 7 - This error is thrown if the request exceeds the user-specified time limit in the request.] DEBUG:kafka.client:Initializing connection to node 0 for metadata request DEBUG:kafka.conn: : creating new socket DEBUG:kafka.conn: : established TCP connection DEBUG:kafka.client:Node 0 connected DEBUG:kafka.client:Sending metadata request MetadataRequest_v0(topics=['opsconsole']) to node 0 DEBUG:kafka.conn: Request 7: MetadataRequest_v0(topics=['opsconsole']) DEBUG:kafka.conn: Response 7: MetadataResponse_v0(brokers=[(node_id=0, host='fr.amers1.cloud', port=9092), (node_id=1, host='fr2.amers1.cloud', port=9092)], topics=[(error_code=0, topic='opsconsole', partitions=[(error_code=0, partition=8, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=11, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=5, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=4, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=13, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=7, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=10, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=1, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=9, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=3, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=6, leader=1, replicas=[1, 0], isr=[1, 0]), (error_code=0, partition=15, leader=0, replicas=[0, 1], isr=[0, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 0], isr=[1, 0])])]) DEBUG:kafka.cluster:Updated cluster metadata to 
Cluster(brokers: 2, topics: 1, groups: 0) DEBUG:kafka.coordinator:Sending group coordinator request for group biapp_opsconsole_etl to broker 0 DEBUG:kafka.conn: Request 8: GroupCoordinatorRequest_v0(consumer_group='biapp_opsconsole_etl') DEBUG:kafka.conn: Response 8: GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host='fr2.amers1.cloud', port=9092) DEBUG:kafka.coordinator:Received group coordinator response GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host='fr2.amers1.cloud', port=9092) DEBUG:kafka.cluster:Updating coordinator for biapp_opsconsole_etl: GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host='fr2.amers1.cloud', port=9092) INFO:kafka.cluster:Group coordinator for biapp_opsconsole_etl is BrokerMetadata(nodeId=1, host='fr2.amers1.cloud', port=9092) INFO:kafka.coordinator:Discovered coordinator 1 for group biapp_opsconsole_etl DEBUG:kafka.client:Initiating connection to node 1 at fr2.amers1.cloud:9092 DEBUG:kafka.conn: : creating new socket DEBUG:kafka.coordinator.consumer:Node 1 not ready -- failing offset fetch request DEBUG:kafka.conn: : established TCP connection DEBUG:kafka.client:Node 1 connected DEBUG:kafka.coordinator.consumer:Group biapp_opsconsole_etl fetching committed offsets for partitions: {TopicPartition(topic='opsconsole', partition=11)} DEBUG:kafka.conn: Request 1: OffsetFetchRequest_v1(consumer_group='biapp_opsconsole_etl', topics=[(topic='opsconsole', partitions=[11])]) DEBUG:kafka.conn: Response 1: OffsetFetchResponse_v1(topics=[(topic='opsconsole', partitions=[(partition=11, offset=812899910, metadata='', error_code=0)])]) DEBUG:kafka.consumer.fetcher:Resetting offset for partition TopicPartition(topic='opsconsole', partition=11) to the committed offset 812899910 DEBUG:kafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='opsconsole', partition=11) at offset 812899910 DEBUG:kafka.consumer.fetcher:Sending FetchRequest to node 0 DEBUG:kafka.conn: Request 9: 
FetchRequest_v1(replica_id=-1, max_wait_time=10000, min_bytes=1, topics=[(topic='opsconsole', partitions=[(partition=11, offset=812899910, max_bytes=1048576)])]) DEBUG:kafka.conn: Response 9: FetchResponse_v1(throttle_time_ms=0, topics=[(topics='opsconsole', partitions=[(partition=11, error_code=0, highwater_offset=819289683, message_set=['(offset=812899910, message=373)', ... '(offset=812905264, message=1226)', '(offset=812905265, message=325)', '(offset=812905278, message=1478)', '(offset=812905279, message=292)', '(offset=None, message=None)'])])]) DEBUG:kafka.consumer.fetcher:Adding fetched record for partition TopicPartition(topic='opsconsole', partition=11) with offset 812899910 to buffered record list DEBUG:kafka.metrics.metrics:Added sensor with name topic.opsconsole.bytes-fetched DEBUG:kafka.metrics.metrics:Added sensor with name topic.opsconsole.records-fetched DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 812899910) DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 812899910) DEBUG:kafka.consumer.fetcher:Skipping message offset: 812899912 (expecting 812899910) DEBUG:kafka.consumer.fetcher:Skipping message offset: 812899913 (expecting 812899910) DEBUG:kafka.consumer.fetcher:Skipping message offset: 812899914 (expecting 812899910) ... DEBUG:kafka.consumer.fetcher:Skipping message offset: 812905278 (expecting 812899910) DEBUG:kafka.consumer.fetcher:Skipping message offset: 812905279 (expecting 812899910) DEBUG:kafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='opsconsole', partition=11) at offset 812899910 ...
This is useful, thanks! I'll investigate.
This is strange -- it appears that the compressed messageset that we get back from the broker has packed incorrect message offsets.
The logs suggest that the outer offset is correct:
DEBUG:kafka.conn: Response 9: FetchResponse_v1(throttle_time_ms=0, topics=[(topics='opsconsole',
partitions=[(partition=11, error_code=0, highwater_offset=819289683,
message_set=['(offset=812899910, message=373)',
...
But the inner offsets appear to be 0:
DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 812899910)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 812899910)
You are running 0.9 brokers with the v0 message format, correct? If so, compressed messages should always include absolute offsets as calculated by the broker. What are you using to produce these messages?
Yes, I am running 0.9.0.1 brokers from Confluent (confluent-kafka-2.11.7-0.9.0.1-1.noarch).
Not sure about the v0 message format. How could I check that?
I use kafka-python to produce the messages, see (simplified) code below:
import json

from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer, Util
from kafka import KafkaProducer

# Initialize the client by connecting to the Schema Registry
client = CachedSchemaRegistryClient(url='http://' + schema_registry_server)
# Load the current schema file
avro_schema = Util.parse_schema_from_file(current_folder + '/jobs/' + avro_schema_file)
# Get the schema_id for the given schema file and topic
schema_id = client.register(kafka_topic, avro_schema)
# Serializer that encodes records as Avro
serializer = MessageSerializer(client)
producer = KafkaProducer(
    bootstrap_servers=kafka_servers.split(','),
    compression_type='snappy',
    retries=5
)
# f is the input file, opened elsewhere in the full script
for line in f:
    data = json.loads(line)
    avro_encoded = serializer.encode_record_with_schema_id(schema_id, data)  # encode the data into Avro according to the schema
    producer.send(kafka_topic, avro_encoded)  # upload to Kafka
producer.flush()
Can you try grabbing the specific message using something like this:
https://gist.github.com/dpkp/452ea2080d54bb615ae4779851ada689
Point that at the host / port for the leader of the topic-partition that has the "stuck" message and send me the output.
I hope you don't mind, but I have emailed the output to you. The error behavior has changed somewhat: now it simply cannot read the message, and kafka-python just runs the CPU at 100% trying to read it. I used your snippet to export the message into a file, and the funny thing is that when I try to open it in a text editor (pluma) on my Linux desktop, it has the same problem: CPU at 100% trying to open the file.
The same is happening across multiple topics now: the CPU is at 100%, but kafka-python gets stuck reading the messages at a given point and can't move any further. It is as if the compression had hit a bug, causing the 100% CPU usage.
Thanks -- specifically, which offset are you seeing as "stuck" ?
It is 813721754
I have the same issue, and as far as I can see all requests go out with the default group_id kafka-python-default-group, even though I set a different one (zinc_group):
2016-06-07 14:11:51,965.965.569019318:kafka.coordinator.consumer:140668873398080:DEBUG:11571:Group kafka-python-default-group fetching committed offsets for partitions: set([TopicPartition(topic='catalog-test1', partition=0)])
Could this be the reason why we don't see any new messages?
I use kafka-python 1.2.1 Broker version: 0.8.1
I believe my issue was related to snappy compression. Since I turned snappy compression off in the producers a few days ago, the issue is gone.
Great, I also found the issue. Now it works. Thanks.
I haven't been able to identify the root cause here. The debug logs show that it is related to the fetcher discarding compressed messages that have offsets that appear too low. In normal operation this can happen if you request a message offset that falls in the middle of a compressed message set. In that case the fetch correctly scans through the compressed message set, dropping messages until it gets to the requested offset. If messages are not compressed, this code is not necessary because kafka can always return the exact message offset requested. But something is happening here that is causing the compressed message set scan to fail. I assume this bug is on the client side, but I have been unable to track it down. Help appreciated! If anyone can provide a reproducible test case, that would also be very useful.
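To make the scan described above concrete, here is a minimal toy model of it (Python 3), not kafka-python's actual fetcher code. The function name `scan_fetched` and the `(offset, value)` tuples are hypothetical, and the skip-unless-equal rule is only inferred from the "Skipping message offset: N (expecting M)" debug lines earlier in this thread.

```python
def scan_fetched(records, expected_offset):
    """Yield records starting at expected_offset and drop everything else.

    `records` is a list of (offset, value) pairs, standing in for the
    messages unpacked from one fetch response. Toy model only.
    """
    for offset, value in records:
        if offset == expected_offset:
            yield offset, value
            expected_offset += 1
        # any other offset is skipped, as in the debug log


# Normal case: the fetch lands in the middle of a compressed set,
# so the leading messages (offsets 8 and 9) are dropped.
normal = [(8, b"a"), (9, b"b"), (10, b"c"), (11, b"d")]
consumed_normal = list(scan_fetched(normal, 10))

# Stuck case: the message that should carry offset 10 surfaces as offset 0,
# so the expected offset is never seen and nothing is yielded.
broken = [(0, b"c"), (11, b"d"), (12, b"e")]
consumed_broken = list(scan_fetched(broken, 10))
```

In the second case nothing is yielded and the expected offset never advances, so a real consumer would send the same fetch request again, which matches the loop visible in the logs.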
Could it be related to this? https://issues.apache.org/jira/browse/KAFKA-3789
Although that seems to only apply to Kafka 0.10, so maybe isn't applicable.
I saw this behavior today. I'm using snappy compression. In my case, the problem message was not decoded during Message.decompress -- the call succeeded but the message.value appeared unchanged. I'm not clear on whether the data was corrupt or there was a failure in the decoding caused by some bug. After this, since kafka-python was not seeing the offset as it scanned over the messages in the set, it was stuck in a loop making the same request and scanning it over and over.
kafka-python 1.2.1, python-snappy 0.5, kafka 0.9.0.1
Please disregard what I said about problems in decompression -- the message eventually shows up decompressed. I haven't quite wrapped my head around the recursive _unpack_message_set calls and the relative vs. absolute offsets, but my stuck message ends up being yielded as a ConsumerRecord with a (relative) offset 0, and the scan at https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L440 is checking for an absolute offset:
DEBUG:kafka.consumer.fetcher:Adding fetched record for partition TopicPartition(topic=u'topic_name_redacted', partition=98) with offset 1057322 to buffered record list
DEBUG:kafka.consumer.fetcher:Skipping message offset: 0 (expecting 1057322)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 1057323 (expecting 1057322)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 1057324 (expecting 1057322)
DEBUG:kafka.consumer.fetcher:Skipping message offset: 1057325 (expecting 1057322)
Hope this is useful.
Thanks for the details. The _unpack_message_set should only be recursive when dealing with compressed messages. To encode "compressed" messages, kafka writes the messages into a message set structure, encodes the message set into bytes, compresses those bytes, writes the compressed bytes as a new, single message with the compression flag set, and then writes this new message as the only item in a new message set. It is this new "compressed" message set that is sent to and received from the broker. So in practice decoding a compressed messageset requires decoding the wrapper message set, getting the underlying message, decompressing the message into a new messageset, and decoding the new messageset into a list of uncompressed messages.
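The wrap and unwrap steps described above can be sketched roughly as follows. This is a toy under stated assumptions: the 13-byte header (offset, compressed flag, payload length) is invented for illustration and is not Kafka's real message format, gzip stands in for whichever codec is configured, and all function names are hypothetical.

```python
import gzip
import struct

# Toy header: offset (int64), compressed flag (int8), payload length (int32)
HEADER = struct.Struct(">qbi")


def encode_message_set(messages):
    """Serialize a list of (offset, compressed, payload_bytes) tuples."""
    return b"".join(
        HEADER.pack(offset, int(compressed), len(payload)) + payload
        for offset, compressed, payload in messages
    )


def decode_message_set(data):
    """Inverse of encode_message_set: bytes -> list of tuples."""
    messages, pos = [], 0
    while pos < len(data):
        offset, compressed, size = HEADER.unpack_from(data, pos)
        pos += HEADER.size
        messages.append((offset, bool(compressed), data[pos:pos + size]))
        pos += size
    return messages


def compress_message_set(messages):
    """Wrap a whole message set into one compressed message in a new set."""
    inner_bytes = encode_message_set(messages)
    # Toy choice: the wrapper reuses the offset of the last inner message.
    wrapper = (messages[-1][0], True, gzip.compress(inner_bytes))
    return encode_message_set([wrapper])


def unpack(data):
    """Decode a message set, recursing into compressed wrapper messages."""
    for offset, compressed, payload in decode_message_set(data):
        if compressed:
            yield from unpack(gzip.decompress(payload))
        else:
            yield offset, payload


wire = compress_message_set([(5, False, b"a"), (6, False, b"b")])
outer = decode_message_set(wire)   # a single wrapper message
records = list(unpack(wire))       # the two original messages
```

The wire bytes contain exactly one message with the compression flag set; the individual messages only become visible after the recursive step.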
The relative / absolute offset change happened in kafka 0.10 clients/brokers. Older messages had encoded the underlying messages with absolute offsets. But this required the broker to decompress / recompress every compressed messageset as it was produced. The new approach only writes the absolute offset to the outer messageset, and uses relative offsets in the inner messageset in order to avoid the broker decompress/recompress step when handling produced messages.
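As a rough illustration of the relative-offset scheme, the sketch below assumes that the wrapper message carries the absolute offset of the last inner message and that the inner messages are numbered 0, 1, 2, ... relative to the start of the set. That detail is an assumption not spelled out above, and the helper name `absolute_offsets` is hypothetical.

```python
def absolute_offsets(wrapper_offset, relative_offsets):
    """Convert inner relative offsets to absolute offsets.

    Assumes wrapper_offset is the absolute offset of the LAST inner message.
    """
    base = wrapper_offset - relative_offsets[-1]
    return [base + rel for rel in relative_offsets]


# Three inner messages, wrapper stamped with the last absolute offset
recovered = absolute_offsets(1057324, [0, 1, 2])
```

With this scheme the broker only has to stamp one offset on the wrapper and can leave the compressed payload untouched.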
I'm not sure whether that leaves you more confused or less, but hopefully this adds some useful background.
Yes, that helps -- explains why I had a compressed message even after I decompressed it once. I can poke around in the debugger again to try to understand how that 0 offset is getting out to that level.
Thanks -- it could be a bug in _unpack_message_set or it could be that there is an edge case where a compressed message set gets returned with 0 as the first offset and maybe the java client handles this case? I tried to look at raw data from zoltan to see if it was the 0 offset case, but the data looked normal and I was unable to reproduce in a repl. If you can get this to happen in a debug session, please post details! I have not been able to reproduce so far.
Here's a narrative from my debug session:
# we get to the top level _unpack_message_set with the right offset
> msg.is_compressed()
True
> offset
1057322
> relative_offset
0
# after mset = msg.decompress()
> mset[0][0]
1057322
# call _unpack_message_set a second time,
>msg.is_compressed()
True
> offset
1057322
> relative_offset
0
# after msg.decompress()
ipdb> mset[0][0]
0
# _unpack_message_set third time around
> msg.is_compressed()
False
> offset
0
> relative_offset
0
# finally we yield ConsumerRecord(... offset + relative_offset) <-- with the undesirable zero
Whereas in the normal, working case:
# _unpack_message_set
> msg.is_compressed()
True
> offset
1057321
> mset[0][0]
1057321
# _unpack_message_set again
ipdb> msg.is_compressed()
False
> offset
1057321
> relative_offset
0
# yield ConsumerRecord(... offset + relative_offset) <-- 1057321
I haven't busted into the raw response yet...
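The two traces above can be mimicked with a very small toy model in which a compressed wrapper is represented as a nested list instead of real compressed bytes. This is only a sketch of the recursion pattern, not kafka-python's _unpack_message_set; the `unpack` function and the data layout are hypothetical.

```python
def unpack(message_set):
    """Flatten a toy message set.

    Each entry is (offset, payload). A payload that is a list stands for a
    compressed wrapper whose decompressed content is another message set;
    a bytes payload is a plain message.
    """
    for offset, payload in message_set:
        if isinstance(payload, list):
            # "compressed": recurse into the inner message set
            yield from unpack(payload)
        else:
            yield offset, payload


# Working case: one wrapper level, inner offset is absolute
normal = [(1057321, [(1057321, b"v")])]

# Failing case: two wrapper levels, innermost offset is 0
double_wrapped = [(1057322, [(1057322, [(0, b"v")])])]

normal_records = list(unpack(normal))
stuck_records = list(unpack(double_wrapped))
```

In the failing case the record is yielded with offset 0 because only the innermost offset survives the recursion, which is the "undesirable zero" from the debug session.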
I got the same issue. After I changed "log.cleanup.policy = compact" to "log.cleanup.policy = delete" in server.properties, it works well now!
Hey, it seems I have the same problem.
kafka-python (1.0.2)
kafka-0.9.0.0
Here's my consumer
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers = 'xxx',
    group_id = 'my.group',
    max_partition_fetch_bytes = 1048576*1000,
)
consumer.assign([
    TopicPartition(topic=my_topic, partition=0),
])
# Start reading one message before the "stuck" offset
consumer.seek(TopicPartition(topic=my_topic, partition=0), 706)
for msg in consumer:
    print msg.topic, msg.partition, msg.offset, msg.key
The "stuck" offset is 707, so I read from 706.
And here is the log (I replaced the value with nonsense here; the original value is gzip-compressed, if it matters):
https://gist.github.com/ayiis/b5d5738722a0bfb184bf21c8230f4776
~PS
I upgraded kafka-python to 1.2.3 and it seems to make a little difference:
https://gist.github.com/ayiis/45e7489f7e8792888f3f799fc9652ad0
~PS
I upgraded kafka-python to 1.2.4 and it turns out that offset 707 is now ignored and stepped over, with no exception. First I get 706, then the next ones are 708 and 709, and so on.
Are any of you consuming from topics that are being produced by MirrorMaker ?
I'm still unable to reproduce this issue, but it would be great if anyone who can reproduce it could test against PR #755
I tried PR 755 and it worked fine!
Now I can consume 707 as well as the others.
btw, I use kafka-python (1.0.2) as my producer.
Thank you.
~PS
I looked into the raw_bytes in Message.decompress(), and I found that "The logs suggest that the outer offset is correct. But the inner offsets appear to be 0" is exactly what has happened.
I still don't know why, but I guess my message in Kafka is somehow broken or invalid.
I reproduced this one.
In my case, it was caused by a producer on a bad network.
kafka-python (1.2.4)
kafka-0.9.0.0
import time

from kafka import KafkaProducer

for x in xrange(100):
    try:
        producer = KafkaProducer(
            bootstrap_servers = 'xxx',
            compression_type = 'gzip',
            retries = 5,
            retry_backoff_ms = 1000,
        )
        print 'create producer success'
        # Start sending RST to this producer on every producer.send(), to cause a network reset.
        # This causes the producer to retry.
        # PS: This will not cause a [BrokerConnection reset] Exception
        time.sleep(5)
        record_metadata = producer.send(topic = 'test.ay.1', value = 'Hello kafka!' + str(x)).get()
        producer.flush()
        producer.close()
    except Exception, e:
        print 'Exception:', e
Keep responding with RST to producer.send().get() to cause producer retries.
Stop sending RST after the producer has retried a few times, to allow the producer to send the message to Kafka. Then one of those messages may get the KafkaConsumer "stuck".
aha! that makes sense. KafkaProducer may actually be the culprit here by double-compressing messages if there is a failure + retry.
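A minimal sketch of how a retry could double-compress a batch, and of one possible guard. Everything here is a toy: `ToyBatch`, `send_with_retry` and `layers` are hypothetical names, the "compress on every close()" behavior is an assumption about the failure mode rather than a copy of kafka-python's producer code, and the guard is not necessarily the actual fix.

```python
import gzip


class ToyBatch:
    """Toy stand-in for a producer batch buffer."""

    def __init__(self, payload, guard=False):
        self.payload = payload
        self.guard = guard
        self.closed = False

    def close(self):
        """Compress the buffered payload in preparation for sending."""
        if self.guard and self.closed:
            return  # already compressed on an earlier attempt
        self.payload = gzip.compress(self.payload)
        self.closed = True


def send_with_retry(batch, attempts):
    """Pretend each send attempt closes the batch before writing it out."""
    for _ in range(attempts):
        batch.close()
    return batch.payload


def layers(data):
    """Count how many gzip layers wrap the data."""
    count = 0
    while data[:2] == b"\x1f\x8b":  # gzip magic number
        data = gzip.decompress(data)
        count += 1
    return count


# One failed attempt plus one retry
unguarded = layers(send_with_retry(ToyBatch(b"hello"), attempts=2))
guarded = layers(send_with_retry(ToyBatch(b"hello", guard=True), attempts=2))
```

Without the guard the payload ends up wrapped in two gzip layers after a single retry; with it the second close() is a no-op and only one layer is present.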
I can reproduce -- excellent work, @ayiis !
I was able to force this behavior in KafkaProducer by hacking the retry code (this forces retries even on success):
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 958e165..2b56da1 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -215,7 +215,7 @@ class Sender(threading.Thread):
if error is Errors.NoError:
error = None
- if error is not None and self._can_retry(batch, error):
+ if self._can_retry(batch, error):
# retry
log.warning("Got error produce response on topic-partition %s,"
" retrying (%d attempts left). Error: %s",
@@ -243,8 +243,8 @@ class Sender(threading.Thread):
We can retry a send if the error is transient and the number of
attempts taken is fewer than the maximum allowed
"""
- return (batch.attempts < self.config['retries']
- and getattr(error, 'retriable', False))
+ return batch.attempts < self.config['retries']
+ #and getattr(error, 'retriable', False))
using a producer configured like this: KafkaProducer(retries=1, compression_type='gzip')
I am able to produce messages and cause KafkaConsumer to get "stuck" (prior to commit 003bb0a) or skip the message (after commit 003bb0a).
I will submit a patch to fix KafkaProducer. The current behavior of KafkaConsumer is to skip these double-compressed messages. I will think about whether we can or should attempt to decompress them now that we know how they are constructed. Unfortunately these messages are likely incompatible with other clients and so I'm actually hesitant to add code to handle them here. So I am currently leaning towards skip w/ warning. Thoughts?
Yeah, if a message is malformed, discarding while making noise about it seems appropriate.
Quite agree with benauthor.
I made some changes to the consumer side of compressed message sets in #755. I have tried a few different approaches and at this point I am leaning towards making the default behavior to return the corrupt message to the user. I added a configuration parameter named skip_double_compressed_messages that can be set to skip the messages.
Does this sound like a reasonable solution?
I can abide that; returning the inner compressed data would be what you'd expect kafka-python to do if the producer bug was in some other library's producer.
For these double-compressed messages, sure, the consumer could return the corrupt message or have a setting to suppress them (step over them). In my case I actually use kafka-python as the producer and still saw this issue on the consumer side, so in addition to making the consumer handle it as described above, we should also look at the producer side of kafka-python to prevent this double compression from occurring in the first place.
Absolutely! I've already landed a fix for the producer on master.
That would be great!
Fixes to both KafkaProducer and KafkaConsumer have been released in 1.2.5 -- please reopen if this issue resurfaces!
Thanks again to everyone for all the hard work tracking this one down!!
This is an interesting one.
In all of our topics, a handful of partitions get "stuck" every day. Reading of the partition stops at a given message and kafka-python reports that there are no more messages in the partition (as if it had consumed all messages), while there are still unconsumed messages. The only way to get the consumers moving again is to manually seek the offset forward, stepping over the "stuck" messages; consumption then works again for a few million records before getting stuck again at some later offset.
I have multiple consumers consuming from the same topic and they all get stuck at the same messages of the same topics. A random number of partitions is affected from day to day.
We are using Kafka broker version 0.9.0.1, kafka-python 1.2.1 (had the same issue with 1.1.1).
The consumer code is very simple (the below code is trying to read only partition #1, which is currently "stuck"):
The above code prints "Completed", but not the messages, while there is a 5M offset lag in partition 1, so there would be plenty of messages to read. After seeking the consumer offset forward, the code works again until it gets "stuck" again.