apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Composite windowing fails with exception: AttributeError: 'NoneType' object has no attribute 'time' #19070

Open kennknowles opened 2 years ago

kennknowles commented 2 years ago

Tried to apply a window function to the existing pipeline that was running fine. Got the following error:


Traceback (most recent call last):
  File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line 1664, in <module>
    main()
  File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "E:\soft\ide\PyCharm 2018.1.2\helpers\pydev\pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "E:/work/source/ai-data-pipeline-research/metric_pipeline/batch_beam/batch_pipeline_main.py", line 97, in <module>
    result.wait_until_finish()
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 421, in wait_until_finish
    self._executor.await_completion()
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py", line 398, in await_completion
    self._executor.await_completion()
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py", line 444, in await_completion
    six.reraise(t, v, tb)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py", line 341, in call
    finish_state)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\executor.py", line 378, in attempt_call
    evaluator.process_element(value)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 574, in process_element
    self.runner.process(element)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 577, in process
    self._reraise_augmented(exn)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 618, in _reraise_augmented
    six.reraise(type(new_exn), new_exn, original_traceback)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 575, in process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 353, in invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\runners\common.py", line 651, in process_outputs
    for result in results:
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 942, in process_entire_key
    state, windowed_values, output_watermark):
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 1098, in process_elements
    self.trigger_fn.on_element(value, window, context)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 488, in on_element
    self.underlying.on_element(element, window, context)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 535, in on_element
    trigger.on_element(element, window, self._sub_context(context, ix))
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 286, in on_element
    '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 728, in get_current_time
    return self._outer.get_current_time()
  File "C:\Users\thang\.virtualenvs\metric_pipeline-Mpf2nmv6\lib\site-packages\apache_beam\transforms\trigger.py", line 702, in get_current_time
    return self._clock.time()
AttributeError: 'NoneType' object has no attribute 'time' [while running 'combine all sharpe_ratio to list/CombinePerKey/GroupByKey/GroupByWindow']

The composite trigger is copied from the official documentation: https://beam.apache.org/documentation/programming-guide/#composite-triggers. The relevant pipeline source code is below:


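import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterAny, AfterCount, AfterProcessingTime, Repeatedly)
from apache_beam.transforms.window import FixedWindows

# One-minute fixed windows that fire repeatedly after 100 elements or
# one minute of processing time, whichever comes first.
windowing = beam.WindowInto(
    FixedWindows(1 * 60),
    trigger=Repeatedly(AfterAny(AfterCount(100),
                                AfterProcessingTime(delay=1 * 60))),
    accumulation_mode=AccumulationMode.DISCARDING)

# valuesPCollection and the functions below are defined elsewhere in the pipeline.
valuesPCollection \
    | 'calculate sharpe_ratio' >> beam.FlatMap(fn_calculate_sharpe_ratio) \
    | 'window sharpe_ratio' >> windowing \
    | 'combine all sharpe_ratio to list' >> beam.CombineGlobally(CombineAllToListFn()).without_defaults() \
    | 'store sharpe_ratio' >> beam.FlatMap(store_metric_with_now_ts)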

Imported from Jira BEAM-5132. Original Jira may contain additional context. Reported by: thangbui.

nybbles commented 1 year ago

Just ran into this on 2.43.

kennknowles commented 1 year ago

@tvalentyn @robertwb any insights into this case where the _clock in the triggering logic is None?

robertwb commented 1 year ago

I'm not sure. Presumably the runner (the old python runner in this case) should have provided it?
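
To make the failure mode concrete, here is a minimal sketch in plain Python. It is not Beam's actual code: `FakeTriggerContext` and `FixedClock` are hypothetical stand-ins that mimic only the `self._clock.time()` call in the last frame of the traceback. The assumption is that the runner is expected to hand the context a clock object and, in the failing case, hands it `None`.

```python
class FixedClock:
    """Hypothetical clock that always reports the same time (in seconds)."""

    def __init__(self, now):
        self._now = now

    def time(self):
        return self._now


class FakeTriggerContext:
    """Hypothetical stand-in for a trigger context; not Beam's class."""

    def __init__(self, clock=None):
        # Assumption: the runner is responsible for supplying the clock.
        self._clock = clock

    def get_current_time(self):
        # Mirrors the last traceback frame: fails if no clock was supplied.
        return self._clock.time()


def schedule_processing_time_timer(context, delay):
    """Return the timestamp at which a processing-time timer would fire."""
    return context.get_current_time() + delay


# With a clock, the timer timestamp can be computed.
fire_at = schedule_processing_time_timer(FakeTriggerContext(FixedClock(100)), 60)

# Without a clock, the same call raises the AttributeError from the report.
try:
    schedule_processing_time_timer(FakeTriggerContext(), 60)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
```

In this sketch the error only surfaces once a processing-time trigger asks for the current time, which would be consistent with the pipeline running fine until the windowing step was added.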

tvalentyn commented 1 year ago

Seems like a bug in the portable runner. It is still reproducible as of 2.45.0, but not reproducible in the Dataflow runner.

Repro: https://gist.github.com/tvalentyn/8a1f10e4de5a337d025f9bae2dba04be

tvalentyn commented 1 year ago

I'll try to get some eyes on this and investigate if this looks like an easy fix.

RobMcKiernan commented 1 year ago

I'm also experiencing this issue when testing locally. I haven't tried any runner other than the DirectRunner, though.

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import trigger, window

test_stream = TestStream()
test_stream.add_elements(input_data)
with TestPipeline() as testpipeline:
    results = (
        testpipeline
        | test_stream
        | "Add info" >> beam.ParDo(MyDoFn())
    )

    (
        results
        | "Form datapoints" >> beam.ParDo(FormDataPoints())
        | "Emit datapoints on over x num elems or after y time elapsed"
        >> beam.WindowInto(
            window.GlobalWindows(),
            # Fire after every element or after 2 seconds of processing time.
            trigger=trigger.Repeatedly(
                trigger.AfterAny(trigger.AfterCount(1), trigger.AfterProcessingTime(2))
            ),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
        )
        | "Group by table" >> beam.GroupBy(lambda x: x.table)
        | "print" >> beam.Map(print)
    )

There are some good clues from the user Subhash in this Jira issue, if that helps: https://issues.apache.org/jira/browse/BEAM-5132

RobMcKiernan commented 1 year ago

I've just realised that I didn't have the pipeline options set to streaming:

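    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).streaming = True
    with TestPipeline(options=pipeline_options) as testpipeline:
        ...  # same pipeline as in my previous comment

It's now working fine.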

robertwb commented 1 year ago

Glad you were able to find a workaround, though it is still a bug in the sense that it should not fail with such an error even if streaming isn't set.
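
As a rough illustration of what failing with a clearer error could look like, here is a plain-Python sketch. It is not a proposed Beam patch: `MissingClockError`, `current_time_or_explain`, and `FixedClock` are hypothetical names, and the error text simply repeats the workaround reported in this thread.

```python
class MissingClockError(RuntimeError):
    """Hypothetical error for a trigger context created without a clock."""


class FixedClock:
    """Hypothetical clock that always reports the same time (in seconds)."""

    def __init__(self, now):
        self._now = now

    def time(self):
        return self._now


def current_time_or_explain(clock):
    """Return clock.time(), or raise a descriptive error if clock is None."""
    if clock is None:
        raise MissingClockError(
            "A processing-time trigger needs a clock, but none was provided. "
            "Workaround reported in this thread: set "
            "StandardOptions.streaming = True."
        )
    return clock.time()


# A supplied clock behaves as before.
now = current_time_or_explain(FixedClock(42))

# A missing clock produces an actionable message, not an AttributeError.
try:
    current_time_or_explain(None)
    hint = None
except MissingClockError as exc:
    hint = str(exc)
```

Whether the right fix is a guard like this or having the runner always supply a clock is left open here; the sketch only shows the difference in what the user would see.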
