GoogleCloudPlatform / DataflowPythonSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

--streaming flag not recognized/used with WriteToPubSub #56

Closed jmwoloso closed 2 years ago

jmwoloso commented 5 years ago
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

options = PipelineOptions(
    pipeline_type_check=True,
    job_name="reveal-ip-addresses",
    staging_location="gs://the_refinery/staging",
    temp_location="gs://the_refinery/temp",
    project="infusionsoft-looker-poc",
    runner="DirectRunner",
    streaming=True
).view_as(GoogleCloudOptions)

# generate the DAG
# (bq_source, ClearbitReveal, and TOPIC are defined elsewhere in my script)
with beam.Pipeline(options=options) as pipe:
    # gather the batch of ip addresses
    companies = (
        pipe | "read_table_from_bq" >> beam.io.Read(bq_source)
             | "reveal_ip_addresses" >> beam.ParDo(ClearbitReveal()).with_outputs("message_id", main="company_payload")
             | "publish_company_payloads" >> beam.io.WriteToPubSub(topic=TOPIC)
    )

This results in the following traceback:

Traceback (most recent call last):
  File "<input>", line 2336, in <module>
  File "<input>", line 1860, in main
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 835, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 496, in __ror__
    p.run().wait_until_finish()
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 416, in run
    return self.runner.run_pipeline(self)
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 138, in run_pipeline
    return runner.run_pipeline(pipeline)
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 360, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(pipeline.options))
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 388, in replace_all
    self._replace(override)
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 299, in _replace
    self.visit(TransformUpdater(self))
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 444, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 780, in visit
    part.visit(visitor, pipeline, visited)
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 778, in visit
    visitor.enter_composite_transform(self)
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 294, in enter_composite_transform
    self._replace_if_needed(transform_node)
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 214, in _replace_if_needed
    original_transform_node.transform)
  File "/home/jason/virtualenvs/refinery_dataflow/local/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 334, in get_replacement_transform
    raise Exception('PubSub I/O is only available in streaming mode '
Exception: PubSub I/O is only available in streaming mode (use the --streaming flag).
meredithslota commented 2 years ago

We moved to Apache Beam!

Google Cloud Dataflow for Python is now the Apache Beam Python SDK, and code development has moved to the Apache Beam repo.

If you want to contribute to the project (please do!), use the Apache Beam contributor's guide. Closing out this issue accordingly.