crflynn / pbspark

protobuf pyspark conversion
MIT License

Could not serialize object #26

Closed ttelfer closed 2 years ago

ttelfer commented 2 years ago

I've been having some problems getting pbspark to work.

I get the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/pyspark/serializers.py", line 437, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/usr/local/lib/python3.10/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.10/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'google._upb._message.Descriptor' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/vscode/.vscode-server/extensions/ms-python.python-2022.6.3/pythonFiles/lib/python/debugpy/__main__.py", line 45, in <module>
    cli.main()
  File "/home/vscode/.vscode-server/extensions/ms-python.python-2022.6.3/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 444, in main
    run()
  File "/home/vscode/.vscode-server/extensions/ms-python.python-2022.6.3/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 285, in run_file
    runpy.run_path(target_as_str, run_name=compat.force_str("__main__"))
  File "/usr/local/lib/python3.10/runpy.py", line 269, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/usr/local/lib/python3.10/runpy.py", line 96, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/workspace/test_proto.py", line 61, in <module>
    dfs = df.select(mc.from_protobuf(df.value, SimpleMessage).alias("value"))
  File "/usr/local/lib/python3.10/site-packages/pbspark/_proto.py", line 316, in from_protobuf
    return protobuf_decoder_udf(column)
  File "/usr/local/lib/python3.10/site-packages/pyspark/sql/udf.py", line 199, in wrapper
    return self(*args)
  File "/usr/local/lib/python3.10/site-packages/pyspark/sql/udf.py", line 177, in __call__
    judf = self._judf
  File "/usr/local/lib/python3.10/site-packages/pyspark/sql/udf.py", line 161, in _judf
    self._judf_placeholder = self._create_judf()
  File "/usr/local/lib/python3.10/site-packages/pyspark/sql/udf.py", line 170, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/usr/local/lib/python3.10/site-packages/pyspark/sql/udf.py", line 34, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/local/lib/python3.10/site-packages/pyspark/rdd.py", line 2816, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/local/lib/python3.10/site-packages/pyspark/serializers.py", line 447, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle 'google._upb._message.Descriptor' object

Using the following code:

from pyspark import SparkContext
from pyspark.serializers import CloudPickleSerializer
from pyspark.sql.functions import struct
from pyspark.sql.session import SparkSession

from pb.simple_pb2 import SimpleMessage
from pbspark._proto import MessageConverter

sc = SparkContext(serializer=CloudPickleSerializer())
spark = SparkSession(sc).builder.getOrCreate()
spark.conf.set("spark.sql.session.timeZone", "UTC")

example = SimpleMessage(
    name="test",
    quantity=5,
    measure=5.5
)

e = example.SerializeToString()
p = SimpleMessage()
p.ParseFromString(e)

print(p)

data = [{"value": example.SerializeToString()}]

df = spark.createDataFrame(data)  # type: ignore[type-var]
df.show()
df.schema

df.printSchema()

# Everything above works

mc = MessageConverter()
s = SimpleMessage()

dfs = df.select(mc.from_protobuf(df.value, SimpleMessage).alias("value"))
df_again = dfs.select(mc.to_protobuf(dfs.value, SimpleMessage).alias("value"))
df_again.show()

simple.proto

syntax = "proto3";

package pb;

message SimpleMessage {
  string name = 1;
  int64 quantity = 2;
  float measure = 3;
}

requirements.txt

certifi==2022.5.18.1
charset-normalizer==2.0.12
click==8.1.3
cloudpickle==2.1.0
croniter==1.3.5
dask==2022.5.2
delta-spark==1.2.1
deltalake==0.5.7
distributed==2022.5.2
docker==5.0.3
duckdb==0.3.4
fsspec==2022.5.0
HeapDict==1.0.1
idna==3.3
importlib-metadata==4.11.4
importlib-resources==5.7.1
Jinja2==3.1.2
kafka-python==2.0.2
locket==1.0.0
MarkupSafe==2.1.1
marshmallow==3.16.0
marshmallow-oneofschema==3.0.1
msgpack==1.0.4
mypy==0.961
mypy-extensions==0.4.3
mypy-protobuf==3.2.0
numpy==1.22.4
packaging==21.3
pandas==1.4.2
partd==1.2.0
pbspark==0.5.0
pendulum==2.1.2
polars==0.13.42
prefect==1.2.2
protobuf==4.21.1
psutil==5.9.1
py4j==0.10.9.3
pyarrow==8.0.0
pyparsing==3.0.9
pyspark==3.2.1
python-box==6.0.2
python-dateutil==2.8.2
python-slugify==6.1.2
pytz==2022.1
pytzdata==2020.1
PyYAML==6.0
requests==2.27.1
six==1.16.0
sortedcontainers==2.4.0
tabulate==0.8.9
tblib==1.7.0
text-unidecode==1.3
toml==0.10.2
tomli==2.0.1
toolz==0.11.2
tornado==6.1
types-protobuf==3.19.21
typing_extensions==4.2.0
urllib3==1.26.9
websocket-client==1.3.2
zict==2.2.0
zipp==3.8.0

protoc version 21.1

crflynn commented 2 years ago

can you post your simple_pb2.py file?

ttelfer commented 2 years ago
vscode ➜ /workspace $ /usr/local/bin/protoc --version
libprotoc 3.21.1
vscode ➜ /workspace $ cd pb
vscode ➜ /workspace/pb $ /usr/local/bin/protoc -I . --python_out=. --pyi_out=. --proto_path=. ./*.proto

simple_pb2.py

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: simple.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()

DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0csimple.proto\x12\x02pb\"@\n\rSimpleMessage\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08quantity\x18\x02 \x01(\x03\x12\x0f\n\x07measure\x18\x03 \x01(\x02\x62\x06proto3')

_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'simple_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  _SIMPLEMESSAGE._serialized_start=20
  _SIMPLEMESSAGE._serialized_end=84
# @@protoc_insertion_point(module_scope)
crflynn commented 2 years ago

I'm not able to reproduce this so far. I have a local environment using

protoc 21.1
java adoptopenjdk-8.0.275+1
python 3.10.5

and using the code and requirements.txt you've submitted here.

When I run it myself, I don't hit that pickle error; however, I do get another error that looks like this:

22/06/06 17:38:18 ERROR TaskSetManager: Task 2 in stage 5.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/Users/flynn/projects/pbsparktest/code.py", line 40, in <module>
    df_again.show()
  File "/Users/flynn/projects/pbsparktest/.venv/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 494, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/Users/flynn/projects/pbsparktest/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/flynn/projects/pbsparktest/.venv/lib/python3.10/site-packages/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/flynn/projects/pbsparktest/.venv/lib/python3.10/site-packages/pbspark/_proto.py", line -1, in decoder
TypeError: expected bytes, bytearray found

which is easy to fix. Once that's fixed, I'm able to run the code you've posted without issue. I also tried removing the CloudPickleSerializer from the SparkContext, and it still worked.
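The bytearray issue can be reproduced without Spark: Spark hands BinaryType column values to Python UDFs as bytearray, and the newer upb-based protobuf runtime raises "TypeError: expected bytes, bytearray found" from ParseFromString. A minimal sketch of the kind of coercion that fixes it (FakeMessage is a stand-in for a generated pb2 class so this runs without protobuf installed; this is not pbspark's actual code):

```python
class FakeMessage:
    """Stand-in for a generated pb2 message class."""

    def ParseFromString(self, data):
        # Mimics the strictness of the upb runtime in protobuf 4.x.
        if not isinstance(data, bytes):
            raise TypeError(f"expected bytes, {type(data).__name__} found")
        self.raw = data


def decoder(value, message_class):
    # Coerce the bytearray Spark provides into bytes before parsing.
    message = message_class()
    message.ParseFromString(bytes(value))
    return message


msg = decoder(bytearray(b"\n\x04test"), FakeMessage)
print(msg.raw)  # b'\n\x04test'
```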

Would it be possible to set up a repo with an example project and some commands that would reproduce easily?

ttelfer commented 2 years ago

Here you go:

https://github.com/ttelfer/spark_proto

crflynn commented 2 years ago

I think I figured it out. The problem is how you are generating your proto files and then referencing them in Python.

Note that in this project, when we generate using protoc in the Makefile, we run something like this:

poetry run protoc -I . --python_out=. --mypy_out=. --proto_path=. ./example/*.proto

We run this from the root directory and the resulting pb2 file creates messages with a reference to the fully qualified module here: https://github.com/crflynn/pbspark/blob/387eb74fc578145077117a976234e758510d5f5c/example/example_pb2.py#L23

You are running your protoc from within the pb folder, which results in an incorrect fully qualified module here: https://github.com/ttelfer/spark_proto/blob/02580c88e16245337ba2fc246e2bc67b5c7b3612/pb/simple_pb2.py#L19. That differs from how it's referenced in your business logic here: https://github.com/ttelfer/spark_proto/blob/02580c88e16245337ba2fc246e2bc67b5c7b3612/test_proto.py#L6. Note the simple_pb2 vs pb.simple_pb2, whereas in the pbspark repo both are example.example_pb2.

This reference matters when it comes to pickling. Because the module reference is wrong, pyspark cannot pickle the UDF and ship it down to the workers: pickle records the message class by its module path, and that path has to match the way you would import it.

If you change that module reference to pb.simple_pb2, or better, regenerate your pb2 files by invoking protoc from the root directory, I think it should work. You will probably still run into the bytearray bug I found above, due to the newer version of protobuf; upgrading pbspark to 0.5.1 should fix that.
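For reference, assuming the layout in the linked repo (a /workspace root with a pb/ subdirectory), regenerating from the root would look something like this:

```shell
# Run from the project root (/workspace), not from inside pb/, so the
# generated _pb2 module records the package-qualified path pb.simple_pb2.
protoc -I . --python_out=. --pyi_out=. --proto_path=. ./pb/*.proto
```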

ttelfer commented 2 years ago

@crflynn running protoc as you suggested fixed the problem. Thank you for taking a look.

Can I buy you a coffee?

crflynn commented 2 years ago

No problem, and thanks for helping out with posting your project code.

I appreciate the offer; you should buy one for a friend of yours instead or consider making a donation to an organization like the EFF.