apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: In the Beam Dataframe API, the fillna operation doesn't work when applied to individual columns #31855

Open tvalentyn opened 1 month ago

tvalentyn commented 1 month ago

What happened?

I was experimenting with the Beam Dataframe API through Beam Notebooks + the Interactive runner and wasn't able to use fillna on individual columns. Here is a repro on a dataframe with two columns:

%%writefile numbers.csv
col1,col2
1,1
NaN,1
-1,1

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
pipeline = beam.Pipeline(InteractiveRunner())

beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('numbers.csv')
beam_df['col1'] = beam_df['col1'].fillna(0)
ib.collect(beam_df)

This fails with ValueError: "[ConstantExpression[constant_int_140226804834624]]:140226804986784" requires a pipeline to be specified as there are no deferred inputs.

A rewritten version:

c1 = beam_df['col1']
c1 = c1.fillna(0)
ib.collect(c1)  

also fails.
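If a single-column fill is needed before this is fixed, one possible workaround (untested against this bug; it assumes Beam's deferred fillna accepts a dict value the way pandas fillna does, where the dict keys select the columns to fill) is to call fillna on the whole dataframe:

# Hypothetical workaround, not verified: pandas-style dict-valued fillna
# applied to the whole dataframe instead of to a single column.
beam_df = beam_df.fillna({'col1': 0})  # fill NaNs only in col1
ib.collect(beam_df)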

Both snippets run without issues on Pandas or Dask.
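For reference, the plain pandas equivalent (runnable against the same numbers.csv written above) produces the expected result:

import pandas as pd

df = pd.read_csv('numbers.csv')    # the literal 'NaN' in col1 is parsed as a missing value
df['col1'] = df['col1'].fillna(0)
print(df)
#    col1  col2
# 0   1.0     1
# 1   0.0     1
# 2  -1.0     1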

Full Stacktrace:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[62], line 16
     14 beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('numbers.csv')
     15 beam_df['col1'] = beam_df['col1'].fillna(0)
---> 16 ib.collect(beam_df)  # fails with ValueError: "[ConstantExpression[constant_int_140226804834624]]:140226804986784" requires a pipeline to be specified as there are no deferred inputs.
     18 c1 = beam_df['col1']
     19 c1 = c1.fillna(0)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/utils.py:277, in progress_indicated.<locals>.run_within_progress_indicator(*args, **kwargs)
    274 @functools.wraps(func)
    275 def run_within_progress_indicator(*args, **kwargs):
    276     with ProgressIndicator(f'Processing... {func.__name__}', 'Done.'):
--> 277         return func(*args, **kwargs)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py:906, in collect(pcoll, n, duration, include_window_info)
    902 # Remember the element type so we can make an informed decision on how to
    903 # collect the result in elements_to_df.
    904 if isinstance(pcoll, DeferredBase):
    905     # Get the proxy so we can get the output shape of the DataFrame.
--> 906     pcoll, element_type = deferred_df_to_pcollection(pcoll)
    907     watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
    908 else:

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/utils.py:313, in deferred_df_to_pcollection(df)
    310 cache.replace_with_cached(df._expr)
    312 proxy = df._expr.proxy()
--> 313 return to_pcollection(df, yield_elements='pandas', label=str(df._expr)), proxy

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/convert.py:261, in to_pcollection(label, always_return_tuple, yield_elements, include_indexes, pipeline, *dataframes)
    257 new_dataframes = [
    258     df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE
    259 ]
    260 if len(new_dataframes):
--> 261     new_results = {p: extract_input(p)
    262                    for p in placeholders
    263                    } | label >> transforms._DataframeExpressionsTransform({
    264                        ix: df._expr
    265                        for (ix, df) in enumerate(new_dataframes)
    266                    })  # type: Dict[Any, pvalue.PCollection]
    268 TO_PCOLLECTION_CACHE.update(
    269     {new_dataframes[ix]._expr._id: pc
    270      for ix, pc in new_results.items()})
    272 raw_results = {
    273     ix: TO_PCOLLECTION_CACHE[df._expr._id]
    274     for ix,
    275     df in enumerate(dataframes)
    276 }

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:1110, in _NamedPTransform.__ror__(self, pvalueish, _unused)
   1109 def __ror__(self, pvalueish, _unused=None):
-> 1110     return self.transform.__ror__(pvalueish, self.label)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:623, in PTransform.__ror__(self, left, label)
    621 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
    622 self.pipeline = p
--> 623 result = p.apply(self, pvalueish, label)
    624 if deferred:
    625     return result

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/pipeline.py:679, in Pipeline.apply(self, transform, pvalueish, label)
    677 old_label, transform.label = transform.label, label
    678 try:
--> 679     return self.apply(transform, pvalueish)
    680 finally:
    681     transform.label = old_label

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/pipeline.py:732, in Pipeline.apply(self, transform, pvalueish, label)
    729 if type_options.pipeline_type_check:
    730     transform.type_check_inputs(pvalueish)
--> 732 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
    734 if type_options is not None and type_options.pipeline_type_check:
    735     transform.type_check_outputs(pvalueish_result)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/interactive/interactive_runner.py:131, in InteractiveRunner.apply(self, transform, pvalueish, options)
    129 def apply(self, transform, pvalueish, options):
    130     # TODO(qinyeli, BEAM-646): Remove runner interception of apply.
--> 131     return self._underlying_runner.apply(transform, pvalueish, options)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/runner.py:203, in PipelineRunner.apply(self, transform, input, options)
    197 def apply(self,
    198           transform,  # type: PTransform
    199           input,  # type: Optional[pvalue.PValue]
    200           options  # type: PipelineOptions
    201           ):
    202     # TODO(robertwb): Remove indirection once internal references are fixed.
--> 203     return self.apply_PTransform(transform, input, options)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/runners/runner.py:207, in PipelineRunner.apply_PTransform(self, transform, input, options)
    205 def apply_PTransform(self, transform, input, options):
    206     # TODO(robertwb): Remove indirection once internal references are fixed.
--> 207     return transform.expand(input)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:151, in _DataframeExpressionsTransform.expand(self, inputs)
    150 def expand(self, inputs):
--> 151     return self._apply_deferred_ops(inputs, self._outputs)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:470, in _DataframeExpressionsTransform._apply_deferred_ops(self, inputs, outputs)
    467     return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
    469 # Now we can compute and return the result.
--> 470 return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:470, in <dictcomp>(.0)
    467     return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
    469 # Now we can compute and return the result.
--> 470 return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
    560 key = args, tuple(sorted(kwargs.items()))
    561 if key not in cache:
--> 562     cache[key] = f(*args, **kwargs)
    563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:432, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.expr_to_pcoll(expr)
    430     return inputs[expr]
    431 else:
--> 432     return stage_to_result(expr_to_stage(expr))[expr._id]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
    560 key = args, tuple(sorted(kwargs.items()))
    561 if key not in cache:
--> 562     cache[key] = f(*args, **kwargs)
    563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.stage_to_result(stage)
    422 @_memoize
    423 def stage_to_result(stage):
--> 424     return {expr._id: expr_to_pcoll(expr)
    425             for expr in stage.inputs} | ComputeStage(stage)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424, in <dictcomp>(.0)
    422 @_memoize
    423 def stage_to_result(stage):
--> 424     return {expr._id: expr_to_pcoll(expr)
    425             for expr in stage.inputs} | ComputeStage(stage)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
    560 key = args, tuple(sorted(kwargs.items()))
    561 if key not in cache:
--> 562     cache[key] = f(*args, **kwargs)
    563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:432, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.expr_to_pcoll(expr)
    430     return inputs[expr]
    431 else:
--> 432     return stage_to_result(expr_to_stage(expr))[expr._id]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:562, in _memoize.<locals>.wrapper(*args, **kwargs)
    560 key = args, tuple(sorted(kwargs.items()))
    561 if key not in cache:
--> 562     cache[key] = f(*args, **kwargs)
    563 return cache[key]

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/dataframe/transforms.py:424, in _DataframeExpressionsTransform._apply_deferred_ops.<locals>.stage_to_result(stage)
    422 @_memoize
    423 def stage_to_result(stage):
--> 424     return {expr._id: expr_to_pcoll(expr)
    425             for expr in stage.inputs} | ComputeStage(stage)

File /jupyter/.kernels/apache-beam-2.56.0/packages/beam/sdks/python/apache_beam/transforms/ptransform.py:602, in PTransform.__ror__(self, left, label)
    600     p = self.pipeline
    601 else:
--> 602     raise ValueError(
    603         '"%s" requires a pipeline to be specified '
    604         'as there are no deferred inputs.' % self.label)
    605 else:
    606     p = self.pipeline or pipelines[0]

ValueError: "[ConstantExpression[constant_int_140226803777984]]:140226803785616" requires a pipeline to be specified as there are no deferred inputs.
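The bottom frames show where this goes wrong: fillna(0) wraps the fill value in a ConstantExpression, whose stage has no deferred inputs, so stage_to_result pipes an empty dict into ComputeStage, and PTransform.__ror__ finds neither a PCollection nor a pipeline to attach the transform to. A minimal standalone sketch of that code path (hypothetical illustration, not the actual dataframe internals; the Identity transform is a stand-in for ComputeStage):

import apache_beam as beam

class Identity(beam.PTransform):  # hypothetical stand-in for ComputeStage
    def expand(self, pcoll):
        return pcoll

# Piping a dict containing no PCollections into a PTransform reproduces the
# same error: ValueError: "Identity" requires a pipeline to be specified as
# there are no deferred inputs.
{} | Identity()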

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)


tvalentyn commented 1 month ago

By contrast, beam_df['col1'] = beam_df['col1'].abs() works as expected.
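A pattern consistent with the traceback, offered as a hypothesis rather than a confirmed diagnosis: abs() takes no scalar argument, while fillna(0) has its fill value wrapped in a ConstantExpression, which is exactly what the failing code path trips over. A quick probe to check whether other scalar-argument column operations hit the same error (hypothetical, reusing beam_df and ib from the repro above):

# Hypothetical probe: do other scalar-argument operations fail the same way?
for name, op in [('abs', lambda s: s.abs()),         # no constant arg; reported to work
                 ('fillna', lambda s: s.fillna(0)),  # constant arg; reported to fail
                 ('add', lambda s: s + 1)]:          # constant arg; behavior unknown
    try:
        ib.collect(op(beam_df['col1']))
        print(name, 'ok')
    except ValueError as e:
        print(name, 'failed:', e)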