dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

kafka message queue backend is not working with gevent socket code. #2325

Open neerajdlh74 opened 2 years ago

neerajdlh74 commented 2 years ago

I need to use Kafka as the backend message queue for a gevent WebSocket server, but it fails with an error. I have checked that the Kafka topics and brokers are working properly, so I don't know where the problem could be. Could someone please help? I am using the python-socketio framework for the WebSocket layer.

The error I get when running server1.py is:

Traceback (most recent call last):
  File "server1.py", line 16, in <module>
    mgr = socketio.KafkaManager(url=url, channel='bingo.sockets')
  File "/usr/local/lib/python3.6/site-packages/socketio/kafka_manager.py", line 52, in __init__
    self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_urls)
  File "/usr/local/lib/python3.6/site-packages/kafka/producer/kafka.py", line 383, in __init__
    **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 208, in __init__
    self._selector = self.config['selector']()
  File "/usr/lib64/python3.6/selectors.py", line 399, in __init__
    self._epoll = select.epoll()
AttributeError: module 'select' has no attribute 'epoll'
Exception ignored in: <bound method KafkaClient.__del__ of <kafka.client_async.KafkaClient object at 0x7fed7ed59b00>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 443, in __del__
    self._close()
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 421, in _close
    self._selector.close()
AttributeError: 'KafkaClient' object has no attribute '_selector'
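
Note on the second traceback: it looks like a side effect of the first one rather than a separate problem. The constructor failed before `_selector` was assigned, and the cleanup code then tried to close an attribute that was never set. The sketch below reproduces that shape with plain Python; `FragileClient` and `broken_factory` are hypothetical stand-ins, not kafka-python classes.

```python
class FragileClient:
    """Mimics the shape of the failure: close() assumes __init__ finished."""

    def __init__(self, selector_factory):
        # If the factory raises, the attribute below is never assigned.
        self._selector = selector_factory()

    def close(self):
        self._selector.close()


def broken_factory():
    # Stand-in for select.epoll() being unavailable.
    raise AttributeError("module 'select' has no attribute 'epoll'")


def secondary_error():
    """Return the message close() raises on a half-constructed client."""
    client = FragileClient.__new__(FragileClient)  # allocate without __init__
    try:
        client.__init__(broken_factory)
    except AttributeError:
        pass  # the primary error; the object is left half-constructed
    try:
        client.close()
    except AttributeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(secondary_error())
```

Fixing the first error should make the second one disappear as well.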

Here is the server1.py code:

import socketio
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
from gevent import monkey
monkey.patch_all()

url=['kafka://127.0.0.1:9092', 'kafka://127.0.0.2:9092', 'kafka://127.0.0.3:9092']
mgr = socketio.KafkaManager(url=url, channel='sockets')
sio = socketio.Server(async_mode='gevent', client_manager=mgr, logger=True, engineio_logger=True)
app = socketio.WSGIApp(sio)

@sio.event
def connect(sid, environ):
    print('connected')
    sio.emit('message', {'foo':'bar'})

@sio.event
def message(sid, data):
    print('msg', data)

@sio.event
def disconnect(sid):
    print('disconnect')

if __name__=='__main__':
    pywsgi.WSGIServer(('0.0.0.0', 8000), app, handler_class=WebSocketHandler).serve_forever()

Here are the versions of the libraries I am currently using:

Python 3.6.8

gevent==1.2.2
gevent-websocket==0.10.1
eventlet==0.33.1
kafka-python==2.0.2
python-engineio==4.3.2
python-socketio==5.6.0
websocket-client==1.3.1
websockets==9.1
greenlet==1.1.2
asyncio==3.4.3

dabbler0606 commented 1 year ago

You must call gevent.monkey.patch_all() at the very top of the file, before any other imports.

Please try:

import gevent.monkey
gevent.monkey.patch_all()

import socketio
...
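
A possible explanation for why the order matters, judging from the traceback: kafka-python takes its selector class from the standard `selectors` module, and an epoll-based selector had already been chosen by the time `monkey.patch_all()` ran, because `socketio` (and, it appears, `kafka` through it) was imported first. After patching, `select.epoll` was no longer available, so creating the selector failed. The sketch below is a small stdlib-only diagnostic, not part of gevent or kafka-python; the function name `loaded_before_patch` and the watch list are hypothetical. It reports which of the listed modules are already imported, so it can be called just before `gevent.monkey.patch_all()` to catch an import-order mistake.

```python
import sys

# Hypothetical watch list: the modules that, in this issue, were imported
# before monkey patching. Adjust it to your own application.
WATCHED = ("socketio", "kafka")


def loaded_before_patch(loaded=None, watched=WATCHED):
    """Return the watched modules that are already imported.

    `loaded` defaults to sys.modules; pass a plain dict to test the logic.
    """
    if loaded is None:
        loaded = sys.modules
    return [name for name in watched if name in loaded]


if __name__ == "__main__":
    early = loaded_before_patch()
    if early:
        print("imported before monkey patching:", ", ".join(early))
```

An empty list at the point where `patch_all()` is called suggests the patch ran before those libraries were loaded.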