ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

Ray Data BigQuery pickling error [<Ray component: Core|Data>] #45197

Open Chromatius opened 6 months ago

Chromatius commented 6 months ago

What happened + What you expected to happen

I am trying to use the Ray Data BigQuery datasource for data ingestion. So far I am only testing in local mode to debug the code. When I run it, I get this error:

2024-05-08 13:56:37,459 INFO worker.py:1749 -- Started a local Ray instance.
2024-05-08 13:56:40,423 INFO bigquery_datasource.py:70 -- Created streams: 1
2024-05-08 13:56:40,423 INFO bigquery_datasource.py:72 -- The number of streams created by the BigQuery Storage Read API is less than the requested parallelism due to the size of the dataset.
2024-05-08 13:56:40,964 INFO dataset.py:2370 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-05-08 13:56:42,932 INFO bigquery_datasource.py:70 -- Created streams: 1
2024-05-08 13:56:42,933 INFO bigquery_datasource.py:72 -- The number of streams created by the BigQuery Storage Read API is less than the requested parallelism due to the size of the dataset.
2024-05-08 13:56:42,939 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-05-08_13-56-34_764808_91580/logs/ray-data
2024-05-08 13:56:42,939 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadBigQuery] -> LimitOperator[limit=1]
2024-05-08 13:56:44,435 INFO bigquery_datasource.py:70 -- Created streams: 1
2024-05-08 13:56:44,435 INFO bigquery_datasource.py:72 -- The number of streams created by the BigQuery Storage Read API is less than the requested parallelism due to the size of the dataset.
2024-05-08 13:56:44,460 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
ray.data.exceptions.SystemException

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File ".../train.py", line 36, in <module>
    main()
  File ".../train.py", line 32, in main
    dataset.show(limit=1)
  File ".../python3.11/site-packages/ray/data/dataset.py", line 2453, in show
    for row in self.take(limit):
               ^^^^^^^^^^^^^^^^
  File ".../python3.11/site-packages/ray/data/dataset.py", line 2377, in take
    for row in limited_ds.iter_rows():
  File ".../python3.11/site-packages/ray/data/iterator.py", line 245, in _wrapped_iterator
    for batch in batch_iterable:
  File ".../python3.11/site-packages/ray/data/iterator.py", line 162, in _create_iterator
    block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
                                                      ^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../python3.11/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
    block_iterator, stats, executor = ds._plan.execute_to_iterator()
                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../python3.11/site-packages/ray/data/exceptions.py", line 86, in handle_trace
    raise e.with_traceback(None) from SystemException()
TypeError: Could not serialize the put value <ray.data.datasource.datasource.ReadTask object at 0x1353dd290>:
================================================================================
Checking Serializability of <ray.data.datasource.datasource.ReadTask object at 0x1353dd290>
================================================================================
!!! FAIL serialization: cannot pickle 'google._upb._message.Descriptor' object
    Serializing '_read_fn' <function BigQueryDatasource.get_read_tasks.<locals>.<lambda> at 0x100961300>...
    !!! FAIL serialization: cannot pickle 'google._upb._message.Descriptor' object
    Detected 1 nonlocal variables. Checking serializability...
        Serializing '_read_single_partition' <function BigQueryDatasource.get_read_tasks.<locals>._read_single_partition at 0x1353c6e80>...
    WARNING: Did not find non-serializable object in <function BigQueryDatasource.get_read_tasks.<locals>.<lambda> at 0x100961300>. This may be an oversight.
    Serializing '_abc_impl' <_abc._abc_data object at 0x116ee1140>...
    !!! FAIL serialization: cannot pickle '_abc._abc_data' object
    WARNING: Did not find non-serializable object in <_abc._abc_data object at 0x116ee1140>. This may be an oversight.
================================================================================
Variable: 

    FailTuple(_read_fn [obj=<function BigQueryDatasource.get_read_tasks.<locals>.<lambda> at 0x100961300>, parent=<ray.data.datasource.datasource.ReadTask object at 0x1353dd290>])

was found to be non-serializable. There may be multiple other undetected variables that were non-serializable. 
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class. 
================================================================================
Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
================================================================================

Versions / Dependencies

I am on macOS in a Python 3.11 environment with the following dependencies:

aiosignal                     1.3.1       aiosignal: a list of registered asynchronous callbacks
attrs                         23.2.0      Classes Without Boilerplate
babel                         2.14.0      Internationalization utilities
bigquery                      0.0.41      Easily send data to Big Query
black                         23.12.1     The uncompromising code formatter.
cachetools                    5.3.3       Extensible memoizing collections and decorators
certifi                       2024.2.2    Python package for providing Mozilla's CA Bundle.
cffi                          1.16.0      Foreign Function Interface for Python calling C code.
cfgv                          3.4.0       Validate configuration and produce human readable error messages.
charset-normalizer            3.3.2       The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet.
click                         8.1.7       Composable command line interface toolkit
colorama                      0.4.6       Cross-platform colored terminal text.
cryptography                  42.0.7      cryptography is a package which provides cryptographic recipes and primitives to Python developers.
dacktool                      0.0.7       Some python tools
dbstream                      0.1.25      A meta package to be connected to several databases
distlib                       0.3.8       Distribution utilities
filelock                      3.13.1      A platform independent file lock.
frozenlist                    1.4.1       A list-like structure which implements collections.abc.MutableSequence
fsspec                        2024.3.1    File-system specification
ghp-import                    2.1.0       Copy your docs directly to the gh-pages branch.
google-api-core               2.8.0       Google API client core library
google-api-python-client      1.7.11      Google API Client Library for Python
google-auth                   2.29.0      Google Authentication Library
google-auth-httplib2          0.0.3       Google Authentication Library: httplib2 transport
google-auth-oauthlib          1.2.0       Google Authentication Library
google-cloud-bigquery         3.18.0      Google BigQuery API client library
google-cloud-bigquery-storage 2.13.1      BigQuery Storage API API client library
google-cloud-core             2.4.1       Google Cloud API client core library
google-cloud-secret-manager   2.7.2       Secret Manager API API client library
google-crc32c                 1.5.0       A python wrapper of the C library 'Google CRC32C'
google-resumable-media        2.7.0       Utilities for Google Media Downloads and Resumable Uploads
googleapis-common-protos      1.56.1      Common protobufs used in Google APIs
googleauthentication          0.0.17      A meta package to be connected to Google services
grpc-google-iam-v1            0.12.4      IAM API client library
grpcio                        1.63.0      HTTP/2-based RPC framework
grpcio-status                 1.63.0      Status proto mapping for gRPC
httplib2                      0.22.0      A comprehensive HTTP client library.
identify                      2.5.35      File identification library for Python
idna                          3.6         Internationalized Domain Names in Applications (IDNA)
iniconfig                     2.0.0       brain-dead simple config-ini parsing
jinja2                        3.1.3       A very fast and expressive template engine.
jsonschema                    4.22.0      An implementation of JSON Schema validation for Python
jsonschema-specifications     2023.12.1   The JSON Schema meta-schemas and vocabularies, exposed as a Registry
libcst                        1.3.1       A concrete syntax tree with AST-like properties for Python 3.0 through 3.12 programs.
markdown                      3.6         Python implementation of John Gruber's Markdown.
markupsafe                    2.1.5       Safely add untrusted strings to HTML/XML markup.
mergedeep                     1.3.4       A deep merge function for 🐍.
mkdocs                        1.5.3       Project documentation with Markdown.
mkdocs-autorefs               1.0.1       Automatically link across pages in MkDocs.
mkdocs-material               9.5.14      Documentation that simply works
mkdocs-material-extensions    1.3.1       Extension pack for Python Markdown and MkDocs Material.
mkdocstrings                  0.24.1      Automatic documentation from sources, for MkDocs.
msgpack                       1.0.8       MessagePack serializer
mypy-extensions               1.0.0       Type system extensions for programs checked with the mypy type checker.
nodeenv                       1.8.0       Node.js virtual environment builder
numpy                         1.26.4      Fundamental package for array computing in Python
oauthlib                      3.2.2       A generic, spec-compliant, thorough implementation of the OAuth request-signing logic
packaging                     24.0        Core utilities for Python packages
paginate                      0.5.6       Divides large result sets into pages for easier browsing
pandas                        1.3.4       Powerful data structures for data analysis, time series, and statistics
pathspec                      0.12.1      Utility library for gitignore style pattern matching of file paths.
platformdirs                  4.2.0       A small Python package for determining appropriate platform-specific dirs, e.g. a "user data dir".
pluggy                        1.4.0       plugin and hook calling mechanisms for python
pre-commit                    3.6.2       A framework for managing and maintaining multi-language pre-commit hooks.
proto-plus                    1.20.4      Beautiful, Pythonic protocol buffers.
protobuf                      5.26.1
pyarrow                       16.0.0      Python library for Apache Arrow
pyasn1                        0.6.0       Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)
pyasn1-modules                0.4.0       A collection of ASN.1-based protocols modules
pycparser                     2.22        C parser in Python
pygments                      2.17.2      Pygments is a syntax highlighting package written in Python.
pymdown-extensions            10.7.1      Extension pack for Python Markdown.
pyparsing                     3.1.2       pyparsing module - Classes and methods to define and execute parsing grammars
pytest                        7.4.4       pytest: simple powerful testing with Python
python-dateutil               2.9.0.post0 Extensions to the standard Python datetime module
pytz                          2024.1      World timezone definitions, modern and historical
pyyaml                        6.0.1       YAML parser and emitter for Python
pyyaml-env-tag                0.1         A custom YAML tag for referencing environment variables in YAML files.
ray                           2.20.0      Ray provides a simple, universal API for building distributed applications.
referencing                   0.35.1      JSON Referencing + Python
regex                         2023.12.25  Alternative regular expression module, to replace re.
requests                      2.31.0      Python HTTP for Humans.
requests-oauthlib             2.0.0       OAuthlib authentication support for Requests.
rpds-py                       0.18.1      Python bindings to Rust's persistent data structures (rpds)
rsa                           4.9         Pure-Python RSA implementation
setuptools                    69.2.0      Easily download, build, install, upgrade, and uninstall Python packages
six                           1.16.0      Python 2 and 3 compatibility utilities
tensorboardx                  2.6.2.2     TensorBoardX lets you watch Tensors Flow without Tensorflow
uritemplate                   3.0.1       URI templates
urllib3                       2.2.1       HTTP library with thread-safe connection pooling, file post, and more.
virtualenv                    20.25.1     Virtual Python Environment builder
watchdog                      4.0.0       Filesystem events monitoring

Reproduction script

I had to remove anything that could identify the dataset, but I tried two different datasets and got the same problem, so I don't think it is specific to one dataset.

import logging

import ray
from ray.data import Dataset as RayDataset


def load_bigquery(
    project_id: str,
    mod_number: int = 100,  # unused in this redacted repro
) -> RayDataset:
    # Table name redacted.
    query = """
        SELECT *
        FROM `...`
        LIMIT 10
    """

    dataset: RayDataset = ray.data.read_bigquery(
        project_id=project_id,
        query=query,
    )

    logging.debug("fetched %d rows", dataset.count())

    return dataset


def main():
    # Load data and print one row; the error is raised from show().
    dataset = load_bigquery(project_id="...")
    dataset.show(limit=1)


if __name__ == "__main__":
    main()

Issue Severity

High: It blocks me from completing my task.

Chromatius commented 6 months ago

I figured out the issue myself. If you try to use ray.data.read_bigquery() without the BigQuery client library available for import, Ray gives the following error:

ImportError: BigQueryDatasource depends on 'bigquery', but 'bigquery' couldn't be imported. You can install 'bigquery' by running pip install bigquery.

That is not the correct dependency: following the instruction installs a different package that happens to be named bigquery, and the pickling error from my original report then appears. Instead, one should install google-cloud-bigquery, which is imported as google.cloud.bigquery.
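
To check an environment for this mix-up, one option is to look at which distributions are installed and whether the client module resolves. The following is a minimal sketch using only the standard library. The helper names (installed_version, module_available, diagnose) are hypothetical and not part of Ray, and the wording of the hints is an assumption.

```python
import importlib.util
from importlib import metadata
from typing import List, Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a pip distribution, or None if absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


def module_available(module: str) -> bool:
    """Return True if `module` can be located (parent packages are imported)."""
    try:
        return importlib.util.find_spec(module) is not None
    except (ImportError, ValueError):
        # Raised when a parent package (e.g. `google.cloud`) is missing.
        return False


def diagnose(client_ok: bool, other_bigquery_version: Optional[str]) -> List[str]:
    """Turn the two checks into human-readable hints (hypothetical wording)."""
    problems = []
    if other_bigquery_version is not None:
        problems.append(
            f"The 'bigquery' distribution ({other_bigquery_version}) is installed; "
            "this is the package the thread identifies as the wrong dependency."
        )
    if not client_ok:
        problems.append(
            "'google.cloud.bigquery' is not importable; "
            "try `pip install google-cloud-bigquery`."
        )
    return problems


if __name__ == "__main__":
    hints = diagnose(
        client_ok=module_available("google.cloud.bigquery"),
        other_bigquery_version=installed_version("bigquery"),
    )
    for line in hints or ["No BigQuery packaging problems detected."]:
        print(line)
```

Keeping diagnose free of any environment lookups makes it easy to test in isolation. The two helpers above it are the only parts that inspect the real environment.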

I would consider the initial error message a bug in this case.
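
One way such a message could avoid the mix-up is to keep the importable module name and the pip distribution name separate. This is a minimal sketch, not Ray's actual implementation; require_module is a hypothetical helper and the message text is an assumption.

```python
import importlib
from types import ModuleType


def require_module(module: str, distribution: str) -> ModuleType:
    """Import `module`, or raise an ImportError that names the pip distribution."""
    try:
        return importlib.import_module(module)
    except ImportError as exc:
        raise ImportError(
            f"'{module}' couldn't be imported. "
            f"You can install it by running `pip install {distribution}`."
        ) from exc


# Intended use (not executed here, since it needs the client library):
#   bigquery = require_module("google.cloud.bigquery", "google-cloud-bigquery")
```

With the two names passed separately, the install hint would point at google-cloud-bigquery even though the module path is google.cloud.bigquery.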