apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Race condition in Python SDK Harness ProcessBundleProgress #24776

Open cozos opened 1 year ago

cozos commented 1 year ago

What happened?

Hello, I am on Apache Beam v2.35.0 running on GCP Dataflow, and I've encountered what I believe are race conditions in the progress reporting machinery (i.e. process_bundle_progress or ProcessBundleProgressRequest):

Error processing instruction process_bundle_progress-1213241858972398550-1099. Original traceback is
Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 672, in process_bundle_progress
    monitoring_infos = processor.monitoring_infos()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/bundle_processor.py", line 1131, in monitoring_infos
    op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 360, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 425, in apache_beam.runners.worker.operations.Operation.execution_time_monitoring_infos
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 204, in int64_counter
    return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 303, in create_monitoring_info
    urn=urn, type=type_urn, labels=labels or {}, payload=payload)
SystemError: <class 'metrics_pb2.MonitoringInfo'> returned NULL without setting an error
Error processing instruction process_bundle_progress-5696618351405637733-1593. Original traceback is
Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 672, in process_bundle_progress
    monitoring_infos = processor.monitoring_infos()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/bundle_processor.py", line 1131, in monitoring_infos
    op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 360, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 413, in apache_beam.runners.worker.operations.Operation.execution_time_monitoring_infos
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 204, in int64_counter
    return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 303, in create_monitoring_info
    urn=urn, type=type_urn, labels=labels or {}, payload=payload)
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/_collections_abc.py", line 840, in update
    for key in other:
RuntimeError: dictionary changed size during iteration
Error processing instruction process_bundle_progress-6997054913682226470-568. Original traceback is
Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 683, in process_bundle_progress
    for info in monitoring_infos
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 683, in <dictcomp>
    for info in monitoring_infos
AttributeError: 'bytes' object has no attribute 'payload'
Error processing instruction process_bundle_progress-5538519205655791026-1654. Original traceback is
Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 672, in process_bundle_progress
    monitoring_infos = processor.monitoring_infos()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/bundle_processor.py", line 1131, in monitoring_infos
    op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 362, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 779, in apache_beam.runners.worker.operations.DoOperation.pcollection_count_monitoring_infos
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 238, in int64_distribution
    return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 303, in create_monitoring_info
    urn=urn, type=type_urn, labels=labels or {}, payload=payload)
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/_collections_abc.py", line 841, in update
    self[key] = other[key]
ValueError: b'\x00\x00\x00\x00\xda?... REDACTED' has type str, but isn't valid UTF-8 encoding. Non-UTF-8 strings must be converted to unicode objects before being added.
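The "dictionary changed size during iteration" and the UTF-8 errors look like classic symptoms of one thread mutating a metric labels dict while another thread copies it into a MonitoringInfo proto (the "for key in other:" loop in _collections_abc.py above). The following is not Beam code, just a minimal sketch of that failure mode for illustration; on my machine it typically prints the same RuntimeError within a second or so:

import threading

# Illustration only: one thread stands in for the bundle-processing thread
# that updates a labels dict, the other for the progress-reporting thread
# that copies it the same way MutableMapping.update does.
labels = {}
stop = threading.Event()

def bundle_thread():
    i = 0
    while not stop.is_set():
        labels[str(i)] = "value"
        labels.pop(str(i - 1000), None)  # keep roughly 1000 keys around
        i += 1

def progress_thread():
    for _ in range(10000):
        snapshot = {}
        try:
            # Same shape as _collections_abc.py: for key in other: self[key] = other[key]
            for key in labels:
                snapshot[key] = labels[key]
        except (RuntimeError, KeyError) as exc:
            print(type(exc).__name__, exc)
            break

t = threading.Thread(target=bundle_thread)
t.start()
progress_thread()
stop.set()
t.join()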

I am running long-running C++ code through pybind11, which I think might be a contributing factor. However, my C++ code does not access any Python objects without holding the GIL, and it definitely doesn't change anything related to progress reporting.
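For reference, the pipeline is shaped roughly like the sketch below. This is a simplified stand-in, not a confirmed repro: LongRunningDoFn and items_done are placeholder names, and time.sleep takes the place of the pybind11 call (the real extension is internal). The point is that each element spends a long time inside process() while the runner keeps sending process_bundle_progress requests to the same harness:

import time

import apache_beam as beam
from apache_beam.metrics import Metrics

class LongRunningDoFn(beam.DoFn):
    def __init__(self):
        # Counter that the bundle-processing thread updates while the
        # progress-reporting thread reads monitoring infos.
        self.items_done = Metrics.counter(self.__class__, 'items_done')

    def process(self, element):
        # In the real pipeline this is a long-running pybind11 call that
        # releases the GIL internally; sleep is just a placeholder here.
        time.sleep(30)
        self.items_done.inc()
        yield element

with beam.Pipeline() as p:
    _ = (p
         | beam.Create(range(10))
         | beam.ParDo(LongRunningDoFn()))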

I am marking this as P1 because I assume race conditions like this can cause data loss, etc. Sorry if that is inappropriate; feel free to change it.

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

AnandInguva commented 1 year ago

cc: @tvalentyn

AnandInguva commented 1 year ago

A similar issue was reported before in https://github.com/apache/beam/issues/20775, but it was resolved by https://github.com/apache/beam/pull/13526/

cozos commented 1 year ago

Note that https://github.com/apache/beam/pull/13526 is included in v2.35.0 - see: https://github.com/apache/beam/blame/v2.35.0/sdks/python/apache_beam/metrics/execution.py#L241

kennknowles commented 1 year ago

I agree this should remain P1.

tvalentyn commented 1 year ago

@cozos Thanks for reporting! Do you by chance have a repro that you could share? It would allow people looking into this issue to debug independently. Thank you.

cozos commented 1 year ago

I will try to create a minimal reproducible example.

tvalentyn commented 1 year ago

Thank you. Much appreciated.

kennknowles commented 1 year ago

Any luck creating a reproduction? I am not sure whether this kind of race condition could cause data loss or not.