When receiving elements over the gRPC stream, the data plane dispatches them to per-bundle queues, each of which has a maximum size. If nothing consumes from a queue, the data plane can therefore block forever.
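The blocking behaviour can be reproduced with a plain bounded queue. This is only a minimal sketch, not the Beam code: `dispatch`, `bundle_queue` and `put_timeout` are hypothetical names, and the timeout exists only so the demo terminates. With `put_timeout=None` the call would block indefinitely, which is the stuck state described above.

```python
import queue


def dispatch(elements, per_bundle_queue, put_timeout=None):
    """Toy stand-in for the reader thread that fills a per-bundle queue.

    Returns the number of elements delivered before a put timed out.
    With put_timeout=None, put() blocks forever once the queue is full.
    """
    delivered = 0
    for element in elements:
        try:
            per_bundle_queue.put(element, timeout=put_timeout)
        except queue.Full:
            # Nobody is consuming, so the producer cannot make progress.
            break
        delivered += 1
    return delivered


# A queue of size 2 with no consumer: the third element never gets in.
bundle_queue = queue.Queue(maxsize=2)
delivered = dispatch(["a", "b", "c"], bundle_queue, put_timeout=0.1)
print(delivered)  # 2
```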
This stuck state was observed with Python SDK 2.38, but it looks like it could still be triggered by exceptions during bundle processor setup (which currently includes the user DoFn setup method).
Some possible fixes:
1. Defer invoking setup until the first data has arrived, so that it runs beneath the data_plane try block. However, this wouldn't work if we add support to the Python SDK for inlining data with the control request, since the data plane may never be used for such a request.
2. Expose the cleanup method on the data plane and invoke it from the process_bundle error paths in sdk_worker. This could work, but it is theoretically still possible to get stuck if the size limit on the set of cleaned-up instructions in the data_plane is exceeded.
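A rough sketch of the second option (calling cleanup from the error path) and of its theoretical limitation. Everything here is hypothetical and heavily simplified: `ToyDataPlane`, `receive`, `cleanup`, `max_cleaned` and this `process_bundle` are illustrative names, not the actual Beam classes or signatures. The point is only that a bounded record of cleaned-up instructions eventually forgets old entries.

```python
import collections
import queue


class ToyDataPlane:
    """Hypothetical model of a data plane with per-instruction queues."""

    def __init__(self, queue_size=2, max_cleaned=2):
        self._queues = {}
        self._queue_size = queue_size
        self._max_cleaned = max_cleaned
        # Insertion-ordered, bounded record of cleaned-up instruction ids.
        self._cleaned = collections.OrderedDict()

    def receive(self, instruction_id, element):
        """Return True if queued, False if dropped for a cleaned-up id."""
        if instruction_id in self._cleaned:
            return False
        if instruction_id not in self._queues:
            self._queues[instruction_id] = queue.Queue(self._queue_size)
        # put_nowait raises queue.Full where a real reader would block.
        self._queues[instruction_id].put_nowait(element)
        return True

    def cleanup(self, instruction_id):
        """Drop the queue and remember the id, forgetting the oldest ids."""
        self._queues.pop(instruction_id, None)
        self._cleaned[instruction_id] = True
        while len(self._cleaned) > self._max_cleaned:
            self._cleaned.popitem(last=False)


def process_bundle(data_plane, instruction_id, setup):
    """Assumed error path: clean up the data plane if setup fails."""
    try:
        setup()
    except Exception:
        data_plane.cleanup(instruction_id)
        raise


def failing_setup():
    raise RuntimeError("setup failed")


plane = ToyDataPlane(queue_size=2, max_cleaned=2)
for iid in ["i1", "i2", "i3"]:
    try:
        process_bundle(plane, iid, failing_setup)
    except RuntimeError:
        pass

late_i3 = plane.receive("i3", "x")  # still remembered, so dropped
late_i1 = plane.receive("i1", "x")  # forgotten, so queued again
print(late_i3, late_i1)  # False True
```

In this model, late data for `i1` is queued again with no consumer once its id has been evicted from the bounded record, which is the remaining way to get stuck.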
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
What happened?
Commit https://github.com/apache/beam/commit/216f0d9f80c3f2a169e139a0818a1b6b059f3219 fixed this blocking for the case where an exception is raised while processing data for an instruction, by removing the channel and remembering that it had been cleaned up: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L578
However, if an exception occurs during setup of the bundle processor, _clean_receiving_queue is not invoked. https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py#L687 appears to just propagate the failure to the response on the control stream: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py#L312
Issue Components