apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: writing to Datastore with a dynamic project ID works with the DirectRunner but fails with the DataflowRunner #26755

Open kumgaurav opened 1 year ago

kumgaurav commented 1 year ago

What happened?

This is my code for writing to Datastore:

def write_to_datastore(gcloud_options, consumer_options, pipeline_options):
  """Creates a pipeline that writes entities to Cloud Datastore."""
  input_patterns = [consumer_options.input]
  with beam.Pipeline(options=pipeline_options) as p:
    _ = (
        p
        | 'read' >> ReadCsvFiles(input_patterns)
        | 'create entity' >> beam.ParDo(EntityWrapperPardo(consumer_options))
        | 'write to datastore' >> WriteToDatastore(
            consumer_options.datastore_project.get()))
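
For context, `consumer_options.datastore_project` being a ValueProvider implies the (unshown) options class registers it with add_value_provider_argument; a minimal sketch of what that presumably looks like (the class name and the arguments other than datastore_project are assumptions):

from apache_beam.options.pipeline_options import PipelineOptions

class ConsumerOptions(PipelineOptions):
  """Hypothetical options class; the real one is not shown in the report."""

  @classmethod
  def _add_argparse_args(cls, parser):
    # add_value_provider_argument defers resolution to template run time,
    # so at graph-construction time datastore_project is a
    # RuntimeValueProvider rather than a plain string.
    parser.add_value_provider_argument('--datastore_project', type=str)
    parser.add_argument('--input', help='GCS pattern of the input CSV files')
    parser.add_argument('--kind', help='Datastore kind to write')
    parser.add_argument('--namespace', help='Datastore namespace')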

Using the DirectRunner:

python -m testconsumer.py \
  --runner DirectRunner \
  --project itd-aia-dp \
  --input gs://itd-aia-dp/dataflow/pipelines/datastore/input/Cortex-Model-Coefficients-2.csv \
  --kind product \
  --namespace datapipeline \
  --datastore_project itd-aia-dp \
  --output gs://itd-aia-dp/dataflow/pipelines/datastore/output/

Using the DataflowRunner:

python3 testconsumer.py \
  --project=itd-aia-dp \
  --runner=DataflowRunner \
  --staging_location=gs://itd-aia-dp/staging \
  --temp_location=gs://itd-aia-dp/temp \
  --template_location=gs://itd-aia-dp/templates \
  --requirements_file=requirements.txt \
  --region=REGION \
  --experiments=shuffle_mode=service \
  --setup_file ./setup.py \
  --input gs://itd-aia-dp/dataflow/pipelines/datastore/input/Cortex-Model-Coefficients.csv \
  --kind product \
  --output gs://itd-aia-dp/dataflow/pipelines/datastore/output/

Stack trace:

ERROR:apache_beam.runners.runner:Error while visiting write to datastore/Write Batch to Datastore
Traceback (most recent call last):
  File "/Users/gkumargaur/workspace/python/office/datastoreconsumer/pysparks/gcs_to_datastore_consumer_main.py", line 53, in <module>
    consumertemplate.run()
  File "/Users/gkumargaur/workspace/python/office/datastoreconsumer/pysparks/panw/paloalto/consumertemplate.py", line 176, in run
    write_to_datastore(gcloud_options, consumer_options, pipeline_options)
  File "/Users/gkumargaur/workspace/python/office/datastoreconsumer/pysparks/panw/paloalto/consumertemplate.py", line 131, in write_to_datastore
    _ = (
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/pipeline.py", line 600, in __exit__
    self.result = self.run()
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/pipeline.py", line 550, in run
    return Pipeline.from_runner_api(
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/pipeline.py", line 577, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 524, in run_pipeline
    self.visit_transforms(pipeline, options)
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/runners/runner.py", line 211, in visit_transforms
    pipeline.visit(RunVisitor(self))
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/pipeline.py", line 626, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/pipeline.py", line 1260, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/pipeline.py", line 1260, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/pipeline.py", line 1263, in visit
    visitor.visit_transform(self)
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/runners/runner.py", line 206, in visit_transform
    self.runner.run_transform(transform_node, options)
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/runners/runner.py", line 233, in run_transform
    return m(transform_node, options)
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 877, in run_ParDo
    step = self._add_step(
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 652, in _add_step
    [
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 653, in <listcomp>
    item.get_dict()
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/transforms/display.py", line 370, in get_dict
    self.is_valid()
  File "/Users/gkumargaur/opt/miniconda3/envs/beam/lib/python3.9/site-packages/apache_beam/transforms/display.py", line 336, in is_valid
    raise ValueError(
ValueError: Invalid DisplayDataItem. Value RuntimeValueProvider(option: datastore_project, type: str, default_value: None) is of an unsupported type.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

johnjcasey commented 1 year ago

Initially, it looks like the display data is misconfigured somehow. My guess is that datastore_project is being passed in as a ValueProvider rather than as a direct value, and the display data logic doesn't handle that properly.
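
The bottom of the trace supports that reading. A minimal sketch that reproduces the same validation failure using only Beam's public display API (the provider here is constructed by hand purely for illustration):

from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.transforms.display import DisplayDataItem

# A provider shaped like the one in the error message.
vp = RuntimeValueProvider(
    option_name='datastore_project', value_type=str, default_value=None)

# DisplayDataItem only accepts plain displayable types (str, int, float,
# bool, timestamps); a ValueProvider is not one of them, so validation
# raises the ValueError seen in the stack trace.
DisplayDataItem(vp, label='datastore_project').is_valid()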

kumgaurav commented 1 year ago

@johnjcasey this is my requirement: my Dataflow project and my Datastore project are different. I want to pass the Datastore project dynamically as a ValueProvider by creating a classic template and running it from Airflow for scheduling. Depending on the environment (dev, QA, prod), it will write to the corresponding Datastore.
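
On the scheduling side, that flow would presumably look something like this; a sketch assuming the classic template has already been staged via --template_location above (the operator is Airflow's standard Dataflow template operator; the target project value is a placeholder):

from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator)

# Launch the staged classic template, supplying datastore_project as a
# runtime parameter so one template can target dev / QA / prod.
start_job = DataflowTemplatedJobStartOperator(
    task_id='gcs_to_datastore',
    template='gs://itd-aia-dp/templates',
    project_id='itd-aia-dp',  # the Dataflow project
    location='REGION',        # placeholder, as in the command above
    parameters={'datastore_project': 'my-datastore-project'},  # placeholder
)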

johnjcasey commented 1 year ago

Got it, that makes sense. In that case, I think the fix would be to modify the display data logic to get the underlying value from the ValueProvider itself.
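
Sketching that suggestion (a hypothetical helper, not the actual Beam code): unwrap the ValueProvider before building the display data, falling back to its string form when the runtime value is not yet accessible, e.g. while constructing a classic template:

from apache_beam.options.value_provider import ValueProvider

def displayable(value):
  # Hypothetical helper: DisplayDataItem only supports plain types, so
  # resolve a ValueProvider first.
  if isinstance(value, ValueProvider):
    # A RuntimeValueProvider is not accessible at template construction
    # time, so fall back to its string representation instead of .get().
    return value.get() if value.is_accessible() else str(value)
  return value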