dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

Problem using async=True #46

Closed: watchforstock closed this issue 11 years ago

watchforstock commented 11 years ago

Hi,

I'm having trouble using an asynchronous producer. I'm running the latest copy of the code from github.

I have a simple script which exhibits the problem:

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

if __name__ == '__main__':

    kafka = KafkaClient("192.168.15.136", 9092)
    producer = SimpleProducer(kafka, "test", async=True)

    producer.send_messages("Hello")

The log output is as follows:

Traceback (most recent call last):
  File "kafka_test.py", line 7, in <module>
    producer = SimpleProducer(kafka, "test", async=True)
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 176, in __init__
    batch_send_every_t)
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 72, in __init__
    self.proc.start()
  File "c:\python27\Lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "c:\python27\Lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "c:\python27\Lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "c:\python27\Lib\pickle.py", line 224, in dump
    self.save(obj)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 401, in save_reduce
    save(args)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 548, in save_tuple
    save(element)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 686, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 548, in save_tuple
    save(element)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 748, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <built-in method recvfrom_into of _socket.socket object at 0x02D36F48>: it's not found as __main__.recvfrom_into
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:\python27\Lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "c:\python27\Lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "c:\python27\Lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "c:\python27\Lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

The same code works if I change async to False. This is running on Python 2.7.3 on Windows against Kafka 0.8.0. Any suggestions gratefully received.

mahendra commented 11 years ago

Hmmm, this looks like an issue with how multiprocessing initializes processes on Windows. It is a good bug to catch. I guess we have to take care of platform-specific issues w.r.t. Windows (as documented at http://docs.python.org/2/library/multiprocessing.html#windows)

I will work on it and let you know.
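For background, a minimal sketch (not kafka-python code) of the constraint being described: Windows has no fork(), so multiprocessing pickles the Process object, its target and its arguments into a fresh child interpreter, and anything unpicklable reachable from them, such as an open socket, aborts the start:

from multiprocessing import Process
import socket

class Holder(object):
    def __init__(self):
        self.sock = socket.socket()  # holds an unpicklable resource

    def work(self):
        pass

if __name__ == '__main__':
    h = Holder()
    # On Windows, h (and its socket) must be pickled into the child, which
    # raises a PicklingError much like the one above. On Linux, fork()
    # copies the process and no pickling happens, so the same code works.
    p = Process(target=h.work)
    p.start()
    p.join()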

watchforstock commented 11 years ago

Thank you - happy to test something at the appropriate time if it helps

mahendra commented 11 years ago

Looked into it further. It looks like on Windows we cannot have methods as the 'target' for Process.__init__(); we have to define a normal function and use it. This will take some work. In the meantime, can you continue using async=False?
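In other words, the rework would look roughly like this (a sketch with hypothetical names, not the actual patch): the child's entry point becomes a module-level function, only picklable arguments cross the process boundary, and the child opens its own connection:

from multiprocessing import Process, Queue

def _send_upstream(queue, host, port):
    # Hypothetical stand-in for the real worker: the child would build its
    # own KafkaClient/socket here instead of inheriting an open one from
    # the parent process.
    pass

if __name__ == '__main__':
    queue = Queue()
    proc = Process(target=_send_upstream, args=(queue, '192.168.15.136', 9092))
    proc.start()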

watchforstock commented 11 years ago

async=False is fine for now. My deployment target is likely to be Linux anyway so no immediate rush. Thanks.

mahendra commented 11 years ago

Have made a tentative fix. I cannot try it since I do not have Windows. Would it be possible for you to try running it from this branch and let me know if it works?

https://github.com/mahendra/kafka-python/tree/windows

watchforstock commented 11 years ago

Unfortunately it's still not going:

<type 'exceptions.Exception'>
Can't pickle <built-in method recvfrom_into of _socket.socket object at 0x037478E0>: it's not found as __main__.recvfrom_into
[26/Sep/2013 10:03:45] "GET /sigs/hits/536f0932f30a45159b9b629a34659764/0/ HTTP/1.1" 200 27
Traceback (most recent call last):
  File "C:\Users\astock\workspace\checklist\checklist\sigcheck\views.py", line 36, in upload
    request.upload_handlers = [QueueUploadHandler(docid)]
  File "C:\Users\astock\workspace\checklist\checklist\sigcheck\QueueUploadHandler.py", line 27, in __init__
    self.producer = SimpleProducer(self.kafka, "test", async=True)#False,
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 185, in __init__
    batch_send_every_t)
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 125, in __init__
    self.proc.start()
  File "c:\python27\Lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "c:\python27\Lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "c:\python27\Lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "c:\python27\Lib\pickle.py", line 224, in dump
    self.save(obj)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 562, in save_tuple
    save(element)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 686, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 548, in save_tuple
    save(element)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 748, in save_global
    (obj, module, name))
PicklingError: Can't pickle <built-in method recvfrom_into of _socket.socket object at 0x037478E0>: it's not found as __main__.recvfrom_into

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:\python27\Lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "c:\python27\Lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "c:\python27\Lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "c:\python27\Lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

mumrah commented 11 years ago

@sweetcheeks24 also had this issue on Windows. I'm marking as wontfix since I'm not really sure there's anything we can do about it.

If you're on Windows, don't use async I guess :-/

mahendra commented 11 years ago

Checked in detail. This can be fixed, but we will hold off on it till the zookeeper branch is checked in. Some of those patches will be required for it.

mahendra commented 11 years ago

@mumrah - we have to be careful with this feature. If multiprocessing is a problem on Windows, things like MultiProcessConsumer will not work. We need to address this issue.

mahendra commented 11 years ago

@watchforstock can you give my branch one more try? I have tried to ensure that this feature will work on Windows.

Give it a quick check. If things work, I will send a merge request to @mumrah.

watchforstock commented 11 years ago

Unfortunately, it looks like I'm still getting the same error about not being able to pickle method recvfrom_into of _socket.socket.

mahendra commented 11 years ago

Oh! OK, let me look into it. A trace would be helpful.

I will try to find myself a Windows box for debugging. If I can't, I will keep bothering you.. :-)

sweetcheeks24 commented 11 years ago

I was able to get around the issue by switching multiprocessing.Process to threading.Thread. I am not sure of the overall impact that may have, as I am only using the producer in my efforts to integrate with tcollector. (I should also add that I have not actually successfully produced anything yet, since I am troubleshooting other tcollector/Windows/Python compatibility issues.)

# before (multiprocessing):
self.proc = Process(target=self._send_upstream, args=(self.queue,))

# after (threading workaround):
self.proc = Thread(target=self._send_upstream, args=(self.queue,))

mahendra commented 11 years ago

Thread will not exactly solve the problem. This module was for folks who did not want to use threads. I have a new feature adding driver support to kafka-python, where you can switch between gevent, thread and process. Coming up soon.

Meanwhile, I figured out this problem (I hope). The issue is that the socket object is not pickle-able. So, I have written some hacks for it and pushed it to my "windows" branch. Could you give it one more try please?

Sorry about this. The branch is available here: https://github.com/mahendra/kafka-python/tree/windows
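One common shape for such a workaround (an assumption about the approach, not a quote from the branch): drop the live socket before pickling and have the child re-open it through a reinit() hook:

import socket

class PicklableConn(object):
    """Sketch of a connection object that survives pickling by dropping
    its socket and reconnecting on the other side."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self._sock = socket.create_connection((host, port))

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_sock'] = None  # live sockets cannot be pickled
        return state

    def reinit(self):
        # Called in the child after unpickling; guard against the
        # _sock=None state left behind by __getstate__.
        if self._sock is not None:
            self._sock.close()
        self._sock = socket.create_connection((self.host, self.port))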

sweetcheeks24 commented 11 years ago

Not sure if it is just my setup, but I got this:

Process Process-1:
Traceback (most recent call last):
  File "c:\Python27\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "c:\Python27\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "F:\collectors\lib\kafka\producer.py", line 29, in _send_upstream
    client.reinit()
AttributeError: 'NoneType' object has no attribute 'reinit'

watchforstock commented 11 years ago

Thanks for your efforts - I'll give it a try once I'm back in the office on Monday.

mahendra commented 11 years ago

Actually, it was my mistake. I had not tested it thoroughly.

Have fixed it. Do test it once more when you have time. Will try testing it myself as well.

watchforstock commented 11 years ago

The error has changed now so definitely making progress:

Traceback (most recent call last):
  File "c:\python27\Lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "c:\python27\Lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 29, in _send_upstream
    client.reinit()
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\client.py", line 197, in reinit
    conn.reinit()
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\conn.py", line 118, in reinit
    self._sock.close()
AttributeError: 'NoneType' object has no attribute 'close'

It appears that although the code now runs, no messages are actually making it to the Kafka broker.
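Two symptoms worth separating here. The AttributeError is the unpickled `_sock is None` state tripping an unconditional `self._sock.close()` in reinit() (the None guard in the sketch above is the kind of fix the trace suggests). For the missing messages, one quick check is to send the same payload synchronously, where any produce error surfaces in the caller instead of dying silently in the child process (same API as the repro script at the top, a sketch only):

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

if __name__ == '__main__':
    kafka = KafkaClient("192.168.15.136", 9092)
    # async=False takes the synchronous path: send_messages talks to the
    # broker directly, so failures raise here rather than in a subprocess.
    producer = SimpleProducer(kafka, "test", async=False)
    producer.send_messages("Hello")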

mahendra commented 11 years ago

I have done some testing from my side and made a pull request for this: #61.

@watchforstock if you confirm that it works on Windows, I will merge it.

Veereshsajjan commented 9 years ago

Hi, I am using a modified tail2kafka (log2kafka). When I run it I get the below error:

Traceback (most recent call last):
  File "log2kafka.py", line 46, in <module>
    main()
  File "log2kafka.py", line 40, in main
    producer = kafka.producer.Producer(options.topic,options.host,int(options.port))
  File "/home/nandhu/kafka/kafka-storm-cassandra-master/kafka/producer.py", line 29, in __init__
    self.connect()
  File "/home/nandhu/kafka/kafka-storm-cassandra-master/kafka/io.py", line 21, in connect
    self.socket.connect((self.host, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
TypeError: coercing to Unicode: need string or buffer, int found
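The TypeError is consistent with an argument-order mix-up rather than a Kafka problem: given the __init__ signature in the code below (topic, partition=3, host='localhost', port=9092, ...), the call Producer(options.topic, options.host, int(options.port)) binds options.host to partition and the integer port to host, so socket.connect() receives an int where it expects a hostname string. A hedged sketch of the corrected call (assuming options holds the usual string host and numeric port):

# Pass host/port by keyword so they do not land in the `partition` slot.
producer = kafka.producer.Producer(options.topic,
                                   host=options.host,
                                   port=int(options.port))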

My Producer is:

import atexit
import contextlib
import itertools
import struct
import threading

import kafka.io
import kafka.request_type

class Producer(kafka.io.IO):
  """Class for sending data to a `Kafka <http://sna-projects.com/kafka/>`_ broker.

  :param topic: The topic to produce to.
  :param partition: The broker partition to produce to.
  :param host: The kafka host.
  :param port: The kafka port.
  :param max_message_sz: The maximum allowed size of a produce request (in bytes). [default: 1MB]

  """

  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE

  def __init__(self, topic, partition=3, host='localhost', port=9092, max_message_sz=104):
    kafka.io.IO.__init__(self, host, port)
    self.max_message_sz = max_message_sz
    self.topic = topic
    self.partition = partition
    self.connect()

  def _pack_payload(self, messages):
    """Pack a list of messages into a sendable buffer.

    :param msgs: The packed messages to send.
    :param size: The size (in bytes) of all the `messages` to send.

    """
    payload = ''.join(messages)
    payload_sz = len(payload)
    topic_sz = len(self.topic)
    # Create the request as::
    #   <REQUEST_ID: short>
    #   <TOPIC_SIZE: short>
    #   <TOPIC: bytes>
    #   <PARTITION: int>
    #   <BUFFER_SIZE: int>
    #   <BUFFER: bytes>
    return struct.pack(
      '>HH%dsii%ds' % (topic_sz, payload_sz),
      self.PRODUCE_REQUEST_ID,
      topic_sz,
      self.topic,
      self.partition,
      payload_sz,
      payload
    )

  def _pack_kafka_message(self, payload):
    """Pack a payload in a format kafka expects."""
    return struct.pack('>i%ds' % len(payload), len(payload), payload)

  def encode_request(self, messages):
    """Encode a sequence of messages for sending to a kafka broker.

    Encoding a request can yield multiple kafka messages if the payloads exceed
    the maximum produce size.

    :param messages: An iterable of :class:`Message <kafka.message>` objects.
    :rtype: A generator of packed kafka messages ready for sending.

    """
    encoded_msgs = []
    encoded_msgs_sz = 0
    for message in messages:
      encoded = message.encode()
      encoded_sz = len(encoded)
      if encoded_sz + encoded_msgs_sz > self.max_message_sz:
        yield self._pack_kafka_message(self._pack_payload(encoded_msgs))
        encoded_msgs = []
        encoded_msgs_sz = 0
      msg = struct.pack('>i%ds' % encoded_sz, encoded_sz, encoded)
      encoded_msgs.append(msg)
      encoded_msgs_sz += encoded_sz
    if encoded_msgs:
      yield self._pack_kafka_message(self._pack_payload(encoded_msgs))

  def send(self, messages):
    """Send a :class:`Message <kafka.message>` or a sequence of Messages to the Kafka server."""
    if isinstance(messages, kafka.message.Message):
      messages = [messages]
    for message in self.encode_request(messages):
      sent = self.write(message)
      if sent != len(message):
        raise IOError('Failed to send kafka message - sent %s/%s many bytes.' % (sent, len(message)))

  @contextlib.contextmanager
  def batch(self):
    """Send messages with an implicit send."""
    messages = []
    yield messages
    self.send(messages)

class BatchProducer(Producer):
  """Class for batching messages to a `Kafka <http://sna-projects.com/kafka/>`_ broker with periodic flushing.

  :param topic: The topic to produce to.
  :param batch_interval: The amount of time (in seconds) to wait for messages before sending.
  :param partition: The broker partition to produce to.
  :param host: The kafka host.
  :param port: The kafka port.

  """

  MAX_RESPAWNS = 5
  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE

  def __init__(self, topic, batch_interval, partition=3, host='localhost', port=9092):
    Producer.__init__(self, topic, partition=partition, host=host, port=port)
    self.batch_interval = batch_interval
    self._message_queue = []
    self.event = threading.Event()
    self.lock = threading.Lock()
    self.timer = None
    atexit.register(self.close)
    self.respawns = 0

  def check_timer(self):
    """Starts the flush timer and restarts it after forks."""
    if (self.timer and self.timer.is_alive()) or self.respawns > self.MAX_RESPAWNS:
      return
    self.respawns += 1
    self.timer = threading.Thread(target=self._interval_timer)
    self.timer.daemon = True
    self.timer.start()
    self.connect()

  def _interval_timer(self):
    """Flush the message queue every `batch_interval` seconds."""
    while not self.event.is_set():
      self.flush()
      self.event.wait(self.batch_interval)

  def enqueue(self, message):
    """Enqueue a message in the queue.

    .. note:: These messages are implicitly sent every `batch_interval` seconds.

    :param message: The message to queue for sending at next send interval.

    """
    with self.lock:
      self.check_timer()
      self._message_queue.append(message)

  def flush(self):
    """Send all messages in the queue now."""
    with self.lock:
      if len(self._message_queue) > 0:
        self.send(self._message_queue)
        # Reset the queue
        del self._message_queue[:]

  def close(self):
    """Shutdown the timer thread and flush the message queue."""
    self.event.set()
    self.flush()
    if self.timer:
      self.timer.join()