dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

SSLWantReadError: The operation did not complete (read) #1535

Open zhgjun opened 6 years ago

zhgjun commented 6 years ago

Hi, I use kafka-python 1.4.2 to connect to kafka_2.11-0.11.0.1, with SASL_SSL as the security protocol. I get the error below, but if I remove eventlet.monkey_patch(), everything works. Does anyone know why?

This is my dev environment: Python 2.7.5 on CentOS, Linux kernel 3.10.0-514.44.5.10_44.x86_64.

And my script is:

####################################

#-*- coding: utf-8 -*-
import eventlet
import os
from kafka import KafkaConsumer
import time
import ssl
from oslo_utils import eventletutils
eventlet.monkey_patch()
import logging
from kafka.client_async import selectors
if eventletutils.is_monkey_patched('select'):
    # monkeypatch the vendored SelectSelector._select like eventlet does
    # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
    from eventlet.green import select
    selectors.SelectSelector._select = staticmethod(select.select)

    # Force to use the select selectors
    KAFKA_SELECTOR = selectors.SelectSelector
else:
    KAFKA_SELECTOR = selectors.DefaultSelector
context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
def log_init(log_name, filemode='w'):
    logging.basicConfig(filename=os.path.join(os.getcwd(), log_name), level=logging.DEBUG, filemode=filemode,
                        format='%(asctime)s - %(levelname)s: %(message)s')

log_init('test')

consumer = KafkaConsumer('topic1',
                         group_id='my-group',
                         bootstrap_servers=['127.0.0.1:9092'],
                         ssl_context=context,
                         auto_offset_reset="latest",
                         api_version=(0, 11),
                         sasl_mechanism="PLAIN",
                         sasl_plain_username="admin",
                         sasl_plain_password="pass",
                         selector=KAFKA_SELECTOR,
                         security_protocol='SASL_SSL')
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    logging.info(recv)

####################################

And I get this error:

ERROR: <BrokerConnection node_id=bootstrap host=xxx:9092 <authenticating> [IPv4 ('xxx', 9092)]>: Error receiving reply from server
Traceback (most recent call last):
  File "xxxx/lib/python2.7/site-packages/kafka_python-1.4.2-py2.7.egg/kafka/conn.py", line 558, in _try_authenticate_plain
    data = self._recv_bytes_blocking(4)
  File "xxxx/lib/python2.7/site-packages/kafka_python-1.4.2-py2.7.egg/kafka/conn.py", line 535, in _recv_bytes_blocking
    fragment = self._sock.recv(n - len(data))
  File "xxxx/lib/python2.7/site-packages/eventlet/green/ssl.py", line 198, in recv
    read = self.read(buflen)
  File "xxxx/lib/python2.7/site-packages/eventlet/green/ssl.py", line 138, in read
    super(GreenSSLSocket, self).read, *args, **kwargs)
  File "xxxx/lib/python2.7/site-packages/eventlet/green/ssl.py", line 108, in _call_trampolining
    return func(*a, **kw)
  File "/usr/lib64/python2.7/ssl.py", line 634, in read
    v = self._sslobj.read(len or 1024)
SSLWantReadError: The operation did not complete (read) (_ssl.c:1936)
zhgjun commented 6 years ago

But if I use SASL or SSL alone, there is no problem; I only get the error with SASL_SSL. (Kerberos is not used in my environment.)
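For context on where the failing read happens: with SASL PLAIN, after the SaslHandshake request the client sends one length-prefixed token and then blocks reading the 4-byte length of the broker's reply, which matches the `_recv_bytes_blocking(4)` call inside `_try_authenticate_plain` in the traceback. A plain-SSL connection does not go through this authentication read, and a plain-SASL connection has no TLS layer that could raise `SSLWantReadError`, which may be why only SASL_SSL fails. The sketch below builds such a token per RFC 4616 (`[authzid] NUL authcid NUL passwd`) behind a 4-byte big-endian length. `build_sasl_plain_frame` and `parse_length_prefix` are hypothetical helpers, not kafka-python API, and kafka-python's own encoding may differ in details such as the authzid.

```python
import struct


def build_sasl_plain_frame(username, password, authzid=""):
    """Hypothetical helper: RFC 4616 PLAIN token with a 4-byte big-endian size prefix."""
    # [authzid] NUL authcid NUL passwd; authzid is optional and empty here
    message = "\0".join([authzid, username, password]).encode("utf-8")
    return struct.pack(">i", len(message)) + message


def parse_length_prefix(header):
    """Hypothetical helper: decode the 4-byte size that precedes the broker's reply."""
    if len(header) != 4:
        raise ValueError("expected 4 bytes, got %d" % len(header))
    return struct.unpack(">i", header)[0]


frame = build_sasl_plain_frame("admin", "pass")
print(parse_length_prefix(frame[:4]), frame[4:])
```

The point of the sketch is the last step: the client must read exactly 4 bytes before it knows how long the reply is, so this read cannot be done opportunistically by the non-blocking poll loop and instead goes through the blocking helper.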

zhgjun commented 6 years ago

Why do we use _sock.settimeout in _recv_bytes_blocking (\kafka\conn.py) to do a blocking recv?

def _recv_bytes_blocking(self, n):
    self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
    try:
        data = b''
        while len(data) < n:
            fragment = self._sock.recv(n - len(data))
            if not fragment:
                raise ConnectionError('Connection reset during recv')
            data += fragment
        return data
    finally:
        self._sock.settimeout(0.0)
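One relevant detail: in the Python standard library, a positive `settimeout()` and `setblocking(True)` are different modes. `setblocking(True)` is equivalent to `settimeout(None)` (blocking, no timeout) and `setblocking(False)` to `settimeout(0.0)`; a positive timeout puts the socket in "timeout mode", which the `socket` documentation notes is implemented by setting the socket non-blocking at the OS level. Whether the resulting "would block" condition (`SSLWantReadError` on a TLS socket) is retried then depends on the wrapper, here eventlet's `GreenSSLSocket`. This is a hedged reading of the traceback, not a confirmed diagnosis. The sketch only demonstrates the stdlib semantics on a local `socket.socketpair()`; `timeout_modes` is a hypothetical helper.

```python
import socket


def timeout_modes():
    """Hypothetical helper: report gettimeout() after each way of setting the blocking mode."""
    a, b = socket.socketpair()  # connected local pair, no network needed
    try:
        modes = []
        a.setblocking(True)       # blocking, no timeout
        modes.append(("setblocking(True)", a.gettimeout()))
        a.settimeout(30.0)        # timeout mode: blocks for at most 30 s
        modes.append(("settimeout(30.0)", a.gettimeout()))
        a.settimeout(None)        # same as setblocking(True)
        modes.append(("settimeout(None)", a.gettimeout()))
        a.setblocking(False)      # non-blocking
        modes.append(("setblocking(False)", a.gettimeout()))
        a.settimeout(0.0)         # same as setblocking(False)
        modes.append(("settimeout(0.0)", a.gettimeout()))
        return modes
    finally:
        a.close()
        b.close()


for name, value in timeout_modes():
    print(name, "->", value)
```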

Can we use _sock.setblocking(True) instead, like this?

def _recv_bytes_blocking(self, n):
    # self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
    self._sock.setblocking(True)
    try:
        data = b''
        while len(data) < n:
            fragment = self._sock.recv(n - len(data))
            if not fragment:
                raise ConnectionError('Connection reset during recv')
            data += fragment
        return data
    finally:
        # self._sock.settimeout(0.0)
        self._sock.setblocking(False)

When I use _sock.setblocking, my problem is solved. So maybe this is a bug: when _recv_bytes_blocking uses only _sock.settimeout, I get the error described above.
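A trade-off with the proposed change: `setblocking(True)` avoids the error but drops the `request_timeout_ms` bound, so a broker that stops responding mid-authentication could hang the read indefinitely. One alternative that keeps a deadline is to leave the socket non-blocking and wait with `select.select()` whenever the read would block, retrying after `SSLWantReadError`. `recv_exact` is a hypothetical helper, not kafka-python's implementation; it is written for Python 3 and has not been tested under eventlet's green sockets.

```python
import select
import ssl
import time


def recv_exact(sock, n, timeout):
    """Hypothetical helper: read exactly n bytes from a non-blocking (possibly TLS) socket.

    Instead of switching the socket to blocking mode, wait with select()
    whenever the read would block, and give up once the deadline passes.
    """
    deadline = time.monotonic() + timeout
    data = b""
    while len(data) < n:
        try:
            fragment = sock.recv(n - len(data))
        except (ssl.SSLWantReadError, BlockingIOError):
            wait_read = True    # no complete TLS record / no bytes buffered yet
        except ssl.SSLWantWriteError:
            wait_read = False   # TLS layer must send before it can read
        else:
            if not fragment:
                raise ConnectionError("Connection reset during recv")
            data += fragment
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("recv timed out after %.2f s" % timeout)
        rlist = [sock] if wait_read else []
        wlist = [] if wait_read else [sock]
        select.select(rlist, wlist, [], remaining)
    return data
```

Usage would mirror `_recv_bytes_blocking`, e.g. `recv_exact(sock, 4, request_timeout_ms / 1000.0)`, with the socket left in non-blocking mode throughout.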

zhgjun commented 6 years ago

@dpkp, do you know why we use _sock.settimeout in _recv_bytes_blocking (\kafka\conn.py) for the blocking recv? Can we use _sock.setblocking instead?