apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Task]: Allow setting a timeout for Python TestPipeline #29646

Open Abacn opened 10 months ago

Abacn commented 10 months ago

What needs to happen?

Currently, TestPipeline runs indefinitely: https://github.com/apache/beam/blob/aef21959bf6f41b4fb646ef06da97c8b8adbcb8d/sdks/python/apache_beam/testing/test_pipeline.py#L116

When the test times out, it prints no useful information, only a pytest timeout message and the stack trace at the point where it was interrupted (e.g. https://github.com/apache/beam/runs/19275621816)

Failed: Timeout >4500.0s
self = <apache_beam.transforms.ptransform_test.PTransformTest testMethod=test_flatten_one_single_pcollection>

    @pytest.mark.it_validatesrunner
    def test_flatten_one_single_pcollection(self):
>     with TestPipeline() as pipeline:
...
        while thread.is_alive():
>         time.sleep(5.0)
E         Failed: Timeout >4500.0s

However, DataflowRunner.wait_until_finish() does support a duration argument: https://github.com/apache/beam/blob/aef21959bf6f41b4fb646ef06da97c8b8adbcb8d/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L746

and when it times out, it prints the job ID so that one can find the Dataflow job to investigate: https://github.com/apache/beam/blob/aef21959bf6f41b4fb646ef06da97c8b8adbcb8d/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L769-L771

We should be able to use this functionality for TestPipeline, for example,

with TestPipeline(timeout=600.0) as p:
    ...

Issue Priority

Priority: 3 (nice-to-have improvement)

GautamGottipati commented 9 months ago

.take-issue

GautamGottipati commented 9 months ago

@Abacn As far as I understand, we need to add a timeout to TestPipeline. I plan to pass it as a parameter when the TestPipeline is created, store it as self.duration in TestPipeline.__init__(duration=None), and then pass that value on in state = result.wait_until_finish(duration=self.duration). However, I don't understand how wait_until_finish() gets called, so an explanation would be helpful. Also, is my understanding correct?
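
The plan described above can be sketched without Beam itself. The code below is only an illustration under stated assumptions: `FakeResult` and `TimedTestPipeline` are hypothetical stand-ins, not Beam classes, and the sketch assumes that the wait happens inside `run()`, which the context manager calls when the `with` block exits without an error. It also assumes that `duration` is given in milliseconds, as the docstring of Beam's `PipelineResult.wait_until_finish` describes, so a timeout in seconds (as in the `timeout=600.0` example from the issue) would need converting.

```python
import threading


class FakeResult:
    """Hypothetical stand-in for a pipeline result; not a Beam class."""

    def __init__(self, job_id, run_seconds):
        self.job_id = job_id
        self._done = threading.Event()
        # Simulate a job that finishes in the background after run_seconds.
        timer = threading.Timer(run_seconds, self._done.set)
        timer.daemon = True
        timer.start()

    def wait_until_finish(self, duration=None):
        """Wait for the job. duration is in milliseconds; None waits forever."""
        timeout_s = None if duration is None else duration / 1000.0
        if self._done.wait(timeout_s):
            return "DONE"
        # On timeout, report the job id so the job can be investigated.
        print(f"Timed out after {timeout_s}s waiting for job {self.job_id}")
        return "RUNNING"


class TimedTestPipeline:
    """Hypothetical TestPipeline-like context manager that accepts a timeout."""

    def __init__(self, timeout=None, run_seconds=0.0):
        self.timeout = timeout  # seconds; None keeps the unbounded wait
        self._run_seconds = run_seconds  # only used to drive the fake job
        self.state = None

    def run(self):
        result = FakeResult("fake-job-1", self._run_seconds)
        # Convert seconds to the milliseconds expected by wait_until_finish.
        duration_ms = None if self.timeout is None else int(self.timeout * 1000)
        self.state = result.wait_until_finish(duration=duration_ms)
        assert self.state == "DONE", (
            f"Pipeline did not finish: job {result.job_id} is {self.state}")
        return result

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Run and wait only if the body of the with block did not raise.
        if exc_type is None:
            self.run()
        return False
```

With this shape, `with TimedTestPipeline(timeout=600.0) as p: ...` would fail with a message naming the job instead of hanging until the test framework's own timeout fires.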