apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Python Direct Runner doesn't support both streaming & non streaming sources #21103

Open damccorm opened 2 years ago

damccorm commented 2 years ago

Please see Stack Overflow discussion:

https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir

When I create a GCS source and a Pub/Sub source and try to flatten the two, the direct runner raises an error caused by an incompatible transform replacement it performs.

Code example:


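import json

import apache_beam as beam

# `p` is an existing beam.Pipeline.
# Bounded source: log files on GCS, one JSON document per line.
gcsEventsColl = (
    p
    | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log")
    | "convert to dict" >> beam.Map(lambda x: json.loads(x)))

# Unbounded source: live events from Pub/Sub.
liveEventsColl = (
    p
    | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic")
    | "convert to dict2" >> beam.Map(lambda x: json.loads(x)))

# Merge both PCollections into one.
input_rec = (gcsEventsColl, liveEventsColl) | "flatten" >> beam.Flatten()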

Error:


  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
    self._check_replacement(override)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
    self.visit(ReplacementValidator())
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  [Previous line repeated 4 more times]
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
    transform_node)
RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.

The direct runner corrupts the pipeline when it rewrites the transforms.

 

Imported from Jira BEAM-12586. Original Jira may contain additional context. Reported by: rodriguezc.

jamesandreou commented 2 years ago

Hello. We are currently experiencing this issue as well when trying to use beam.Flatten() on a historical PCollection from BigQuery and a streaming PCollection from Pub/Sub.

Has anyone found a temporary workaround?

tvalentyn commented 2 years ago

@jamesandreou would an in-process Flink runner work for you?

# (in a separate terminal)
docker run --net=host apache/beam_flink1.11_job_server:latest

python -m your_pipeline --runner PortableRunner  --job_endpoint="localhost:8099" --environment_type="LOOPBACK"  --streaming
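
The same flags can also be set from inside the pipeline script rather than on the command line. The following is only a minimal sketch, assuming the job server from the docker command above is listening on localhost:8099; the helper names `portable_flink_args` and `make_pipeline` are hypothetical and not part of Beam.

```python
def portable_flink_args(job_endpoint="localhost:8099", streaming=True):
    """Build the flags from the command above as an argv-style list."""
    args = [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        "--environment_type=LOOPBACK",
    ]
    if streaming:
        args.append("--streaming")
    return args


def make_pipeline(args=None):
    """Create a pipeline that submits to the portable job server."""
    # Imported lazily: this part requires the apache_beam package.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(args if args is not None else portable_flink_args())
    return beam.Pipeline(options=options)
```

With the job server running, `with make_pipeline() as p:` would then take the place of a pipeline built on the default runner.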

krishnap commented 2 years ago

any update on this?

tvalentyn commented 2 years ago

I don't think there has been significant work on the Python streaming direct runner recently.

ptrmcrthr commented 2 years ago

We are running into this issue trying to implement a slowly changing side input as seen here: https://beam.apache.org/documentation/patterns/side-inputs/

Maybe add a note on that page saying the pattern does not work with the DirectRunner? Unfortunately, my pipeline does not work with the Flink runner.
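
For readers unfamiliar with the pattern, here is a rough sketch of a slowly changing side input driven by PeriodicImpulse. It is an illustration under assumptions, not the code from the linked page: `enrich`, `build_pipeline`, the topic argument, and the placeholder "Load lookup" step are all hypothetical. Per the reports in this thread, running this shape of pipeline on the DirectRunner may fail with the "was not replaced as expected" error.

```python
def enrich(event, lookup_rows):
    """Pair one main-input event with every row of the current side input."""
    for row in lookup_rows:
        yield (event, row)


def build_pipeline(pipeline, topic, refresh_seconds=60, window_seconds=10):
    # Imported lazily: this part requires the apache_beam package.
    import apache_beam as beam
    from apache_beam.transforms import window
    from apache_beam.transforms.periodicsequence import PeriodicImpulse

    # Side input: one impulse every `refresh_seconds`, each in its own
    # fixed window. A real pipeline would reload lookup data here.
    side_input = (
        pipeline
        | "Tick" >> PeriodicImpulse(
            fire_interval=refresh_seconds, apply_windowing=True)
        | "Load lookup" >> beam.Map(lambda ts: ("refreshed_at", ts)))

    # Main input: unbounded Pub/Sub messages in fixed windows.
    main_input = (
        pipeline
        | "Read events" >> beam.io.ReadFromPubSub(topic=topic)
        | "Window" >> beam.WindowInto(window.FixedWindows(window_seconds)))

    # Each event sees the side-input window that matches its own window.
    return main_input | "Enrich" >> beam.FlatMap(
        enrich, lookup_rows=beam.pvalue.AsIter(side_input))
```

The `enrich` helper is plain Python, so its logic can be checked without any runner.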

tvalentyn commented 2 years ago

@damccorm is working on a fix for the PeriodicImpulse transform that may help with that pattern. I'm not sure it will work with the DirectRunner, though, since that runner has other limitations.

tvalentyn commented 2 years ago

@BjornPrime - when you document the direct runner's streaming limitations, incorporate https://github.com/apache/beam/issues/21103#issuecomment-1242760530

precabal commented 1 year ago

any update on this?

paulleroyza commented 1 year ago

This is also affecting my pipeline, snippet below:

  with beam.Pipeline(argv=pipeline_args) as pipeline:
    send_data = (pipeline | "Read Parquet" >> beam.io.ReadFromParquet(known_args.source)
                          | "Write to PubSub" >> beam.io.WriteToPubSub(topic=known_args.topic)
                )

franklin113 commented 10 months ago

Are there any workarounds for this? Using PeriodicImpulse for updating side inputs in the DirectRunner throws this error in my streaming pipeline.