GoogleCloudPlatform / training-data-analyst

Labs and demos for courses for GCP Training (http://cloud.google.com/training).
Apache License 2.0

Issue with 7_Advanced_Streaming_Analytics/streaming_minute_traffic_pipeline.py #1908

Open F43RY opened 2 years ago

F43RY commented 2 years ago

Line 112, `trigger=AfterProcessingTime(120)`, leads to the following issue:

python3 streaming_minute_traffic_pipeline.py --project=${PROJECT_ID} --region=${REGION} --staging_location=${PIPELINE_FOLDER}/staging --temp_location=${PIPELINE_FOLDER}/temp --runner=${RUNNER} --input_topic=${PUBSUB_TOPIC} --window_duration=${WINDOW_DURATION} --allowed_lateness=${ALLOWED_LATENESS} --table_name=${OUTPUT_TABLE_NAME} --dead_letter_bucket=${DEADLETTER_BUCKET}

streaming_minute_traffic_pipeline.py:114: FutureWarning: WriteToFiles is experimental.
  | 'WriteUnparsedToGCS' >> fileio.WriteToFiles(output_path, shards=1, max_writers_per_bundle=0)
/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/io/fileio.py:550: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  p.options.view_as(GoogleCloudOptions).temp_location or
Traceback (most recent call last):
  File "streaming_minute_traffic_pipeline.py", line 138, in <module>
    run()
  File "streaming_minute_traffic_pipeline.py", line 114, in run
    | 'WriteUnparsedToGCS' >> fileio.WriteToFiles(output_path, shards=1, max_writers_per_bundle=0)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 652, in apply
    transform.transform, pvalueish, label or transform.label)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 141, in apply
    return super().apply(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/io/fileio.py", line 577, in expand
    | beam.ParDo(
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 652, in apply
    transform.transform, pvalueish, label or transform.label)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 141, in apply
    return super().apply(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 830, in apply_GroupByKey
    return transform.expand(pcoll)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 2609, in expand
    raise ValueError(msg)
ValueError: GroupRecordsByDestinationAndShard: Unsafe trigger: `AfterProcessingTime(delay=300)` may lose data. Reason: MAY_FINISH. This can be overriden with the --allow_unsafe_triggers flag.

ashbeekim commented 2 years ago
/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/io/fileio.py:550: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  p.options.view_as(GoogleCloudOptions).temp_location or
Traceback (most recent call last):
  File "streaming_minute_traffic_pipeline.py", line 140, in <module>
    run()
  File "streaming_minute_traffic_pipeline.py", line 116, in run
    max_writers_per_bundle=0)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 652, in apply
    transform.transform, pvalueish, label or transform.label)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 141, in apply
    return super().apply(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/io/fileio.py", line 577, in expand
    | beam.ParDo(
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 652, in apply
    transform.transform, pvalueish, label or transform.label)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 141, in apply
    return super().apply(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 830, in apply_GroupByKey
    return transform.expand(pcoll)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 2609, in expand
    raise ValueError(msg)
ValueError: GroupRecordsByDestinationAndShard: Unsafe trigger: `AfterProcessingTime(delay=120)` may lose data. Reason: MAY_FINISH. This can be overriden with the --allow_unsafe_triggers flag.

So I added a line below the failing code [^ref-1]: `'AvoidValueError' >> beam.GroupByKey()`

It still showed the same error:

Traceback (most recent call last):
  File "streaming_minute_traffic_pipeline.py", line 138, in <module>
    run()
  File "streaming_minute_traffic_pipeline.py", line 112, in run
    | 'WriteUnparsedToGCS' >> fileio.WriteToFiles(output_path,
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 652, in apply
    transform.transform, pvalueish, label or transform.label)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 141, in apply
    return super().apply(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 830, in apply_GroupByKey
    return transform.expand(pcoll)
  File "/home/jupyter/project/training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab/df-env/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 2609, in expand
    raise ValueError(msg)
ValueError: AvoidValueError: Unsafe trigger: `AfterProcessingTime(delay=120)` may lose data. Reason: MAY_FINISH. This can be overriden with the --allow_unsafe_triggers flag.

Then I fixed the failing code [^ref-2]:

  'BatchOver10s' >> beam.WindowInto(
      FixedWindows(120),
      trigger=Repeatedly(
          AfterAny(AfterCount(100), AfterProcessingTime(120))),
      accumulation_mode=AccumulationMode.DISCARDING)

Finally I could see the logging information in the terminal and the logs.minute_traffic table in BigQuery, and got 15 of 15 on "Run your pipeline". But the pipeline kept running until the end of the lab, and I only earned 5 of 15 on "Test your pipeline", so I'm not sure the code I fixed is correct.

[^ref-1]: Stack Overflow | Apache beam IllegalArgumentException: Unsafe trigger may lose data
[^ref-2]: Apache Beam Programming Guide | 9.5. Composite triggers