jrydberg / gevent-kafka

Python (gevent) bindings for Apache Kafka

consumer times out often, help to debug? #2

Open davidbirdsong opened 12 years ago

davidbirdsong commented 12 years ago

Hi, while demoing simple consumer code I'm seeing many timeouts that I could use some help tracking down. None of our scala producers/consumers see these timeouts to kafka, only the python clients:

    Traceback (most recent call last):
      File "/usr/lib/python2.6/site-packages/gevent_kafka/consumer.py", line 192, in _reader
        self.offsets[bpid], self.max_size)
      File "/usr/lib/python2.6/site-packages/gevent_kafka/broker.py", line 50, in fetch
        max_size)
      File "/usr/lib/python2.6/site-packages/gevent_kafka/broker.py", line 36, in _interaction
        value = getattr(conn, method)(*args, **kw)
      File "/usr/lib/python2.6/site-packages/gevent_kafka/protocol.py", line 209, in fetch
        error_code, messages, length = _decode_fetch_response(self.read())
      File "/usr/lib/python2.6/site-packages/gevent_kafka/protocol.py", line 173, in read
        data = self.socket.recv(4)
      File "/home/david/lib64/python/gevent-0.13.6-py2.6-linux-x86_64.egg/gevent/socket.py", line 427, in recv
        wait_read(sock.fileno(), timeout=self.timeout, event=self._read_event)
      File "/home/david/lib64/python/gevent-0.13.6-py2.6-linux-x86_64.egg/gevent/socket.py", line 169, in wait_read
        switch_result = get_hub().switch()
      File "/home/david/lib64/python/gevent-0.13.6-py2.6-linux-x86_64.egg/gevent/hub.py", line 164, in switch
        return greenlet.switch(self)
    timeout: timed out

jrydberg commented 12 years ago

could it be that you have a poll interval longer than 5 seconds?

it seems we have a hard-coded read-timeout of 5 seconds here:

https://github.com/jrydberg/gevent-kafka/blob/master/gevent_kafka/broker.py#L30
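
in miniature, the failure mode is this (a sketch of the pattern, not the actual broker.py code):

    READ_TIMEOUT = 5.0  # the hard-coded value linked above

    def read_response(sock):
        # sock is a gevent socket; it raises "timeout: timed out" when
        # recv() has to wait longer than the timeout for any data to arrive
        sock.settimeout(READ_TIMEOUT)
        return sock.recv(4)  # e.g. the 4-byte length prefix protocol.py reads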

davidbirdsong commented 12 years ago

i actually had my poll interval set to 5, i'll try 3 and restart.

davidbirdsong commented 12 years ago

didn't help, still seeing timeouts at:

      File "/usr/lib/python2.6/site-packages/gevent_kafka/protocol.py", line 173, in read
        data = self.socket.recv(4)

and at:

      File "/usr/lib/python2.6/site-packages/gevent_kafka/protocol.py", line 182, in read
        data = self.socket.recv(size - len(buf))

and also rarely at:

      File "/usr/lib/python2.6/site-packages/gevent_kafka/pool.py", line 51, in create_connection
        timeout=self.connect_timeout)

jrydberg commented 12 years ago

try to increase the hard-coded values to something HUGE (e.g., five minutes) and see if you get any data at all, or if things are just stuck.

what version of Kafka, btw?

davidbirdsong commented 12 years ago

raised to 5 mins, not stuck at all. cruising pretty fast actually.

this is how we build the deployed kafka (the hash is generally the latest commit in the 0.7.1 release branch):

    BRANCH="0.7.1"
    HASH="b045327b8064c00f044937e1105f09ffcdc03432"
    git clone ${REPO} ${LOCAL}
    cd ${LOCAL}
    git checkout ${BRANCH}
    git checkout ${HASH}
    ./sbt clean
    ./sbt update
    ./sbt assembly

davidbirdsong commented 12 years ago

no read timeouts in the last ~20 mins.

so that helped. is there a way to plumb that kwarg through easily when constructing a consumer or subscribing to a topic?

jrydberg commented 12 years ago

try the new "timeouts" branch. you can now pass connect_timeout and read_timeout to Consumer and Producer.

it is not tested at all though (not even executed once :))
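
i'd expect usage to look something like this (an untested sketch; the Consumer constructor's existing arguments are elided and the exact signature on the branch may differ):

    from gevent_kafka import consumer

    c = consumer.Consumer(
        # ... existing constructor arguments unchanged ...
        connect_timeout=10.0,  # seconds allowed for opening a broker socket
        read_timeout=10.0,     # seconds allowed for each read from a broker
    )
    c.subscribe('fslogger', 3.000).start(callback)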

davidbirdsong commented 12 years ago

looks like you forgot to pass the new timeout kwargs through to ConsumedTopic

    Traceback (most recent call last):
      File "/home/david/lib64/python/gevent-0.13.6-py2.6-linux-x86_64.egg/gevent/greenlet.py", line 390, in run
        result = self._run(*self.args, **self.kwargs)
      File "kafka_test.py", line 157, in consume
        c.subscribe('fslogger', 3.000).start(callback)
      File "/usr/lib/python2.6/site-packages/gevent_kafka/consumer.py", line 339, in subscribe
        polling_interval, max_size)
      File "/usr/lib/python2.6/site-packages/gevent_kafka/consumer.py", line 78, in __init__
        self.connect_timeout = connect_timeout
    NameError: global name 'connect_timeout' is not defined
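
i.e. it looks like ConsumedTopic.__init__ is doing something like this (a reduced sketch, not the actual consumer.py source):

    class ConsumedTopic(object):
        def __init__(self, polling_interval, max_size):
            self.polling_interval = polling_interval
            self.max_size = max_size
            # connect_timeout is assigned but was never added as a
            # parameter, hence the NameError above
            self.connect_timeout = connect_timeout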

jrydberg commented 12 years ago

try again!

davidbirdsong commented 12 years ago

i passed in a 10 second read timeout... definitely looks better.

jrydberg commented 12 years ago

are you pulling a lot of data? that's quite a long read timeout for something that should happen pretty much right away.

it isn't really related to the poll interval, as I said earlier. the "fetch" call to kafka should return right away. but you seem to need up to 10 seconds to get the data.

that's kinda odd.

davidbirdsong commented 12 years ago

my consumer reports that i'm grabbing ~2500 messages totaling about 39MB. so yeah, that is a lot.

i don't see what knob to tune on the consumer side to reduce the message batch sizes. is this something the producer side controls?

davidbirdsong commented 12 years ago

i was off by 10x and my calc is hacky: i'm simply summing the lengths of the messages in each batch, and for some reason that never matches what the protocol layer reports as delivered over the wire. no matter: i lowered max_size to 1/10 of the default and the timeouts stop.

why would a large fetch cause a timeout in the first place? it seems that as long as bytes are flowing, a timeout shouldn't happen at all.
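
for reference, the change amounts to something like this in my test script (a sketch; the max_size keyword and its 1MB default are assumptions pieced together from this thread):

    # polling interval of 3s as before; cap each fetch at 1/10 of the
    # assumed 1MB default (kwarg name guessed from the consumer.py traceback)
    c.subscribe('fslogger', 3.000, max_size=1024 * 1024 / 10).start(callback)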

jrydberg commented 12 years ago

it might stem from the fact that we're doing a socket.read(N) where N is the expected payload size of the response. if that takes longer than T, it times out. we should probably chunk the reads instead; that way we don't impose a bandwidth limit (really, a lower bound on required throughput) on the connection.
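
to put numbers on it: a single read of N bytes under a T-second timeout demands a sustained throughput of at least N/T; with the default max_size of 1MB and T = 5s, that floor is only ~200KB/s (~1.6Mbit/s).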

but it is quite mysterious that you cannot read 1MB (the default max_size) in less than 5 seconds. what's your bandwidth to the kafka broker?

jedsmith commented 12 years ago

> it might stem from the fact that we're doing a socket.read(N) where N is the expected payload size of the response.

You definitely don't want to pass a large value to recv on a socket.socket, if that's what you're doing. That value interacts with network buffers, and the Python documentation suggests a small power of 2.

Looking in Int32Protocol, I see strange values for recv in a couple spots. It might seem like a convenient "only give me this many bytes" knob, but you don't want to use recv's argument that way; I'd rearchitect to use .recv(4096), and read what you need off of it (cStringIO.StringIO might be useful here). I see in Dave's original traceback that we're recving 4 bytes, so I don't know ... might help.

The bandwidth to the Kafka broker is, for all intents and purposes, low-latency gigabit and shouldn't matter. (I work with Dave, to answer your next question. I'm just following along.)
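
Roughly what I have in mind (an untested sketch, not gevent-kafka's actual code; works the same with gevent's socket):

    import socket

    CHUNK = 4096  # small power of two, per the Python socket docs

    def read_exact(sock, n):
        """Read exactly n bytes by recv'ing small chunks into a buffer,
        rather than asking recv() for the whole payload at once."""
        chunks = []
        remaining = n
        while remaining > 0:
            data = sock.recv(min(CHUNK, remaining))
            if not data:
                raise socket.error("connection closed with %d bytes pending"
                                   % remaining)
            chunks.append(data)
            remaining -= len(data)
        return "".join(chunks)

A nice side effect: any timeout then applies per recv() call, so a slow-but-live connection no longer has to deliver the entire response within one timeout window.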

jrydberg commented 12 years ago

agreed.

but i think it doesn't really explain the timeout anyhow.
