confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python
Other
124 stars 896 forks source link

Error in the execution of a confluent-kafka-python producer on spark for Kafka #485

Open abigbigbird opened 6 years ago

abigbigbird commented 6 years ago

It was well for me when i use it in pyspark

Python 2.7.5 (default, Apr 11 2018, 07:36:10) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from confluent_kafka import Producer
>>> p = Producer({'bootstrap.servers': '10.10.10.10:11236'})
>>> p.poll(0)
0
>>> 
>>> p.produce('sherlock_spark_sql', 'test'.encode('utf-8'))
>>> p.flush()
0
>>> 

but when i run it on spark, the command is

spark-submit --jars /usr/home/hadoop/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar --py-files /usr/home/hadoop/sherlock/sherlock.zip /usr/home/hadoop/sherlock/sherlock/analyze/mysql_streaming.py

There is an error:

Traceback (most recent call last):
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib64/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 780, in save_reduce
    save(cls)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 639, in save_global
    self.save_dynamic_class(obj)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 476, in save_dynamic_class
    save(clsdict)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 718, in save_classmethod
    self.save_reduce(type(obj), (orig_func,), obj=obj)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 786, in save_reduce
    save(args)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 630, in save_global
    __import__(modname)
ImportError: No module named cimpl
edenhill commented 6 years ago

Please fill out the checklist:

Checklist

Please provide the following information:

abigbigbird commented 6 years ago

confluent_kafka.version(0.11.6) confluent_kafka.libversion(0.11.6) Apache Kafka broker version: 0.11.0.0 Client configuration:kafka_prod = Producer({'bootstrap.servers': '10.10.10.10:19000'}) Operating system(CentOS Linux release 7.3.1611) Spark version(spark-2.3.2-bin-hadoop2.7) Critical issue(maybe): pickle.PicklingError

File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 630, in save_global
    __import__(modname)
ImportError: No module named cimpl
edenhill commented 6 years ago

How did you install confluent-kafka? The binary wheels for your platform or by installing librdkafka separately?

abigbigbird commented 6 years ago

I install librdkafka separately, then I reinstall again with pip install confluent-kafka, and install successed。Maybe there is some files i have not clean? Sorry, i have no idea for how uninstall librdkafka, so i just renamed the confluent-kafka-python-0.11.6 directory before 'pip install confluent-kafka'.

edenhill commented 6 years ago

I suggest removing the installed librdkafka, just to keep it clean, and then using a binary wheel installation of confluent-kafka

abigbigbird commented 6 years ago

Ths, i would try it.

abigbigbird commented 6 years ago

It doesn't work.....

[root@h22554 ~]# pip install confluent-kafka
Collecting confluent-kafka
  Downloading https://files.pythonhosted.org/packages/76/8c/f98574a41aefd0c9eb9c57336631de853f9b18abcbf624185b1553e63cab/confluent_kafka-0.11.6-cp27-cp27mu-manylinux1_x86_64.whl (3.9MB)
    100% |████████████████████████████████| 3.9MB 33kB/s 
Requirement already satisfied: futures in /usr/lib64/python2.7/site-packages/futures-3.2.0-py2.7.egg (from confluent-kafka) (3.2.0)
Requirement already satisfied: enum34 in /usr/lib/python2.7/site-packages (from confluent-kafka) (1.1.6)
Installing collected packages: confluent-kafka
Successfully installed confluent-kafka-0.11.6
Traceback (most recent call last):
  File "/usr/home/hadoop/sherlock/sherlock/analyze/mysql_streaming.py", line 457, in <module>
    sherlock.run()
  File "/usr/home/hadoop/sherlock/sherlock/analyze/mysql_streaming.py", line 449, in run
    ssc.awaitTermination()
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/context.py", line 206, in awaitTermination
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2472, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2405, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2391, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 575, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 918, in dumps
    cp.dump(obj)
  File "/usr/home/hadoop/spark-2.3.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 249, in dump
    raise pickle.PicklingError(msg)
PicklingError: Could not serialize object: ImportError: No module named cimpl
mauliksoneji commented 5 years ago

Any update on this? @abigbigbird were you able to fix this. I am facing the same issue

rnpridgeon commented 5 years ago

In the end I think confluent-kafka-python simply cannot be reliably pickled. As a result it can't be distributed properly across the cluster. Acorrding to the pyspark docs c libraries such as numpy can be used without issue but its unclear at this point what makes its so difficult to pickle the extension code. We will try to dig into it more.

edenhill commented 5 years ago

Is this really a pickling problem of confluent_kafka? It looks more like the required python dependencies are not properly bundled with for the spark job.

mlepicka commented 5 years ago

I'm also struggling with confluent-kafka with sparks. But I have a different ERROR:

File ~/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main func, profiler, deserializer, serializer = read_command(pickleSer, infile) File "~/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command command = serializer._read_with_length(file) File "~/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length return self.loads(obj) File "~/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads return pickle.loads(obj, encoding=encoding) AttributeError: type object 'Producer' has no attribute '__len__'

pyspark 2.4.3 confluent-kafka 1.1.0 spark 2.4.3

dmitrypasternak commented 4 years ago

Any updates on this? Also have the same issue using it on spark cluster: python 3.6.9 confluent-kafka 1.3.0 spark 2.4.0 confluent-kafka was installed on each spark worker

When create Producer() for each partition in RDD.

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 359, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 64, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 577, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: type object 'Producer' has no attribute '__len__'
SivaAccionLabs commented 4 years ago

I have been struggling with confluent-kafka with sparks on azure databricks. Below is the error I am getting while producing the messages to the Kafka.

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 182, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 695, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: type object 'Producer' has no attribute '__len__'
edenhill commented 4 years ago

Pickling of confluent-kafka-python is currently not supported, it is missing a reduce method and Pickle seems to not properly handle len has a C extension method.

edenhill commented 4 years ago

A workaround could be to create your own producer class that instantiates a producer based on a config dict, and simply have the config dict be pickled (not the Producer object itself).

bebound commented 3 years ago

I'm facing the same problem. Currently, it's not possible to use confluent-kafka in spark executor.

confluent-kafka 1.5.0 Error message when trying to create Consumer in executor:

File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 700, in loads
return pickle.loads(obj, encoding=encoding)
AttributeError: type object 'Consumer' has no attribute 'subscribe'
bebound commented 3 years ago

A workaround could be to create your own producer class that instantiates a producer based on a config dict, and simply have the config dict be pickled (not the Producer object itself).

Init Producer in the map function will also raise no attribute '__len__' error.

def dummy_func(*args):
    a=create_kafka_producer()
    return 1

dummy = sc.parallelize([1])
dummy.map(dummy_func).count
bebound commented 3 years ago

edenhill 's solution is correct. But the import statement should also be moved. Put from confluent_kafka import Producer inside create_kafka_producer() works fine. In contract, the error will be raised if this line is in global.

Celestine47 commented 3 years ago

error occurs when use pyspark 2.4.0, Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/lib/spark-2.4.7-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main func, profiler, deserializer, serializer = read_command(pickleSer, infile) File "/usr/lib/spark-2.4.7-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command command = serializer._read_with_length(file) File "/usr/lib/spark-2.4.7-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length return self.loads(obj) File "/usr/lib/spark-2.4.7-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 587, in loads return pickle.loads(obj, encoding=encoding) AttributeError: type object 'Producer' has no attribute '__len__'