import os

import pathway as pw

if __name__ == "__main__":
    t = pw.demo.range_stream(nb_rows=7, input_rate=1, autocommit_duration_ms=37)

    @pw.udf
    def foo(val: int) -> int:
        if int(os.environ.get("PATHWAY_PROCESS_ID")) > 0:
            assert 1 == 2
        return val

    t2 = t.select(foo=foo(t.value))
    pw.io.null.write(t2)
    pw.run(monitoring_level=pw.MonitoringLevel.NONE)
Run with multiprocessing
pathway spawn -n 4 python filename.py
Relevant log output
Finished dev [optimized + debuginfo] target(s) in 0.91s
Preparing 4 processes (4 total workers)
Finished dev [optimized + debuginfo] target(s) in 0.58s
[2024-05-20T09:49:42]:INFO:Preparing Pathway computation
[2024-05-20T09:49:43]:INFO:Telemetry enabled
[2024-05-20T09:49:43]:INFO:PythonReader-0: 0 entries (1 minibatch(es)) have been sent to the engine
[2024-05-20T09:49:43]:INFO:NullWriter-0: Done writing 0 entries, time 1716191383656. Current batch writes took: 0 ms. All writes so far took: 0 ms.
thread 'pathway:recv-2' panicked at external/timely-dataflow/communication/src/allocator/zero_copy/tcp.rs:20:5:
timely communication error: reading data: socket closed
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'pathway:work-0' panicked at external/timely-dataflow/communication/src/allocator/zero_copy/bytes_exchange.rs:96:48:
MergeQueue poisoned.
thread 'thread 'thread 'pathway:send-3pathway:send-2pathway:send-1' panicked at ' panicked at ' panicked at external/timely-dataflow/communication/src/allocator/zero_copy/bytes_exchange.rsexternal/timely-dataflow/communication/src/allocator/zero_copy/bytes_exchange.rsexternal/timely-dataflow/communication/src/allocator/zero_copy/bytes_exchange.rs:::969696:::4848:
48:
MergeQueue poisoned.:
MergeQueue poisoned.
MergeQueue poisoned.
thread 'thread 'pathway:recv-1<unnamed>' panicked at ' panicked at external/timely-dataflow/communication/src/allocator/zero_copy/tcp.rsexternal/timely-dataflow/communication/src/allocator/zero_copy/initialize.rs::2023::527:
:
timely communication error: reading data: socket closedSend thread panic: Any { .. }
thread 'pathway:recv-3' panicked at external/timely-dataflow/communication/src/allocator/zero_copy/tcp.rs:20:5:
timely communication error: reading data: socket closed
Traceback (most recent call last):
  File "/home/mateusz/IoT-Pathway/test8.py", line 16, in <module>
    pw.run(monitoring_level=pw.MonitoringLevel.NONE)
  File "/home/mateusz/IoT-Pathway/public/pathway/python/pathway/internals/runtime_type_check.py", line 19, in with_type_validation
    return beartype.beartype(f)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<@beartype(pathway.internals.run.run) at 0x7f97a922d440>", line 147, in run
  File "/home/mateusz/IoT-Pathway/public/pathway/python/pathway/internals/run.py", line 51, in run
    ).run_outputs()
    ^^^^^^^^^^^^^
  File "/home/mateusz/IoT-Pathway/public/pathway/python/pathway/internals/graph_runner/__init__.py", line 118, in run_outputs
    self.run_nodes(self._graph.global_scope.output_nodes, after_build=after_build)
  File "/home/mateusz/IoT-Pathway/public/pathway/python/pathway/internals/graph_runner/__init__.py", line 94, in run_nodes
    self._run(all_nodes, after_build=after_build)
  File "/home/mateusz/IoT-Pathway/public/pathway/python/pathway/internals/graph_runner/__init__.py", line 198, in _run
    return api.run_with_new_graph(
           ^^^^^^^^^^^^^^^^^^^^^^^
pathway.engine.EngineError: worker panic: MergeQueue poisoned.
Sentry is attempting to send 2 pending events
Waiting up to 2 seconds
Press Ctrl-C to quit
What did you expect to happen?
A clear error message stating that the assertion failed, instead of only the "MergeQueue poisoned" worker panic.
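Until the engine surfaces the original exception, one possible stopgap is to log the failure from inside the UDF body before it propagates. The sketch below is only an illustration: the `log_exceptions` decorator is a hypothetical helper, not part of Pathway, and the `"0"` default for `PATHWAY_PROCESS_ID` is an assumption added so the function also runs outside `pathway spawn`. It has not been verified against Pathway 0.11.

```python
import functools
import logging
import os


def log_exceptions(fn):
    """Hypothetical helper: log any exception raised by fn, then re-raise it."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            # PATHWAY_PROCESS_ID is the variable read by the reproduction script;
            # the "0" default is an assumption for runs outside `pathway spawn`.
            process_id = os.environ.get("PATHWAY_PROCESS_ID", "0")
            # logging.exception records the message together with the traceback.
            logging.exception(
                "UDF %s failed in process %s", fn.__name__, process_id
            )
            raise

    return wrapper


@log_exceptions
def foo(val: int) -> int:
    # Same deliberate failure as in the reproduction: every process except 0 fails.
    if int(os.environ.get("PATHWAY_PROCESS_ID", "0")) > 0:
        assert 1 == 2, "deliberate failure on a non-zero process"
    return val
```

In the reproduction script, `@pw.udf` would presumably be stacked above `@log_exceptions`. The exception is still re-raised, so the run fails as before, but the assertion and its traceback should appear in the failing process's own log ahead of the panic output.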
Version
0.11
Docker Versions (if used)
No response
OS
Linux
On which CPU architecture did you run Pathway?
None