We've been seeing a periodic test failure. The failing tests seem unrelated to the source, which appears to be the death of the mthread cluster fixture sometime during the test run. I've been unable to replicate this locally. This could be due to changes in the circleci environment, changes in dask/distributed. It seems unlikely to be due to anything we're doing in prefect itself, we don't mess with features low-level enough to trigger the tracebacks being seen.
Test Failures
```
==================================== ERRORS ====================================
________ ERROR at teardown of test_gmail_notifier_ignores_ignore_states ________
addr = 'inproc://192.168.224.3/513/1', timeout = 10, deserialize = True
handshake_overrides = None
connection_args = {'require_encryption': False, 'ssl_context': None}
scheme = 'inproc'
backend =
comm = None, start = 1601909992.8139062, deadline = 1601910002.8139062
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
_raise = ._raise at 0x7f97dc3fd3b0>
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
deadline = start + timeout
error = None
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
raise IOError(msg)
backoff = 0.01
if timeout and timeout / 20 < backoff:
backoff = timeout / 20
retry_timeout_backoff = random.randrange(140, 160) / 100
# This starts a thread
while True:
try:
while deadline - time() > 0:
async def _():
comm = await connector.connect(
loc, deserialize=deserialize, **connection_args
)
local_info = {
**comm.handshake_info(),
**(handshake_overrides or {}),
}
try:
handshake = await asyncio.wait_for(comm.read(), 1)
write = await asyncio.wait_for(comm.write(local_info), 1)
# This would be better, but connections leak if worker is closed quickly
# write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
except Exception as e:
with suppress(Exception):
await comm.close()
raise CommClosedError() from e
comm.remote_info = handshake
comm.remote_info["address"] = comm._peer_addr
comm.local_info = local_info
comm.local_info["address"] = comm._local_addr
comm.handshake_options = comm.handshake_configuration(
comm.local_info, comm.remote_info
)
return comm
with suppress(TimeoutError):
comm = await asyncio.wait_for(
_(), timeout=min(deadline - time(), retry_timeout_backoff)
)
break
if not comm:
> _raise(error)
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:322:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
During handling of the above exception, another exception occurred:
@pytest.fixture(scope="session")
def mthread():
"Multi-threaded executor"
with Client(
processes=False, scheduler_port=0, dashboard_address=":0", n_workers=2
) as client:
> yield DaskExecutor(client.scheduler.address)
tests/conftest.py:42:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/site-packages/distributed/client.py:1188: in __exit__
self.close()
/usr/local/lib/python3.7/site-packages/distributed/client.py:1423: in close
f = self.cluster.close()
/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py:104: in close
return self.sync(self._close, callback_timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/deploy/cluster.py:183: in sync
return sync(self.loop, func, *args, **kwargs)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:339: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:323: in f
result[0] = yield future
/usr/local/lib/python3.7/site-packages/tornado/gen.py:735: in run
value = future.result()
/usr/local/lib/python3.7/site-packages/tornado/ioloop.py:743: in _run_callback
ret = callback()
/usr/local/lib/python3.7/site-packages/tornado/ioloop.py:767: in _discard_future_result
future.result()
/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py:401: in _close
await self._correct_state()
/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py:328: in _correct_state_internal
await self.scheduler_comm.retire_workers(workers=list(to_close))
/usr/local/lib/python3.7/site-packages/distributed/core.py:810: in send_recv_from_rpc
comm = await self.live_comm()
/usr/local/lib/python3.7/site-packages/distributed/core.py:772: in live_comm
**self.connection_args,
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:334: in connect
_raise(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
--------------------------- Captured stderr teardown ---------------------------
/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py:838: UserWarning: This task is running in a daemonic subprocess; consequently Prefect can only enforce a soft timeout limit, i.e., if your Task reaches its timeout limit it will enter a TimedOut state but continue running in the background.
self.task.run, timeout=self.task.timeout, **raw_inputs
tornado.application - ERROR - Exception in callback functools.partial(>, exception=OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")>)
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/usr/local/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 401, in _close
await self._correct_state()
File "/usr/local/lib/python3.7/site-packages/distributed/deploy/spec.py", line 328, in _correct_state_internal
await self.scheduler_comm.retire_workers(workers=list(to_close))
File "/usr/local/lib/python3.7/site-packages/distributed/core.py", line 810, in send_recv_from_rpc
comm = await self.live_comm()
File "/usr/local/lib/python3.7/site-packages/distributed/core.py", line 772, in live_comm
**self.connection_args,
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
=================================== FAILURES ===================================
_____ test_flow_runner_properly_provides_context_to_task_runners[mthread] ______
executor =
@pytest.mark.parametrize(
"executor", ["local", "mproc", "mthread", "sync"], indirect=True
)
def test_flow_runner_properly_provides_context_to_task_runners(executor):
@prefect.task
def my_name():
return prefect.context.get("my_name")
@prefect.task
def flow_name():
return prefect.context.get("flow_name")
flow = Flow(name="test-dummy", tasks=[flow_name, my_name])
with prefect.context(my_name="marvin"):
res = flow.run(executor=executor)
assert res.result[flow_name].result == "test-dummy"
assert res.result[my_name].result == "marvin"
with Flow("test-map") as f:
tt = flow_name.map(upstream_tasks=[my_name])
with prefect.context(my_name="mapped-marvin"):
res = f.run(executor=executor)
> assert res.result[my_name].result == "mapped-marvin"
E TypeError: 'AttributeError' object is not subscriptable
tests/engine/test_flow_runner.py:813: TypeError
----------------------------- Captured stdout call -----------------------------
[2020-10-05 14:50:29] INFO - prefect.FlowRunner | Beginning Flow run for 'test-dummy'
[2020-10-05 14:50:30] INFO - prefect.TaskRunner | Task 'flow_name': Starting task run...
[2020-10-05 14:50:30] INFO - prefect.TaskRunner | Task 'flow_name': finished task run for task with final state: 'Success'
[2020-10-05 14:50:30] INFO - prefect.TaskRunner | Task 'my_name': Starting task run...
[2020-10-05 14:50:30] INFO - prefect.TaskRunner | Task 'my_name': finished task run for task with final state: 'Success'
[2020-10-05 14:50:30] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-10-05 14:50:30] INFO - prefect.FlowRunner | Beginning Flow run for 'test-map'
[2020-10-05 14:50:30] INFO - prefect.TaskRunner | Task 'my_name': Starting task run...
[2020-10-05 14:50:30] INFO - prefect.TaskRunner | Task 'my_name': finished task run for task with final state: 'Success'
[2020-10-05 14:50:30] INFO - prefect.TaskRunner | Task 'flow_name': Starting task run...
[2020-10-05 14:50:30] INFO - prefect.TaskRunner | Task 'flow_name': finished task run for task with final state: 'Mapped'
[2020-10-05 14:50:31] INFO - prefect.TaskRunner | Task 'flow_name[1]': Starting task run...
[2020-10-05 14:50:31] INFO - prefect.TaskRunner | Task 'flow_name[1]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:31] INFO - prefect.TaskRunner | Task 'flow_name[4]': Starting task run...
[2020-10-05 14:50:31] INFO - prefect.TaskRunner | Task 'flow_name[0]': Starting task run...
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[0]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[4]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[7]': Starting task run...
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[2]': Starting task run...
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[9]': Starting task run...
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[11]': Starting task run...
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[2]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[8]': Starting task run...
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[7]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[3]': Starting task run...
[2020-10-05 14:50:32] INFO - prefect.TaskRunner | Task 'flow_name[11]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:33] INFO - prefect.TaskRunner | Task 'flow_name[8]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:33] INFO - prefect.TaskRunner | Task 'flow_name[9]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:33] INFO - prefect.TaskRunner | Task 'flow_name[3]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:33] INFO - prefect.TaskRunner | Task 'flow_name[5]': Starting task run...
[2020-10-05 14:50:33] INFO - prefect.TaskRunner | Task 'flow_name[5]': finished task run for task with final state: 'Success'
[2020-10-05 14:50:42] ERROR - prefect.FlowRunner | Unexpected error: AttributeError("'NoneType' object has no attribute 'result'")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 615, in get_flow_run_state
s.result = [ms.result for ms in s.map_states]
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 615, in
s.result = [ms.result for ms in s.map_states]
AttributeError: 'NoneType' object has no attribute 'result'
[2020-10-05 14:50:42] ERROR - prefect.test-map | Unexpected error occured in FlowRunner: AttributeError("'NoneType' object has no attribute 'result'")
----------------------------- Captured stderr call -----------------------------
distributed.comm.inproc - WARNING - Closing dangling queue in
------------------------------ Captured log call -------------------------------
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test-dummy'
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:235 Task 'my_name': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'my_name': finished task run for task with final state: 'Success'
INFO prefect.FlowRunner:flow_runner.py:667 Flow run SUCCESS: all reference tasks succeeded
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test-map'
INFO prefect.TaskRunner:task_runner.py:235 Task 'my_name': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'my_name': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name': finished task run for task with final state: 'Mapped'
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[1]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[1]': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[4]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[0]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[0]': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[4]': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[7]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[2]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[9]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[11]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[2]': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[8]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[7]': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[3]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[11]': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[8]': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[9]': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[3]': finished task run for task with final state: 'Success'
INFO prefect.TaskRunner:task_runner.py:235 Task 'flow_name[5]': Starting task run...
INFO prefect.TaskRunner:task_runner.py:329 Task 'flow_name[5]': finished task run for task with final state: 'Success'
ERROR prefect.FlowRunner:runner.py:66 Unexpected error: AttributeError("'NoneType' object has no attribute 'result'")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 615, in get_flow_run_state
s.result = [ms.result for ms in s.map_states]
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 615, in
s.result = [ms.result for ms in s.map_states]
AttributeError: 'NoneType' object has no attribute 'result'
ERROR prefect.test-map:flow.py:1251 Unexpected error occured in FlowRunner: AttributeError("'NoneType' object has no attribute 'result'")
__________________ test_flow_runner_handles_timeouts[mthread] __________________
executor =
@pytest.mark.parametrize("executor", ["local", "mthread", "sync"], indirect=True)
def test_flow_runner_handles_timeouts(executor):
sleeper = SlowTask(timeout=1)
with Flow(name="test") as flow:
res = sleeper(3)
state = FlowRunner(flow=flow).run(return_tasks=[res], executor=executor)
assert state.is_failed()
> assert isinstance(state.result[res], TimedOut)
E TypeError: 'OSError' object is not subscriptable
tests/engine/test_flow_runner.py:826: TypeError
----------------------------- Captured stdout call -----------------------------
[2020-10-05 14:50:48] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-05 14:50:58] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
------------------------------ Captured log call -------------------------------
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test'
ERROR prefect.FlowRunner:runner.py:66 Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
___ TestMapping.test_terminal_mapped_states_are_used_for_flow_state[mthread] ___
self =
executor =
@pytest.mark.parametrize(
"executor", ["local", "mthread", "mproc", "sync"], indirect=True
)
def test_terminal_mapped_states_are_used_for_flow_state(self, executor):
with Flow(name="test") as flow:
res = ReturnTask().map([0, 1])
state = FlowRunner(flow=flow).run(return_tasks=[res], executor=executor)
assert state.is_failed()
> assert state.result[res].map_states[0].is_successful()
E TypeError: 'OSError' object is not subscriptable
tests/engine/test_flow_runner.py:1109: TypeError
----------------------------- Captured stdout call -----------------------------
[2020-10-05 14:51:07] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-05 14:51:17] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
------------------------------ Captured log call -------------------------------
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test'
ERROR prefect.FlowRunner:runner.py:66 Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
__ TestMapping.test_mapped_will_use_existing_map_states_if_available[mthread] __
self =
executor =
@pytest.mark.parametrize(
"executor", ["local", "mthread", "mproc", "sync"], indirect=True
)
def test_mapped_will_use_existing_map_states_if_available(self, executor):
with Flow(name="test") as flow:
res = ReturnTask().map([0, 1])
state = FlowRunner(flow=flow).run(
return_tasks=[res],
executor=executor,
task_states={res: Mapped(map_states=[Success(), Success(result=100)])},
)
> assert state.is_successful()
E assert False
E + where False = >()
E + where > = .is_successful
tests/engine/test_flow_runner.py:1125: AssertionError
----------------------------- Captured stdout call -----------------------------
[2020-10-05 14:51:20] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-05 14:51:30] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
------------------------------ Captured log call -------------------------------
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test'
ERROR prefect.FlowRunner:runner.py:66 Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
_ TestMapping.test_mapped_will_use_partial_existing_map_states_if_available[mthread] _
self =
executor =
@pytest.mark.parametrize(
"executor", ["local", "mthread", "mproc", "sync"], indirect=True
)
def test_mapped_will_use_partial_existing_map_states_if_available(self, executor):
with Flow(name="test") as flow:
res = ReturnTask().map([1, 1])
state = FlowRunner(flow=flow).run(
return_tasks=[res],
executor=executor,
task_states={res: Mapped(map_states=[None, Success(result=100)])},
)
assert state.is_failed()
> assert state.result[res].map_states[0].is_failed()
E TypeError: 'OSError' object is not subscriptable
tests/engine/test_flow_runner.py:1143: TypeError
----------------------------- Captured stdout call -----------------------------
[2020-10-05 14:51:33] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-05 14:51:43] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
------------------------------ Captured log call -------------------------------
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test'
ERROR prefect.FlowRunner:runner.py:66 Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
_____ TestMapping.test_mapped_tasks_dont_run_if_upstream_pending[mthread] ______
self =
executor =
@pytest.mark.parametrize(
"executor", ["local", "mthread", "mproc", "sync"], indirect=True
)
def test_mapped_tasks_dont_run_if_upstream_pending(self, executor):
with Flow(name="test") as flow:
ups = SuccessTask()
res = ReturnTask().map([ups])
state = FlowRunner(flow=flow).run(
return_tasks=flow.tasks,
executor=executor,
task_states={ups: Retrying(start_time=pendulum.now().add(hours=1))},
)
> assert state.is_running()
E assert False
E + where False = >()
E + where > = .is_running
tests/engine/test_flow_runner.py:1161: AssertionError
----------------------------- Captured stdout call -----------------------------
[2020-10-05 14:51:45] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-05 14:51:55] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
------------------------------ Captured log call -------------------------------
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test'
ERROR prefect.FlowRunner:runner.py:66 Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
____________ TestMapping.test_mapped_task_can_be_scheduled[mthread] ____________
self =
executor =
@pytest.mark.parametrize(
"executor", ["local", "mthread", "mproc", "sync"], indirect=True
)
def test_mapped_task_can_be_scheduled(self, executor):
with Flow(name="test") as flow:
res = ReturnTask().map([0, 0])
state = FlowRunner(flow=flow).run(
return_tasks=[res],
executor=executor,
task_states={res: Scheduled(start_time=pendulum.now().subtract(minutes=1))},
)
> assert state.is_successful()
E assert False
E + where False = >()
E + where > = .is_successful
tests/engine/test_flow_runner.py:1178: AssertionError
----------------------------- Captured stdout call -----------------------------
[2020-10-05 14:51:59] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-05 14:52:09] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
------------------------------ Captured log call -------------------------------
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test'
ERROR prefect.FlowRunner:runner.py:66 Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
______ TestMapping.test_mapped_task_can_be_scheduled_for_future[mthread] _______
self =
executor =
@pytest.mark.parametrize(
"executor", ["local", "mthread", "mproc", "sync"], indirect=True
)
def test_mapped_task_can_be_scheduled_for_future(self, executor):
with Flow(name="test") as flow:
res = ReturnTask().map([0, 0])
state = FlowRunner(flow=flow).run(
return_tasks=[res],
executor=executor,
task_states={res: Scheduled(start_time=pendulum.now().add(hours=1))},
)
> assert state.is_running()
E assert False
E + where False = >()
E + where > = .is_running
tests/engine/test_flow_runner.py:1193: AssertionError
----------------------------- Captured stdout call -----------------------------
[2020-10-05 14:52:11] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-05 14:52:21] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
------------------------------ Captured log call -------------------------------
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test'
ERROR prefect.FlowRunner:runner.py:66 Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
______________ test_dask_executor_with_flow_runner_sets_task_keys ______________
mthread =
def test_dask_executor_with_flow_runner_sets_task_keys(mthread):
"""Integration test that ensures the flow runner forwards the proper
information to the DaskExecutor so that key names are set based on
the task name"""
key_names = set()
class MyExecutor(Executor):
@contextmanager
def start(self):
with mthread.start():
yield
def submit(self, *args, **kwargs):
fut = mthread.submit(*args, **kwargs)
key_names.add(fut.key.split("-")[0])
return fut
def wait(self, x):
return mthread.wait(x)
@prefect.task
def inc(x):
return x + 1
@prefect.task
def do_sum(x):
return sum(x)
with Flow("test") as flow:
a = inc(1)
b = inc.map(range(3))
c = do_sum(b)
flow.run(executor=MyExecutor())
> assert key_names == {"inc", "do_sum"}
E AssertionError: assert set() == {'inc', 'do_sum'}
E Extra items in the right set:
E 'inc'
E 'do_sum'
E Full diff:
E - {'inc', 'do_sum'}
E + set()
tests/engine/test_flow_runner.py:1487: AssertionError
----------------------------- Captured stdout call -----------------------------
[2020-10-05 14:52:33] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-05 14:52:43] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/root/project/tests/engine/test_flow_runner.py", line 1461, in start
with mthread.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
[2020-10-05 14:52:43] ERROR - prefect.test | Unexpected error occured in FlowRunner: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
------------------------------ Captured log call -------------------------------
INFO prefect.FlowRunner:flow_runner.py:224 Beginning Flow run for 'test'
ERROR prefect.FlowRunner:runner.py:66 Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/root/project/tests/engine/test_flow_runner.py", line 1461, in start
with mthread.start():
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 252, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 744, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 949, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1039, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1097, in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
ERROR prefect.test:flow.py:1251 Unexpected error occured in FlowRunner: OSError("Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time")
_____________ test_submit_does_not_assume_pure_functions[mthread] ______________
addr = 'inproc://192.168.224.3/513/1', timeout = 10, deserialize = True
handshake_overrides = None
connection_args = {'require_encryption': False, 'ssl_context': None}
scheme = 'inproc'
backend =
comm = None, start = 1601909729.1363144, deadline = 1601909739.1363144
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
_raise = ._raise at 0x7f976084aef0>
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
deadline = start + timeout
error = None
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
raise IOError(msg)
backoff = 0.01
if timeout and timeout / 20 < backoff:
backoff = timeout / 20
retry_timeout_backoff = random.randrange(140, 160) / 100
# This starts a thread
while True:
try:
while deadline - time() > 0:
async def _():
comm = await connector.connect(
loc, deserialize=deserialize, **connection_args
)
local_info = {
**comm.handshake_info(),
**(handshake_overrides or {}),
}
try:
handshake = await asyncio.wait_for(comm.read(), 1)
write = await asyncio.wait_for(comm.write(local_info), 1)
# This would be better, but connections leak if worker is closed quickly
# write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
except Exception as e:
with suppress(Exception):
await comm.close()
raise CommClosedError() from e
comm.remote_info = handshake
comm.remote_info["address"] = comm._peer_addr
comm.local_info = local_info
comm.local_info["address"] = comm._local_addr
comm.handshake_options = comm.handshake_configuration(
comm.local_info, comm.remote_info
)
return comm
with suppress(TimeoutError):
comm = await asyncio.wait_for(
_(), timeout=min(deadline - time(), retry_timeout_backoff)
)
break
if not comm:
> _raise(error)
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:322:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
During handling of the above exception, another exception occurred:
executor =
@pytest.mark.parametrize("executor", ["mproc", "mthread", "sync"], indirect=True)
def test_submit_does_not_assume_pure_functions(executor):
def random_fun():
return random.random()
> with executor.start():
tests/engine/executors/test_executors.py:226:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/contextlib.py:112: in __enter__
return next(self.gen)
/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py:252: in start
with Client(self.address, **self.client_kwargs) as client:
/usr/local/lib/python3.7/site-packages/distributed/client.py:744: in __init__
self.start(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:949: in start
sync(self.loop, self._start, **kwargs)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:339: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:323: in f
result[0] = yield future
/usr/local/lib/python3.7/site-packages/tornado/gen.py:735: in run
value = future.result()
/usr/local/lib/python3.7/site-packages/distributed/client.py:1039: in _start
await self._ensure_connected(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:1097: in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:334: in connect
_raise(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
________________ TestDaskExecutor.test_submit_and_wait[mthread] ________________
addr = 'inproc://192.168.224.3/513/1', timeout = 10, deserialize = True
handshake_overrides = None
connection_args = {'require_encryption': False, 'ssl_context': None}
scheme = 'inproc'
backend =
comm = None, start = 1601909739.8044631, deadline = 1601909749.8044631
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
_raise = ._raise at 0x7f9761237680>
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
deadline = start + timeout
error = None
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
raise IOError(msg)
backoff = 0.01
if timeout and timeout / 20 < backoff:
backoff = timeout / 20
retry_timeout_backoff = random.randrange(140, 160) / 100
# This starts a thread
while True:
try:
while deadline - time() > 0:
async def _():
comm = await connector.connect(
loc, deserialize=deserialize, **connection_args
)
local_info = {
**comm.handshake_info(),
**(handshake_overrides or {}),
}
try:
handshake = await asyncio.wait_for(comm.read(), 1)
write = await asyncio.wait_for(comm.write(local_info), 1)
# This would be better, but connections leak if worker is closed quickly
# write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
except Exception as e:
with suppress(Exception):
await comm.close()
raise CommClosedError() from e
comm.remote_info = handshake
comm.remote_info["address"] = comm._peer_addr
comm.local_info = local_info
comm.local_info["address"] = comm._local_addr
comm.handshake_options = comm.handshake_configuration(
comm.local_info, comm.remote_info
)
return comm
with suppress(TimeoutError):
comm = await asyncio.wait_for(
_(), timeout=min(deadline - time(), retry_timeout_backoff)
)
break
if not comm:
> _raise(error)
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:322:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
During handling of the above exception, another exception occurred:
self =
executor =
@pytest.mark.parametrize("executor", ["mproc", "mthread"], indirect=True)
def test_submit_and_wait(self, executor):
to_compute = {}
> with executor.start():
tests/engine/executors/test_executors.py:237:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/contextlib.py:112: in __enter__
return next(self.gen)
/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py:252: in start
with Client(self.address, **self.client_kwargs) as client:
/usr/local/lib/python3.7/site-packages/distributed/client.py:744: in __init__
self.start(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:949: in start
sync(self.loop, self._start, **kwargs)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:339: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:323: in f
result[0] = yield future
/usr/local/lib/python3.7/site-packages/tornado/gen.py:735: in run
value = future.result()
/usr/local/lib/python3.7/site-packages/distributed/client.py:1039: in _start
await self._ensure_connected(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:1097: in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:334: in connect
_raise(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
_______________ TestDaskExecutor.test_runs_in_parallel[mthread] ________________
addr = 'inproc://192.168.224.3/513/1', timeout = 10, deserialize = True
handshake_overrides = None
connection_args = {'require_encryption': False, 'ssl_context': None}
scheme = 'inproc'
backend =
comm = None, start = 1601909751.296438, deadline = 1601909761.296438
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
_raise = ._raise at 0x7f9760380680>
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
deadline = start + timeout
error = None
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
raise IOError(msg)
backoff = 0.01
if timeout and timeout / 20 < backoff:
backoff = timeout / 20
retry_timeout_backoff = random.randrange(140, 160) / 100
# This starts a thread
while True:
try:
while deadline - time() > 0:
async def _():
comm = await connector.connect(
loc, deserialize=deserialize, **connection_args
)
local_info = {
**comm.handshake_info(),
**(handshake_overrides or {}),
}
try:
handshake = await asyncio.wait_for(comm.read(), 1)
write = await asyncio.wait_for(comm.write(local_info), 1)
# This would be better, but connections leak if worker is closed quickly
# write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
except Exception as e:
with suppress(Exception):
await comm.close()
raise CommClosedError() from e
comm.remote_info = handshake
comm.remote_info["address"] = comm._peer_addr
comm.local_info = local_info
comm.local_info["address"] = comm._local_addr
comm.handshake_options = comm.handshake_configuration(
comm.local_info, comm.remote_info
)
return comm
with suppress(TimeoutError):
comm = await asyncio.wait_for(
_(), timeout=min(deadline - time(), retry_timeout_backoff)
)
break
if not comm:
> _raise(error)
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:322:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
During handling of the above exception, another exception occurred:
self =
executor =
@pytest.mark.skipif(
sys.platform == "win32", reason="Nondeterministically fails on Windows machines"
)
@pytest.mark.parametrize("executor", ["mproc", "mthread"], indirect=True)
def test_runs_in_parallel(self, executor):
# related: "https://stackoverflow.com/questions/52121686/why-is-dask-distributed-not-parallelizing-the-first-run-of-my-workflow"
def record_times():
start_time = time.time()
time.sleep(random.random() * 0.25 + 0.5)
end_time = time.time()
return start_time, end_time
> with executor.start():
tests/engine/executors/test_executors.py:258:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/contextlib.py:112: in __enter__
return next(self.gen)
/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py:252: in start
with Client(self.address, **self.client_kwargs) as client:
/usr/local/lib/python3.7/site-packages/distributed/client.py:744: in __init__
self.start(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:949: in start
sync(self.loop, self._start, **kwargs)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:339: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:323: in f
result[0] = yield future
/usr/local/lib/python3.7/site-packages/tornado/gen.py:735: in run
value = future.result()
/usr/local/lib/python3.7/site-packages/distributed/client.py:1039: in _start
await self._ensure_connected(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:1097: in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:334: in connect
_raise(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
_________________ TestDaskExecutor.test_submit_sets_task_name __________________
addr = 'inproc://192.168.224.3/513/1', timeout = 10, deserialize = True
handshake_overrides = None
connection_args = {'require_encryption': False, 'ssl_context': None}
scheme = 'inproc'
backend =
comm = None, start = 1601909766.590488, deadline = 1601909776.590488
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
_raise = ._raise at 0x7f97609c3560>
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
deadline = start + timeout
error = None
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
raise IOError(msg)
backoff = 0.01
if timeout and timeout / 20 < backoff:
backoff = timeout / 20
retry_timeout_backoff = random.randrange(140, 160) / 100
# This starts a thread
while True:
try:
while deadline - time() > 0:
async def _():
comm = await connector.connect(
loc, deserialize=deserialize, **connection_args
)
local_info = {
**comm.handshake_info(),
**(handshake_overrides or {}),
}
try:
handshake = await asyncio.wait_for(comm.read(), 1)
write = await asyncio.wait_for(comm.write(local_info), 1)
# This would be better, but connections leak if worker is closed quickly
# write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
except Exception as e:
with suppress(Exception):
await comm.close()
raise CommClosedError() from e
comm.remote_info = handshake
comm.remote_info["address"] = comm._peer_addr
comm.local_info = local_info
comm.local_info["address"] = comm._local_addr
comm.handshake_options = comm.handshake_configuration(
comm.local_info, comm.remote_info
)
return comm
with suppress(TimeoutError):
comm = await asyncio.wait_for(
_(), timeout=min(deadline - time(), retry_timeout_backoff)
)
break
if not comm:
> _raise(error)
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:322:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
During handling of the above exception, another exception occurred:
self =
mthread =
def test_submit_sets_task_name(self, mthread):
> with mthread.start():
tests/engine/executors/test_executors.py:404:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/contextlib.py:112: in __enter__
return next(self.gen)
/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py:252: in start
with Client(self.address, **self.client_kwargs) as client:
/usr/local/lib/python3.7/site-packages/distributed/client.py:744: in __init__
self.start(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:949: in start
sync(self.loop, self._start, **kwargs)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:339: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:323: in f
result[0] = yield future
/usr/local/lib/python3.7/site-packages/tornado/gen.py:735: in run
value = future.result()
/usr/local/lib/python3.7/site-packages/distributed/client.py:1039: in _start
await self._ensure_connected(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:1097: in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:334: in connect
_raise(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
___________ TestDaskExecutor.test_is_pickleable_after_start[mthread] ___________
addr = 'inproc://192.168.224.3/513/1', timeout = 10, deserialize = True
handshake_overrides = None
connection_args = {'require_encryption': False, 'ssl_context': None}
scheme = 'inproc'
backend =
comm = None, start = 1601909777.4789817, deadline = 1601909787.4789817
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
_raise = ._raise at 0x7f97605ed8c0>
async def connect(
addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args
):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
deadline = start + timeout
error = None
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
raise IOError(msg)
backoff = 0.01
if timeout and timeout / 20 < backoff:
backoff = timeout / 20
retry_timeout_backoff = random.randrange(140, 160) / 100
# This starts a thread
while True:
try:
while deadline - time() > 0:
async def _():
comm = await connector.connect(
loc, deserialize=deserialize, **connection_args
)
local_info = {
**comm.handshake_info(),
**(handshake_overrides or {}),
}
try:
handshake = await asyncio.wait_for(comm.read(), 1)
write = await asyncio.wait_for(comm.write(local_info), 1)
# This would be better, but connections leak if worker is closed quickly
# write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
except Exception as e:
with suppress(Exception):
await comm.close()
raise CommClosedError() from e
comm.remote_info = handshake
comm.remote_info["address"] = comm._peer_addr
comm.local_info = local_info
comm.local_info["address"] = comm._local_addr
comm.handshake_options = comm.handshake_configuration(
comm.local_info, comm.remote_info
)
return comm
with suppress(TimeoutError):
comm = await asyncio.wait_for(
_(), timeout=min(deadline - time(), retry_timeout_backoff)
)
break
if not comm:
> _raise(error)
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:322:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
During handling of the above exception, another exception occurred:
self =
executor =
@pytest.mark.parametrize("executor", ["mproc", "mthread"], indirect=True)
def test_is_pickleable_after_start(self, executor):
> with executor.start():
tests/engine/executors/test_executors.py:425:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/contextlib.py:112: in __enter__
return next(self.gen)
/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py:252: in start
with Client(self.address, **self.client_kwargs) as client:
/usr/local/lib/python3.7/site-packages/distributed/client.py:744: in __init__
self.start(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:949: in start
sync(self.loop, self._start, **kwargs)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:339: in sync
raise exc.with_traceback(tb)
/usr/local/lib/python3.7/site-packages/distributed/utils.py:323: in f
result[0] = yield future
/usr/local/lib/python3.7/site-packages/tornado/gen.py:735: in run
value = future.result()
/usr/local/lib/python3.7/site-packages/distributed/client.py:1039: in _start
await self._ensure_connected(timeout=timeout)
/usr/local/lib/python3.7/site-packages/distributed/client.py:1097: in _ensure_connected
self.scheduler.address, timeout=timeout, **self.connection_args
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:334: in connect
_raise(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
error = "Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time"
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
error,
)
> raise IOError(msg)
E OSError: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: Timed out trying to connect to 'inproc://192.168.224.3/513/1' after 10 s: connect() didn't finish in time
/usr/local/lib/python3.7/site-packages/distributed/comm/core.py:275: OSError
=============================== warnings summary ===============================
../../usr/local/lib/python3.7/site-packages/_pytest/config/__init__.py:1230
/usr/local/lib/python3.7/site-packages/_pytest/config/__init__.py:1230: PytestConfigWarning: Unknown config option: usedevelop
self._warn_or_fail_if_strict("Unknown config option: {}\n".format(key))
tests/core/test_flow.py::test_copy
tests/core/test_flow.py::test_infer_root_tasks
tests/core/test_flow.py::test_set_reference_tasks
tests/core/test_flow.py::test_reset_reference_tasks_to_terminal_tasks
/usr/local/lib/python3.7/contextlib.py:119: UserWarning: Tasks were created but not added to the flow: {, , }. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
next(self.gen)
tests/core/test_flow.py::test_infer_terminal_tasks
tests/core/test_flow.py::test_reference_tasks_are_terminal_tasks_by_default
/usr/local/lib/python3.7/contextlib.py:119: UserWarning: Tasks were created but not added to the flow: {, , , }. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
next(self.gen)
tests/core/test_flow.py::test_parameters_can_not_be_downstream_dependencies
tests/core/test_task.py::TestTaskCopy::test_copy_warns_if_dependencies_in_active_flow
tests/utilities/test_tasks.py::test_copying_then_setting_tags_doesnt_leak_backwards
/usr/local/lib/python3.7/contextlib.py:119: UserWarning: Tasks were created but not added to the flow: {}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
next(self.gen)
tests/core/test_flow.py::TestFlowDiagnostics::test_flow_diagnostics
/root/project/tests/core/test_flow.py:2494: UserWarning: Result Handlers are deprecated; please use the new style Result classes instead.
result_handler=prefect.engine.result_handlers.JSONResultHandler(),
tests/core/test_task_operators.py::TestMagicInteractionMethods::test_chained_operators
/usr/local/lib/python3.7/contextlib.py:119: UserWarning: Tasks were created but not added to the flow: {, }. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
next(self.gen)
tests/engine/test_task_runner.py::TestTargetExistsStep::test_check_target_exists
/root/project/tests/engine/test_task_runner.py:1453: UserWarning: Both `result.location` and `target` were provided. The `target` value will be used.
my_task = Task(target="{task_name}-test-file", result=result)
tests/engine/test_task_runner.py::TestTargetExistsStep::test_check_target_uses_callable
/root/project/tests/engine/test_task_runner.py:1489: UserWarning: Both `result.location` and `target` were provided. The `target` value will be used.
my_task = Task(target=lambda **kwargs: "testcall", result=result)
tests/engine/test_task_runner.py::TestTargetExistsStep::test_check_target_callable_uses_context
/root/project/tests/engine/test_task_runner.py:1509: UserWarning: Both `result.location` and `target` were provided. The `target` value will be used.
my_task = Task(target=lambda **kwargs: "{task_name}", result=result)
tests/engine/test_task_runner.py: 6 warnings
tests/engine/cloud/test_cloud_task_runner.py: 6 warnings
tests/environments/storage/test_docker_healthcheck.py: 2 warnings
/usr/local/lib/python3.7/site-packages/prefect/tasks/core/function.py:63: UserWarning: Result Handlers are deprecated; please use the new style Result classes instead.
super().__init__(name=name, **kwargs)
tests/engine/cloud/test_cloud_flow_runner.py::test_starting_at_arbitrary_loop_index_from_cloud_context
/root/project/tests/engine/cloud/test_cloud_flow_runner.py:462: UserWarning: Result Handlers are deprecated; please use the new style Result classes instead.
with prefect.Flow(name="looping", result_handler=JSONResultHandler()) as f:
tests/environments/storage/test_docker_healthcheck.py::TestSerialization::test_cloudpickle_deserialization_check_raises_on_bad_imports
/root/project/tests/environments/storage/test_docker_healthcheck.py:28: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules https://docs.prefect.io/api/latest/environments/storage.html#docker
["{}".format(f.name)]
tests/environments/storage/test_docker_healthcheck.py::TestResultCheck::test_doesnt_raise_for_checkpointed_tasks_if_flow_has_result_handler[kwargs0]
tests/environments/storage/test_docker_healthcheck.py::TestResultCheck::test_doesnt_raise_for_checkpointed_tasks_if_flow_has_result_handler[kwargs1]
/root/project/tests/environments/storage/test_docker_healthcheck.py:126: UserWarning: Result Handlers are deprecated; please use the new style Result classes instead.
f = Flow("foo-test", tasks=[up], result_handler=42)
tests/serialization/test_environments.py::test_serialize_dask_environment_with_private_registry
/root/project/tests/serialization/test_environments.py:123: UserWarning: The `private_registry` and `docker_secret` options are deprecated. Please set `imagePullSecrets` on custom work and scheduler YAML manifests.
private_registry=True, docker_secret="FOO"
tests/serialization/test_environments.py::test_serialize_dask_environment_with_private_registry
/usr/local/lib/python3.7/site-packages/prefect/utilities/serialization.py:164: UserWarning: The `private_registry` and `docker_secret` options are deprecated. Please set `imagePullSecrets` on custom work and scheduler YAML manifests.
return object_class(**init_data)
tests/serialization/test_environments.py::test_serialize_remote_environment
/root/project/tests/serialization/test_environments.py:235: UserWarning: `RemoteEnvironment` is deprecated, please use `LocalEnvironment` instead.
env = environments.RemoteEnvironment()
tests/serialization/test_environments.py::test_serialize_remote_environment
tests/serialization/test_environments.py::test_serialize_remote_environment_with_labels
tests/serialization/test_environments.py::test_serialize_local_environment_with_labels
tests/serialization/test_environments.py::test_deserialize_old_env_payload
tests/serialization/test_deserialization/test_flow_deserialization.py::test_old_flows_deserialize[/root/project/tests/serialization/test_deserialization/flows/flow-0_6_2.json]
tests/serialization/test_deserialization/test_flow_deserialization.py::test_old_flows_deserialize[/root/project/tests/serialization/test_deserialization/flows/flow-0_6_1.json]
/usr/local/lib/python3.7/site-packages/prefect/utilities/serialization.py:164: UserWarning: `RemoteEnvironment` is deprecated, please use `LocalEnvironment` instead.
return object_class(**init_data)
tests/serialization/test_environments.py::test_serialize_remote_environment_with_labels
/root/project/tests/serialization/test_environments.py:253: UserWarning: `RemoteEnvironment` is deprecated, please use `LocalEnvironment` instead.
env = environments.RemoteEnvironment(labels=["bob", "alice"])
tests/serialization/test_environments.py::test_serialize_remote_dask_environment
/root/project/tests/serialization/test_environments.py:270: UserWarning: `RemoteDaskEnvironment` is deprecated, please use `LocalEnvironment` with a `DaskExecutor` instead.
env = environments.RemoteDaskEnvironment(address="test")
tests/serialization/test_environments.py::test_serialize_remote_dask_environment
tests/serialization/test_environments.py::test_serialize_remote_dask_environment_with_labels
/usr/local/lib/python3.7/site-packages/prefect/utilities/serialization.py:164: UserWarning: `RemoteDaskEnvironment` is deprecated, please use `LocalEnvironment` with a `DaskExecutor` instead.
return object_class(**init_data)
tests/serialization/test_environments.py::test_serialize_remote_dask_environment_with_labels
/root/project/tests/serialization/test_environments.py:287: UserWarning: `RemoteDaskEnvironment` is deprecated, please use `LocalEnvironment` with a `DaskExecutor` instead.
address="test", labels=["bob", "alice"], executor_kwargs={"not": "present"}
tests/utilities/test_tasks.py::TestTaskDecorator::test_task_decorator_with_required_args_must_be_called_with_args
/usr/local/lib/python3.7/contextlib.py:119: UserWarning: Tasks were created but not added to the flow: {}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
next(self.gen)
-- Docs: https://docs.pytest.org/en/stable/warnings.html
----------- coverage: platform linux, python 3.7.9-final-0 -----------
Coverage XML written to file /tmp/workspace/coverage/vanilla-coverage.xml
=========================== short test summary info ============================
FAILED tests/engine/test_flow_runner.py::test_flow_runner_properly_provides_context_to_task_runners[mthread]
FAILED tests/engine/test_flow_runner.py::test_flow_runner_handles_timeouts[mthread]
FAILED tests/engine/test_flow_runner.py::TestMapping::test_terminal_mapped_states_are_used_for_flow_state[mthread]
FAILED tests/engine/test_flow_runner.py::TestMapping::test_mapped_will_use_existing_map_states_if_available[mthread]
FAILED tests/engine/test_flow_runner.py::TestMapping::test_mapped_will_use_partial_existing_map_states_if_available[mthread]
FAILED tests/engine/test_flow_runner.py::TestMapping::test_mapped_tasks_dont_run_if_upstream_pending[mthread]
FAILED tests/engine/test_flow_runner.py::TestMapping::test_mapped_task_can_be_scheduled[mthread]
FAILED tests/engine/test_flow_runner.py::TestMapping::test_mapped_task_can_be_scheduled_for_future[mthread]
FAILED tests/engine/test_flow_runner.py::test_dask_executor_with_flow_runner_sets_task_keys
FAILED tests/engine/executors/test_executors.py::test_submit_does_not_assume_pure_functions[mthread]
FAILED tests/engine/executors/test_executors.py::TestDaskExecutor::test_submit_and_wait[mthread]
FAILED tests/engine/executors/test_executors.py::TestDaskExecutor::test_runs_in_parallel[mthread]
FAILED tests/engine/executors/test_executors.py::TestDaskExecutor::test_submit_sets_task_name
FAILED tests/engine/executors/test_executors.py::TestDaskExecutor::test_is_pickleable_after_start[mthread]
ERROR tests/utilities/notifications/test_notifications.py::test_gmail_notifier_ignores_ignore_states
SKIPPED [1] tests/agent/test_fargate_agent.py:11: could not import 'boto3': No module named 'boto3'
SKIPPED [1] tests/agent/test_k8s_agent.py:6: could not import 'kubernetes': No module named 'kubernetes'
SKIPPED [1] tests/cli/test_agent.py:8: could not import 'boto3': No module named 'boto3'
SKIPPED [1] tests/engine/results/test_azure_result.py:11: could not import 'azure.storage.blob': No module named 'azure'
SKIPPED [1] tests/engine/results/test_gcs_result.py:12: could not import 'google.cloud': No module named 'google'
SKIPPED [1] tests/engine/results/test_s3_result.py:10: could not import 'boto3': No module named 'boto3'
SKIPPED [8] tests/environments/execution/__init__.py:3: could not import 'boto3': No module named 'boto3'
SKIPPED [1] tests/environments/storage/test_azure_storage.py:8: could not import 'azure.storage.blob': No module named 'azure'
SKIPPED [1] tests/environments/storage/test_github_storage.py:8: could not import 'github': No module named 'github'
SKIPPED [1] tests/environments/storage/test_s3_storage.py:10: could not import 'boto3': No module named 'boto3'
SKIPPED [1] tests/tasks/airtable/__init__.py:3: could not import 'airtable': No module named 'airtable'
SKIPPED [4] tests/tasks/aws/__init__.py:3: could not import 'boto3': No module named 'boto3'
SKIPPED [2] tests/tasks/azure/__init__.py:3: could not import 'azure.storage.blob': No module named 'azure'
SKIPPED [2] tests/tasks/azureml/__init__.py:3: could not import 'azureml': No module named 'azureml'
SKIPPED [1] tests/tasks/dbt/__init__.py:3: could not import 'dbt': No module named 'dbt'
SKIPPED [1] tests/tasks/dropbox/__init__.py:3: could not import 'dropbox': No module named 'dropbox'
SKIPPED [3] tests/tasks/gcp/__init__.py:3: could not import 'google.cloud': No module named 'google'
SKIPPED [1] tests/tasks/great_expectations/__init__.py:3: could not import 'great_expectations': No module named 'great_expectations'
SKIPPED [1] tests/tasks/jira/test_jira_service_desk.py:8: could not import 'jira': No module named 'jira'
SKIPPED [1] tests/tasks/jira/test_jira_task.py:8: could not import 'jira': No module named 'jira'
SKIPPED [4] tests/tasks/kubernetes/__init__.py:3: could not import 'kubernetes': No module named 'kubernetes'
SKIPPED [1] tests/tasks/mysql/__init__.py:3: could not import 'pymysql': No module named 'pymysql'
SKIPPED [1] tests/tasks/notifications/test_pushbullet.py:8: could not import 'pushbullet': No module named 'pushbullet'
SKIPPED [1] tests/tasks/postgres/__init__.py:3: could not import 'psycopg2': No module named 'psycopg2'
SKIPPED [1] tests/tasks/redis/__init__.py:3: could not import 'redis': No module named 'redis'
SKIPPED [1] tests/tasks/rss/__init__.py:3: could not import 'feedparser': No module named 'feedparser'
SKIPPED [1] tests/tasks/snowflake/__init__.py:3: could not import 'snowflake.connector': No module named 'snowflake'
SKIPPED [1] tests/tasks/spacy/__init__.py:3: could not import 'spacy': No module named 'spacy'
SKIPPED [1] tests/tasks/templates/test_jinja.py:13: Jinja requirements not installed.
SKIPPED [1] tests/tasks/twitter/__init__.py:3: could not import 'tweepy': No module named 'tweepy'
SKIPPED [1] tests/utilities/test_aws.py:5: could not import 'boto3': No module named 'boto3'
SKIPPED [1] tests/utilities/test_gcp.py:4: could not import 'google.cloud': No module named 'google'
SKIPPED [1] tests/utilities/test_git.py:5: could not import 'github': No module named 'github'
SKIPPED [1] tests/utilities/test_kubernetes.py:5: could not import 'kubernetes': No module named 'kubernetes'
SKIPPED [1] tests/utilities/notifications/test_jira_notification.py:5: could not import 'jira': No module named 'jira'
SKIPPED [1] tests/core/test_flow.py:1246: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:1258: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:1275: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:1288: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:1301: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:1314: viz extras not installed.
SKIPPED [3] tests/core/test_flow.py:1331: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:1347: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:1381: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:1413: viz extras not installed.
SKIPPED [5] tests/core/test_flow.py:1447: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:1465: viz extras not installed.
SKIPPED [1] tests/core/test_flow.py:2971: Result handlers not yet deprecated
SKIPPED [1] tests/core/test_task.py:649: Result handlers not yet deprecated
SKIPPED [1] tests/engine/test_serializers.py:87: Pandas not installed
SKIPPED [5] tests/engine/test_serializers.py:83: Pandas not installed
SKIPPED [1] tests/engine/test_serializers.py:134: Pandas not installed
SKIPPED [1] tests/engine/results/test_results.py:241: Windows specific test
SKIPPED [1] tests/environments/storage/test_docker_storage.py:325: Needs to be mocked so it can work on CircleCI
SKIPPED [1] tests/environments/storage/test_docker_storage.py:350: Needs to be mocked so it can work on CircleCI
SKIPPED [2] tests/utilities/test_filesystems.py:19: Windows only test
SKIPPED [1] tests/utilities/test_filesystems.py:56: could not import 'prefect.utilities.gcp': No module named 'google'
SKIPPED [1] tests/utilities/test_filesystems.py:70: could not import 'prefect.utilities.aws': No module named 'boto3'
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestGCSResultHandler::test_gcs_init
google extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestGCSResultHandler::test_gcs_writes_to_blob_prefixed_by_date_suffixed_by_prefect
google extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestGCSResultHandler::test_gcs_uses_custom_secret_name
google extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestGCSResultHandler::test_gcs_writes_binary_string
google extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestGCSResultHandler::test_gcs_handler_is_pickleable
google extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestS3ResultHandler::test_s3_client_init_uses_secrets
aws extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestS3ResultHandler::test_s3_client_init_uses_custom_secrets
aws extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestS3ResultHandler::test_s3_writes_to_blob_prefixed_by_date_suffixed_by_prefect
aws extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestS3ResultHandler::test_s3_handler_is_pickleable
aws extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestS3ResultHandler::test_s3_uninitialized_client
aws extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestS3ResultHandler::test_s3_with_kwargs_invalid_service_name
aws extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestS3ResultHandler::test_s3_with_kwarg_overwrite_aws_keys
aws extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestS3ResultHandler::test_s3_with_kwargs_aws_keys
aws extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestS3ResultHandler::test_s3_with_kwargs
aws extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestAzureResultHandler::test_azure_service_init_uses_secrets_with_account_key
azure extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestAzureResultHandler::test_azure_service_init_uses_secrets_with_sas_token
azure extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestAzureResultHandler::test_azure_service_init_uses_custom_secrets
azure extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestAzureResultHandler::test_azure_service_init_uses_connection_string_over_secret
azure extras not installed.
XFAIL tests/engine/result_handlers/test_result_handlers.py::TestAzureResultHandler::test_azure_service_writes_to_blob_prefixed_by_date_suffixed_by_prefect
azure extras not installed.
= 14 failed, 3879 passed, 86 skipped, 19 xfailed, 16 xpassed, 48 warnings, 1 error in 995.27s (0:16:35) =
Too long with no output (exceeded 10m0s): context deadline exceeded
```
We've been seeing a periodic test failure. The failing tests seem unrelated to the source, which appears to be the death of the
mthread
cluster fixture sometime during the test run. I've been unable to replicate this locally. This could be due to changes in the circleci environment, changes indask/distributed
. It seems unlikely to be due to anything we're doing in prefect itself, we don't mess with features low-level enough to trigger the tracebacks being seen.Test Failures
``` ==================================== ERRORS ==================================== ________ ERROR at teardown of test_gmail_notifier_ignores_ignore_states ________ addr = 'inproc://192.168.224.3/513/1', timeout = 10, deserialize = True handshake_overrides = None connection_args = {'require_encryption': False, 'ssl_context': None} scheme = 'inproc' backend =