ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Data] ValueError (missing block size) when transforming result of `read_sql` with `parallelism=1` #36903

Status: Closed · author: aloysius-lim · closed 7 months ago

aloysius-lim commented 1 year ago

What happened + What you expected to happen

Given an SQL database (tested on SQLite and PostgreSQL via psycopg)
When a Dataset is retrieved with read_sql(..., parallelism=1)
And transformations are applied to the Dataset (e.g. map_batches(), add_column())
Then any operation that materializes the data (e.g. show(), count(), write_csv()) fails with ValueError: The size in bytes of the block must be known

Expected: these operations succeed, as they do when read_sql is called with any parallelism other than 1.

Stacktrace:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[23], line 4
      2     batch["scaled_score"] = batch["score"] * 10
      3     return batch
----> 4 ds.map_batches(transform).show()

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/dataset.py:2134, in Dataset.show(self, limit)
   2125 @ConsumptionAPI(pattern="Time complexity:")
   2126 def show(self, limit: int = 20) -> None:
   2127     """Print up to the given number of records from the dataset.
   2128 
   2129     Time complexity: O(limit specified)
   (...)
   2132         limit: The max number of records to print.
   2133     """
-> 2134     for row in self.take(limit):
   2135         print(row)

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/dataset.py:2092, in Dataset.take(self, limit)
   2087     logger.info(
   2088         "Tip: Use `take_batch()` instead of `take() / show()` to return "
   2089         "records in pandas or numpy batch format."
   2090     )
   2091 output = []
-> 2092 for row in self.iter_rows():
   2093     output.append(row)
   2094     if len(output) >= limit:

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/iterator.py:225, in DataIterator.iter_rows(self, prefetch_blocks)
    221 else:
    222     # Since batch_size is None, 1 block is exactly 1 batch.
    223     iter_batch_args["prefetch_batches"] = prefetch_blocks
--> 225 for batch in self.iter_batches(**iter_batch_args):
    226     batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch))
    227     for row in batch.iter_rows(public_row_format=True):

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/iterator.py:158, in DataIterator.iter_batches(self, prefetch_batches, batch_size, batch_format, drop_last, local_shuffle_buffer_size, local_shuffle_seed, _collate_fn, prefetch_blocks)
    147     raise DeprecationWarning(
    148         "`prefetch_blocks` arg is deprecated in Ray 2.4. Use "
    149         "the `prefetch_batches` arg instead to specify the amount of "
   (...)
    153         "to True in the DataContext."
    154     )
    156 time_start = time.perf_counter()
--> 158 block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
    159 if use_legacy:
    160     # Legacy iter_batches does not use metadata.
    161     def drop_metadata(block_iterator):

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/_internal/iterator/iterator_impl.py:31, in DataIteratorImpl._to_block_iterator(self)
     23 def _to_block_iterator(
     24     self,
     25 ) -> Tuple[
   (...)
     28     bool,
     29 ]:
     30     ds = self._base_dataset
---> 31     block_iterator, stats, executor = ds._plan.execute_to_iterator()
     32     ds._current_executor = executor
     33     return block_iterator, stats, False

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/_internal/plan.py:518, in ExecutionPlan.execute_to_iterator(self, allow_clear_input_blocks, force_read)
    516 gen = iter(block_iter)
    517 try:
--> 518     block_iter = itertools.chain([next(gen)], gen)
    519 except StopIteration:
    520     pass

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:53, in execute_to_legacy_block_iterator(executor, plan, allow_clear_input_blocks, dataset_uuid)
     46 def execute_to_legacy_block_iterator(
     47     executor: Executor,
     48     plan: ExecutionPlan,
     49     allow_clear_input_blocks: bool,
     50     dataset_uuid: str,
     51 ) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
     52     """Same as execute_to_legacy_bundle_iterator but returning blocks and metadata."""
---> 53     bundle_iter = execute_to_legacy_bundle_iterator(
     54         executor, plan, allow_clear_input_blocks, dataset_uuid
     55     )
     56     for bundle in bundle_iter:
     57         for block, metadata in bundle.blocks:

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:82, in execute_to_legacy_bundle_iterator(executor, plan, allow_clear_input_blocks, dataset_uuid, dag_rewrite)
     61 def execute_to_legacy_bundle_iterator(
     62     executor: Executor,
     63     plan: ExecutionPlan,
   (...)
     66     dag_rewrite=None,
     67 ) -> Iterator[RefBundle]:
     68     """Execute a plan with the new executor and return a bundle iterator.
     69 
     70     Args:
   (...)
     80         The output as a bundle iterator.
     81     """
---> 82     dag, stats = _get_execution_dag(
     83         executor,
     84         plan,
     85         allow_clear_input_blocks,
     86         preserve_order=False,
     87     )
     88     if dag_rewrite:
     89         dag = dag_rewrite(dag)

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:143, in _get_execution_dag(executor, plan, allow_clear_input_blocks, preserve_order)
    141     stats = _get_initial_stats_from_plan(plan)
    142 else:
--> 143     dag, stats = _to_operator_dag(plan, allow_clear_input_blocks)
    145 # Enforce to preserve ordering if the plan has stages required to do so, such as
    146 # Zip and Sort.
    147 # TODO(chengsu): implement this for operator as well.
    148 if preserve_order or plan.require_preserve_order():

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:176, in _to_operator_dag(plan, allow_clear_input_blocks)
    174 else:
    175     owns_blocks = False
--> 176 operator = _blocks_to_input_buffer(blocks, owns_blocks)
    177 for stage in stages:
    178     operator = _stage_to_operator(stage, operator)

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:249, in _blocks_to_input_buffer(blocks, owns_blocks)
    245     return MapOperator.create(
    246         do_read, inputs, name=task_name, ray_remote_args=remote_args
    247     )
    248 else:
--> 249     output = _block_list_to_bundles(blocks, owns_blocks=owns_blocks)
    250     for i in output:
    251         for b in i.blocks:

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:368, in _block_list_to_bundles(blocks, owns_blocks)
    365 output = []
    366 for block, meta in blocks.iter_blocks_with_metadata():
    367     output.append(
--> 368         RefBundle(
    369             [
    370                 (
    371                     block,
    372                     meta,
    373                 )
    374             ],
    375             owns_blocks=owns_blocks,
    376         )
    377     )
    378 return output

File <string>:7, in __init__(self, blocks, owns_blocks, output_split_idx, _cached_location)

File ~/mambaforge/envs/ray/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces.py:58, in RefBundle.__post_init__(self)
     56 assert isinstance(b[1], BlockMetadata), b
     57 if b[1].size_bytes is None:
---> 58     raise ValueError(
     59         "The size in bytes of the block must be known: {}".format(b)
     60     )

ValueError: The size in bytes of the block must be known: (ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000), BlockMetadata(num_rows=None, size_bytes=None, schema=None, input_files=[], exec_stats=None))
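The last frames show where the error comes from: `RefBundle.__post_init__` rejects any `(block, metadata)` pair whose `size_bytes` is `None`, and the metadata in the message has every field unset (`num_rows=None, size_bytes=None, schema=None`). The traceback also passes through the `else` branch of `_blocks_to_input_buffer` (line 248) rather than the read-task branch that builds a `MapOperator` around `do_read`, which suggests the blocks reached the executor as an existing block list. The sketch below reproduces only the validation step, using simplified stand-in classes; `SimpleBlockMetadata` and `SimpleRefBundle` are hypothetical names, not Ray's API, and the field values are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class SimpleBlockMetadata:
    """Hypothetical, simplified stand-in for Ray's BlockMetadata."""
    num_rows: Optional[int] = None
    size_bytes: Optional[int] = None
    schema: Any = None
    input_files: List[str] = field(default_factory=list)
    exec_stats: Any = None


@dataclass
class SimpleRefBundle:
    """Hypothetical stand-in for RefBundle: a list of (block_ref, metadata) pairs."""
    blocks: List[Tuple[Any, SimpleBlockMetadata]]
    owns_blocks: bool = False

    def __post_init__(self) -> None:
        # Mirrors the check in the traceback: every block must carry a known byte size.
        for b in self.blocks:
            assert isinstance(b[1], SimpleBlockMetadata), b
            if b[1].size_bytes is None:
                raise ValueError(
                    "The size in bytes of the block must be known: {}".format(b)
                )


# Metadata with a recorded size (illustrative values) passes the check.
known_meta = SimpleBlockMetadata(num_rows=4, size_bytes=256)
SimpleRefBundle([("block-ref-1", known_meta)])

# Metadata shaped like the one in the error message (nothing filled in) is rejected.
empty_meta = SimpleBlockMetadata()
try:
    SimpleRefBundle([("block-ref-2", empty_meta)])
except ValueError as exc:
    print(exc)
```

In this simplified model the only way through the check is for whatever produces the block to record its size in the metadata before the bundle is built.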

Versions / Dependencies

My environment:

Databases tested:

Reproduction script

import sqlite3
import ray

# From https://docs.ray.io/en/latest/data/api/doc/ray.data.read_sql.html
connection = sqlite3.connect("example.db")
connection.execute("CREATE TABLE movie(title, year, score)")
connection.execute(
    """
    INSERT INTO movie VALUES
        ('Monty Python and the Holy Grail', 1975, 8.2),
        ("Monty Python Live at the Hollywood Bowl", 1982, 7.9),
        ("Monty Python's Life of Brian", 1979, 8.0),
        ("Rocky II", 1979, 7.3)
    """
)
connection.commit()
connection.close()

def create_connection():
    return sqlite3.connect("example.db")

def transform(batch):
    batch["scaled_score"] = batch["score"] * 10
    return batch

# These work (any parallelism other than 1)
ds = ray.data.read_sql("SELECT * FROM movie", create_connection)
ds.count()
ds.map_batches(transform).show()
ds.add_column("hello", lambda df: "x").show()

# Set parallelism = 1
ds2 = ray.data.read_sql("SELECT * FROM movie", create_connection, parallelism=1)
# This works (no transformations applied yet)
ds2.count()
# These fail
ds2.map_batches(transform).show()
ds2.add_column("hello", lambda df: "x").show()

Issue Severity

Medium: It is a significant difficulty but I can work around it.

ujjawal-khare-27 commented 9 months ago

Hey @aloysius-lim, I am not able to reproduce this.

scottjlee commented 7 months ago

On the latest master, I also cannot reproduce this. Please feel free to reopen with more details if you still run into the issue after upgrading Ray (I see you are on Ray 2.5.1).