Closed rettigl closed 9 months ago
darn, I removed the error message by re-running the job...
It can be seen by just selecting the run version: https://github.com/OpenCOMPES/sed/actions/runs/6942029903/job/18884095198
Not sure why this happens. Might be hard to reproduce so it could be a race condition. I think if we give unique filenames to each test outputs/inputs, the data race would go away.
Happened again:
=================================== FAILURES ===================================
__________________ test_forward_fill_lazy_multiple_iterations __________________
[gw1] linux -- Python 3.9.18 /home/runner/work/sed/sed/.venv/bin/python
cls = <class '_pytest.runner.CallInfo'>
func = <function call_runtest_hook.<locals>.<lambda> at 0x7f189becc9d0>
when = 'call'
reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)
@classmethod
def from_call(
cls,
func: "Callable[[], TResult]",
when: "Literal['collect', 'setup', 'call', 'teardown']",
reraise: Optional[
Union[Type[BaseException], Tuple[Type[BaseException], ...]]
] = None,
) -> "CallInfo[TResult]":
"""Call func, wrapping the result in a CallInfo.
:param func:
The function to call. Called without arguments.
:param when:
The phase in which the function is called.
:param reraise:
Exception or exceptions that shall propagate if raised by the
function, instead of being wrapped in the CallInfo.
"""
excinfo = None
start = timing.time()
precise_start = timing.perf_counter()
try:
> result: Optional[TResult] = func()
.venv/lib/python3.9/site-packages/_pytest/runner.py:341:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> lambda: ihook(item=item, **kwds), when=when, reraise=reraise
)
.venv/lib/python3.9/site-packages/_pytest/runner.py:262:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <HookCaller 'pytest_runtest_call'>
kwargs = {'item': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = False
def __call__(self, **kwargs: object) -> Any:
"""Call the hook.
Only accepts keyword arguments, which should match the hook
specification.
Returns the result(s) of calling all registered plugins, see
:ref:`calling`.
"""
assert (
not self.is_historic()
), "Cannot directly call a historic hook - use call_historic instead."
self._verify_all_args_are_provided(kwargs)
firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
> return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
.venv/lib/python3.9/site-packages/pluggy/_hooks.py:493:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.config.PytestPluginManager object at 0x7f18e7388070>
hook_name = 'pytest_runtest_call'
methods = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '/home/runner/work/sed/sed/.venv/lib/python3.9/s...bleexception' from '/home/runner/work/sed/sed/.venv/lib/python3.9/site-packages/_pytest/unraisableexception.py'>>, ...]
kwargs = {'item': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = False
def _hookexec(
self,
hook_name: str,
methods: Sequence[HookImpl],
kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
> return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
.venv/lib/python3.9/site-packages/pluggy/_manager.py:115:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '/home/runner/work/sed/sed/.venv/lib/python3.9/s...bleexception' from '/home/runner/work/sed/sed/.venv/lib/python3.9/site-packages/_pytest/unraisableexception.py'>>, ...]
caller_kwargs = {'item': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = False
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen,))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
res = hook_impl.function(*args)
if res is not None:
results.append(res)
if firstresult: # halt further impl calls
break
except BaseException as exc:
exception = exc
finally:
# Fast path - only new-style wrappers, no Result.
if only_new_style_wrappers:
if firstresult: # first result hooks return a single value
result = results[0] if results else None
else:
result = results
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
try:
if exception is not None:
teardown.throw(exception) # type: ignore[union-attr]
else:
teardown.send(result) # type: ignore[union-attr]
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close() # type: ignore[union-attr]
except StopIteration as si:
result = si.value
exception = None
continue
except BaseException as e:
exception = e
continue
_raise_wrapfail(teardown, "has second yield") # type: ignore[arg-type]
if exception is not None:
raise exception.with_traceback(exception.__traceback__)
else:
return result
# Slow path - need to support old-style wrappers.
else:
if firstresult: # first result hooks return a single value
outcome: Result[object | list[object]] = Result(
results[0] if results else None, exception
)
else:
outcome = Result(results, exception)
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
if isinstance(teardown, tuple):
try:
teardown[0].send(outcome)
_raise_wrapfail(teardown[0], "has second yield")
except StopIteration:
pass
else:
try:
if outcome._exception is not None:
teardown.throw(outcome._exception)
else:
teardown.send(outcome._result)
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close()
except StopIteration as si:
outcome.force_result(si.value)
continue
except BaseException as e:
outcome.force_exception(e)
continue
_raise_wrapfail(teardown, "has second yield")
> return outcome.get_result()
.venv/lib/python3.9/site-packages/pluggy/_callers.py:152:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pluggy._result.Result object at 0x7f189c845a00>
def get_result(self) -> ResultType:
"""Get the result(s) for this hook call.
If the hook was marked as a ``firstresult`` only a single value
will be returned, otherwise a list of results.
"""
__tracebackhide__ = True
exc = self._exception
if exc is None:
return cast(ResultType, self._result)
else:
> raise exc.with_traceback(exc.__traceback__)
.venv/lib/python3.9/site-packages/pluggy/_result.py:114:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '/home/runner/work/sed/sed/.venv/lib/python3.9/s...bleexception' from '/home/runner/work/sed/sed/.venv/lib/python3.9/site-packages/_pytest/unraisableexception.py'>>, ...]
caller_kwargs = {'item': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = False
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen,))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
> res = hook_impl.function(*args)
.venv/lib/python3.9/site-packages/pluggy/_callers.py:77:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
item = <Function test_forward_fill_lazy_multiple_iterations>
def pytest_runtest_call(item: Item) -> None:
_update_current_test_var(item, "call")
try:
del sys.last_type
del sys.last_value
del sys.last_traceback
except AttributeError:
pass
try:
item.runtest()
except Exception as e:
# Store trace info to allow postmortem debugging
sys.last_type = type(e)
sys.last_value = e
assert e.__traceback__ is not None
# Skip *this* frame
sys.last_traceback = e.__traceback__.tb_next
> raise e
.venv/lib/python3.9/site-packages/_pytest/runner.py:177:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
item = <Function test_forward_fill_lazy_multiple_iterations>
def pytest_runtest_call(item: Item) -> None:
_update_current_test_var(item, "call")
try:
del sys.last_type
del sys.last_value
del sys.last_traceback
except AttributeError:
pass
try:
> item.runtest()
.venv/lib/python3.9/site-packages/_pytest/runner.py:169:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Function test_forward_fill_lazy_multiple_iterations>
def runtest(self) -> None:
"""Execute the underlying test function."""
> self.ihook.pytest_pyfunc_call(pyfuncitem=self)
.venv/lib/python3.9/site-packages/_pytest/python.py:1792:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <HookCaller 'pytest_pyfunc_call'>
kwargs = {'pyfuncitem': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = True
def __call__(self, **kwargs: object) -> Any:
"""Call the hook.
Only accepts keyword arguments, which should match the hook
specification.
Returns the result(s) of calling all registered plugins, see
:ref:`calling`.
"""
assert (
not self.is_historic()
), "Cannot directly call a historic hook - use call_historic instead."
self._verify_all_args_are_provided(kwargs)
firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
> return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
.venv/lib/python3.9/site-packages/pluggy/_hooks.py:493:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.config.PytestPluginManager object at 0x7f18e7388070>
hook_name = 'pytest_pyfunc_call'
methods = [<HookImpl plugin_name='python', plugin=<module '_pytest.python' from '/home/runner/work/sed/sed/.venv/lib/python3.9/site-packages/_pytest/python.py'>>]
kwargs = {'pyfuncitem': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = True
def _hookexec(
self,
hook_name: str,
methods: Sequence[HookImpl],
kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
> return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
.venv/lib/python3.9/site-packages/pluggy/_manager.py:115:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook_name = 'pytest_pyfunc_call'
hook_impls = [<HookImpl plugin_name='python', plugin=<module '_pytest.python' from '/home/runner/work/sed/sed/.venv/lib/python3.9/site-packages/_pytest/python.py'>>]
caller_kwargs = {'pyfuncitem': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = True
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen,))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
res = hook_impl.function(*args)
if res is not None:
results.append(res)
if firstresult: # halt further impl calls
break
except BaseException as exc:
exception = exc
finally:
# Fast path - only new-style wrappers, no Result.
if only_new_style_wrappers:
if firstresult: # first result hooks return a single value
result = results[0] if results else None
else:
result = results
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
try:
if exception is not None:
teardown.throw(exception) # type: ignore[union-attr]
else:
teardown.send(result) # type: ignore[union-attr]
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close() # type: ignore[union-attr]
except StopIteration as si:
result = si.value
exception = None
continue
except BaseException as e:
exception = e
continue
_raise_wrapfail(teardown, "has second yield") # type: ignore[arg-type]
if exception is not None:
> raise exception.with_traceback(exception.__traceback__)
.venv/lib/python3.9/site-packages/pluggy/_callers.py:113:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook_name = 'pytest_pyfunc_call'
hook_impls = [<HookImpl plugin_name='python', plugin=<module '_pytest.python' from '/home/runner/work/sed/sed/.venv/lib/python3.9/site-packages/_pytest/python.py'>>]
caller_kwargs = {'pyfuncitem': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = True
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen,))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
> res = hook_impl.function(*args)
.venv/lib/python3.9/site-packages/pluggy/_callers.py:77:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pyfuncitem = <Function test_forward_fill_lazy_multiple_iterations>
@hookimpl(trylast=True)
def pytest_pyfunc_call(pyfuncitem: "Function") -> Optional[object]:
testfunction = pyfuncitem.obj
if is_async_function(testfunction):
async_warn_and_skip(pyfuncitem.nodeid)
funcargs = pyfuncitem.funcargs
testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
> result = testfunction(**testargs)
.venv/lib/python3.9/site-packages/_pytest/python.py:194:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def test_forward_fill_lazy_multiple_iterations() -> None:
"""test that a lazy forward fill works as expected with multiple iterations"""
t_df = df.copy()
t_df["energy"][5:35] = np.nan
t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS)
t_dask_df = forward_fill_lazy(t_dask_df, "energy", before="max", iterations=5)
t_df = t_df.ffill()
> pd.testing.assert_frame_equal(t_df, t_dask_df.compute())
tests/test_dfops.py:244:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Dask DataFrame Structure:
posx posy energy posx_jittered posy_jittered energy_jittered timeSt... ... ... ... ... ... ... ...
Dask Name: overlap, 11 graph layers
kwargs = {}
def compute(self, **kwargs):
"""Compute this dask collection
This turns a lazy Dask collection into its in-memory equivalent.
For example a Dask array turns into a NumPy array and a Dask dataframe
turns into a Pandas dataframe. The entire dataset must fit into memory
before calling this operation.
Parameters
----------
scheduler : string, optional
Which scheduler to use like "threads", "synchronous" or "processes".
If not provided, the default is to check the global settings first,
and then fall back to the collection defaults.
optimize_graph : bool, optional
If True [default], the graph is optimized before computation.
Otherwise the graph is run as is. This can be useful for debugging.
kwargs
Extra keywords to forward to the scheduler function.
See Also
--------
dask.base.compute
"""
> (result,) = compute(self, traverse=False, **kwargs)
.venv/lib/python3.9/site-packages/dask/base.py:315:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
traverse = False, optimize_graph = True, scheduler = None, get = None
args = (Dask DataFrame Structure:
posx posy energy posx_jittered posy_jittered energy_jittered timeS... ... ... ... ... ... ... ...
Dask Name: overlap, 11 graph layers,)
kwargs = {}
collections = [Dask DataFrame Structure:
posx posy energy posx_jittered posy_jittered energy_jittered timeS... ... ... ... ... ... ... ...
Dask Name: overlap, 11 graph layers]
repack = <function unpack_collections.<locals>.repack at 0x7f189bb5daf0>
schedule = <function get at 0x7f18e49c7d30>
dsk = HighLevelGraph with 11 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f189afc3ca0>
0. from_pandas-067125a4f.... overlap-concat-4dbe302a5e47d6beafd9e353a4b5cbb9
10. overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748
keys = [[('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 0), ('overlap-forward_fill_partition-e3c44c45087...on-e3c44c450878070118cc647f47aa1748', 4), ('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 5), ...]]
postcomputes = [(<function finalize at 0x7f18b24795e0>, ())]
def compute(
*args, traverse=True, optimize_graph=True, scheduler=None, get=None, **kwargs
):
"""Compute several dask collections at once.
Parameters
----------
args : object
Any number of objects. If it is a dask object, it's computed and the
result is returned. By default, python builtin collections are also
traversed to look for dask objects (for more information see the
``traverse`` keyword). Non-dask arguments are passed through unchanged.
traverse : bool, optional
By default dask traverses builtin python collections looking for dask
objects passed to ``compute``. For large collections this can be
expensive. If none of the arguments contain any dask objects, set
``traverse=False`` to avoid doing this traversal.
scheduler : string, optional
Which scheduler to use like "threads", "synchronous" or "processes".
If not provided, the default is to check the global settings first,
and then fall back to the collection defaults.
optimize_graph : bool, optional
If True [default], the optimizations for each collection are applied
before computation. Otherwise the graph is run as is. This can be
useful for debugging.
get : ``None``
Should be left to ``None`` The get= keyword has been removed.
kwargs
Extra keywords to forward to the scheduler function.
Examples
--------
>>> import dask
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> dask.compute(a, b)
(45, 4.5)
By default, dask objects inside python collections will also be computed:
>>> dask.compute({'a': a, 'b': b, 'c': 1})
({'a': 45, 'b': 4.5, 'c': 1},)
"""
collections, repack = unpack_collections(*args, traverse=traverse)
if not collections:
return args
schedule = get_scheduler(
scheduler=scheduler,
collections=collections,
get=get,
)
dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
keys, postcomputes = [], []
for x in collections:
keys.append(x.__dask_keys__())
postcomputes.append(x.__dask_postcompute__())
> results = schedule(dsk, keys, **kwargs)
.venv/lib/python3.9/site-packages/dask/base.py:600:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dsk = HighLevelGraph with 11 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f189afc3ca0>
0. from_pandas-067125a4f.... overlap-concat-4dbe302a5e47d6beafd9e353a4b5cbb9
10. overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748
keys = [[('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 0), ('overlap-forward_fill_partition-e3c44c45087...on-e3c44c450878070118cc647f47aa1748', 4), ('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 5), ...]]
cache = None, num_workers = None
pool = <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f189af64100>
kwargs = {}, thread = <_MainThread(MainThread, started 139744950561664)>
def get(
dsk: Mapping,
keys: Sequence[Hashable] | Hashable,
cache=None,
num_workers=None,
pool=None,
**kwargs,
):
"""Threaded cached implementation of dask.get
Parameters
----------
dsk: dict
A dask dictionary specifying a workflow
keys: key or list of keys
Keys corresponding to desired data
num_workers: integer of thread count
The number of threads to use in the ThreadPool that will actually execute tasks
cache: dict-like (optional)
Temporary storage of results
Examples
--------
>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}
>>> get(dsk, 'w')
4
>>> get(dsk, ['w', 'y'])
(4, 2)
"""
global default_pool
pool = pool or config.get("pool", None)
num_workers = num_workers or config.get("num_workers", None)
thread = current_thread()
with pools_lock:
if pool is None:
if num_workers is None and thread is main_thread:
if default_pool is None:
default_pool = ThreadPoolExecutor(CPU_COUNT)
atexit.register(default_pool.shutdown)
pool = default_pool
elif thread in pools and num_workers in pools[thread]:
pool = pools[thread][num_workers]
else:
pool = ThreadPoolExecutor(num_workers)
atexit.register(pool.shutdown)
pools[thread][num_workers] = pool
elif isinstance(pool, multiprocessing.pool.Pool):
pool = MultiprocessingPoolExecutor(pool)
> results = get_async(
pool.submit,
pool._max_workers,
dsk,
keys,
cache=cache,
get_id=_thread_get_id,
pack_exception=pack_exception,
**kwargs,
)
.venv/lib/python3.9/site-packages/dask/threaded.py:89:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
submit = <bound method ThreadPoolExecutor.submit of <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f189af64100>>
num_workers = 4
dsk = {('from_pandas-067125a4f6eba11f7bb16651b8039228', 0): posx posy energy ... posy_jittered energy_jitt...41e+09
39 -0.801982 1.029141 -0.557759 ... -1.110859 -0.866637 1.700841e+09
[10 rows x 7 columns], ...}
result = [[('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 0), ('overlap-forward_fill_partition-e3c44c45087...on-e3c44c450878070118cc647f47aa1748', 4), ('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 5), ...]]
cache = None, get_id = <function _thread_get_id at 0x7f18e4a2bdc0>
rerun_exceptions_locally = False
pack_exception = <function pack_exception at 0x7f18e49c79d0>
raise_exception = <function reraise at 0x7f18e4aa7820>, callbacks = ()
dumps = <function identity at 0x7f18e4aa78b0>
loads = <function identity at 0x7f18e4aa78b0>, chunksize = 1, kwargs = {}
result_flat = {('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 0), ('overlap-forward_fill_partition-e3c44c450878...ion-e3c44c450878070118cc647f47aa1748', 4), ('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 5), ...}
results = {('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 0), ('overlap-forward_fill_partition-e3c44c450878...ion-e3c44c450878070118cc647f47aa1748', 4), ('overlap-forward_fill_partition-e3c44c450878070118cc647f47aa1748', 5), ...}
_ = (), posttask_cbs = (), started_cbs = [], succeeded = False
def get_async(
submit,
num_workers,
dsk,
result,
cache=None,
get_id=default_get_id,
rerun_exceptions_locally=None,
pack_exception=default_pack_exception,
raise_exception=reraise,
callbacks=None,
dumps=identity,
loads=identity,
chunksize=None,
**kwargs,
):
"""Asynchronous get function
This is a general version of various asynchronous schedulers for dask. It
takes a ``concurrent.futures.Executor.submit`` function to form a more
specific ``get`` method that walks through the dask array with parallel
workers, avoiding repeat computation and minimizing memory use.
Parameters
----------
submit : function
A ``concurrent.futures.Executor.submit`` function
num_workers : int
The number of workers that task submissions can be spread over
dsk : dict
A dask dictionary specifying a workflow
result : key or list of keys
Keys corresponding to desired data
cache : dict-like, optional
Temporary storage of results
get_id : callable, optional
Function to return the worker id, takes no arguments. Examples are
`threading.current_thread` and `multiprocessing.current_process`.
rerun_exceptions_locally : bool, optional
Whether to rerun failing tasks in local process to enable debugging
(False by default)
pack_exception : callable, optional
Function to take an exception and ``dumps`` method, and return a
serialized tuple of ``(exception, traceback)`` to send back to the
scheduler. Default is to just raise the exception.
raise_exception : callable, optional
Function that takes an exception and a traceback, and raises an error.
callbacks : tuple or list of tuples, optional
Callbacks are passed in as tuples of length 5. Multiple sets of
callbacks may be passed in as a list of tuples. For more information,
see the dask.diagnostics documentation.
dumps: callable, optional
Function to serialize task data and results to communicate between
worker and parent. Defaults to identity.
loads: callable, optional
Inverse function of `dumps`. Defaults to identity.
chunksize: int, optional
Size of chunks to use when dispatching work. Defaults to 1.
If -1, will be computed to evenly divide ready work across workers.
See Also
--------
threaded.get
"""
chunksize = chunksize or config.get("chunksize", 1)
queue = Queue()
if isinstance(result, list):
result_flat = set(flatten(result))
else:
result_flat = {result}
results = set(result_flat)
dsk = dict(dsk)
with local_callbacks(callbacks) as callbacks:
_, _, pretask_cbs, posttask_cbs, _ = unpack_callbacks(callbacks)
started_cbs = []
succeeded = False
# if start_state_from_dask fails, we will have something
# to pass to the final block.
state = {}
try:
for cb in callbacks:
if cb[0]:
cb[0](dsk)
started_cbs.append(cb)
keyorder = order(dsk)
state = start_state_from_dask(dsk, cache=cache, sortkey=keyorder.get)
for _, start_state, _, _, _ in callbacks:
if start_state:
start_state(dsk, state)
if rerun_exceptions_locally is None:
rerun_exceptions_locally = config.get("rerun_exceptions_locally", False)
if state["waiting"] and not state["ready"]:
raise ValueError("Found no accessible jobs in dask")
def fire_tasks(chunksize):
"""Fire off a task to the thread pool"""
# Determine chunksize and/or number of tasks to submit
nready = len(state["ready"])
if chunksize == -1:
ntasks = nready
chunksize = -(ntasks // -num_workers)
else:
used_workers = -(len(state["running"]) // -chunksize)
avail_workers = max(num_workers - used_workers, 0)
ntasks = min(nready, chunksize * avail_workers)
# Prep all ready tasks for submission
args = []
for _ in range(ntasks):
# Get the next task to compute (most recently added)
key = state["ready"].pop()
# Notify task is running
state["running"].add(key)
for f in pretask_cbs:
f(key, dsk, state)
# Prep args to send
data = {
dep: state["cache"][dep] for dep in get_dependencies(dsk, key)
}
args.append(
(
key,
dumps((dsk[key], data)),
dumps,
loads,
get_id,
pack_exception,
)
)
# Batch submit
for i in range(-(len(args) // -chunksize)):
each_args = args[i * chunksize : (i + 1) * chunksize]
if not each_args:
break
fut = submit(batch_execute_tasks, each_args)
fut.add_done_callback(queue.put)
# Main loop, wait on tasks to finish, insert new ones
while state["waiting"] or state["ready"] or state["running"]:
fire_tasks(chunksize)
for key, res_info, failed in queue_get(queue).result():
if failed:
exc, tb = loads(res_info)
if rerun_exceptions_locally:
data = {
dep: state["cache"][dep]
for dep in get_dependencies(dsk, key)
}
task = dsk[key]
_execute_task(task, data) # Re-execute locally
else:
> raise_exception(exc, tb)
.venv/lib/python3.9/site-packages/dask/local.py:511:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
exc = ValueError('Plan shapes are not aligned')
tb = <traceback object at 0x7f189bcf29c0>
def reraise(exc, tb=None):
if exc.__traceback__ is not tb:
raise exc.with_traceback(tb)
> raise exc
.venv/lib/python3.9/site-packages/dask/local.py:319:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = ('overlap-concat-8bc698403480a774a3d1c9bf75840b86', 1)
task_info = ((<function _combined_parts at 0x7f18b24a3550>, ('overlap-prepend-fbbbe1b5bfed03b8d7752eb6a685fe44', 0), ('overlap-for....700841e+09
9 0.440616 -0.037753 -0.388121 ... 0.090247 -0.554471 1.700841e+09
[10 rows x 7 columns]})
dumps = <function identity at 0x7f18e4aa78b0>
loads = <function identity at 0x7f18e4aa78b0>
get_id = <function _thread_get_id at 0x7f18e4a2bdc0>
pack_exception = <function pack_exception at 0x7f18e49c79d0>
def execute_task(key, task_info, dumps, loads, get_id, pack_exception):
"""
Compute task and handle all administration
See Also
--------
_execute_task : actually execute task
"""
try:
task, data = loads(task_info)
> result = _execute_task(task, data)
.venv/lib/python3.9/site-packages/dask/local.py:224:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
arg = (<function _combined_parts at 0x7f18b24a3550>, ('overlap-prepend-fbbbe1b5bfed03b8d7752eb6a685fe44', 0), ('overlap-forward_fill_partition-b314a163d33c9ca1d74a8202f2415d31', 1), None, 10, 0)
cache = {('overlap-forward_fill_partition-b314a163d33c9ca1d74a8202f2415d31', 1): posx posy energy ... posy_j...1.700841e+09
9 0.440616 -0.037753 -0.388121 ... 0.090247 -0.554471 1.700841e+09
[10 rows x 7 columns]}
dsk = None
def _execute_task(arg, cache, dsk=None):
"""Do the actual work of collecting data and executing a function
Examples
--------
>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> cache = {'x': 1, 'y': 2}
Compute tasks against a cache
>>> _execute_task((add, 'x', 1), cache) # Compute task in naive manner
2
>>> _execute_task((add, (inc, 'x'), 1), cache) # Support nested computation
3
Also grab data from cache
>>> _execute_task('x', cache)
1
Support nested lists
>>> list(_execute_task(['x', 'y'], cache))
[1, 2]
>>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
[[1, 2], [2, 1]]
>>> _execute_task('foo', cache) # Passes through on non-keys
'foo'
"""
if isinstance(arg, list):
return [_execute_task(a, cache) for a in arg]
elif istask(arg):
func, args = arg[0], arg[1:]
# Note: Don't assign the subtask results to a variable. numpy detects
# temporaries by their reference count and can execute certain
# operations in-place.
> return func(*(_execute_task(a, cache) for a in args))
.venv/lib/python3.9/site-packages/dask/core.py:119:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
prev_part = posx posy energy ... posy_jittered energy_jittered timeStamps
0 -0.872030 -0.102163 -0.177901 ..... 1.700841e+09
9 0.440616 -0.037753 -0.388121 ... 0.090247 -0.554471 1.700841e+09
[10 rows x 7 columns]
current_part = posx posy energy ... posy_jittered energy_jittered timeStamps
10 0.807913 -0.679010 -1.026872 ...1.700841e+09
19 -0.832794 0.573525 -1.026872 ... -1.060530 -0.286718 1.700841e+09
[10 rows x 7 columns]
next_part = None, before = 10, after = 0
def _combined_parts(prev_part, current_part, next_part, before, after):
msg = (
"Partition size is less than overlapping "
"window size. Try using ``df.repartition`` "
"to increase the partition size."
)
if prev_part is not None and isinstance(before, Integral):
if prev_part.shape[0] != before:
raise NotImplementedError(msg)
if next_part is not None and isinstance(after, Integral):
if next_part.shape[0] != after:
raise NotImplementedError(msg)
parts = [p for p in (prev_part, current_part, next_part) if p is not None]
> combined = methods.concat(parts)
.venv/lib/python3.9/site-packages/dask/dataframe/rolling.py:55:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dfs = [ posx posy energy ... posy_jittered energy_jittered timeStamps
0 -0.872030 -0.102163 -0.177901 .....700841e+09
19 -0.832794 0.573525 -1.026872 ... -1.060530 -0.286718 1.700841e+09
[10 rows x 7 columns]]
axis = 0, join = 'outer', uniform = False, filter_warning = True
ignore_index = False, kwargs = {}
func = <function concat_pandas at 0x7f18b24a2f70>
def concat(
dfs,
axis=0,
join="outer",
uniform=False,
filter_warning=True,
ignore_index=False,
**kwargs,
):
"""Concatenate, handling some edge cases:
- Unions categoricals between partitions
- Ignores empty partitions
Parameters
----------
dfs : list of DataFrame, Series, or Index
axis : int or str, optional
join : str, optional
uniform : bool, optional
Whether to treat ``dfs[0]`` as representative of ``dfs[1:]``. Set to
True if all arguments have the same columns and dtypes (but not
necessarily categories). Default is False.
ignore_index : bool, optional
Whether to allow index values to be ignored/dropped during
concatenation. Default is False.
ignore_order : bool, optional
Whether to ignore the order when doing the union of categoricals.
Default is False.
"""
if len(dfs) == 1:
return dfs[0]
else:
func = concat_dispatch.dispatch(type(dfs[0]))
> return func(
dfs,
axis=axis,
join=join,
uniform=uniform,
filter_warning=filter_warning,
ignore_index=ignore_index,
**kwargs,
)
.venv/lib/python3.9/site-packages/dask/dataframe/dispatch.py:63:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dfs = [ posx posy energy ... posy_jittered energy_jittered timeStamps
0 -0.872030 -0.102163 -0.177901 .....700841e+09
19 -0.832794 0.573525 -1.026872 ... -1.060530 -0.286718 1.700841e+09
[10 rows x 7 columns]]
axis = 0, join = 'outer', uniform = False, filter_warning = True
ignore_index = False, kwargs = {}, ignore_order = False
dfs0_index = RangeIndex(start=0, stop=10, step=1)
@concat_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
def concat_pandas(
dfs,
axis=0,
join="outer",
uniform=False,
filter_warning=True,
ignore_index=False,
**kwargs,
):
ignore_order = kwargs.pop("ignore_order", False)
if axis == 1:
return pd.concat(dfs, axis=axis, join=join, **kwargs)
# Support concatenating indices along axis 0
if isinstance(dfs[0], pd.Index):
if isinstance(dfs[0], pd.CategoricalIndex):
for i in range(1, len(dfs)):
if not isinstance(dfs[i], pd.CategoricalIndex):
dfs[i] = dfs[i].astype("category")
return pd.CategoricalIndex(
union_categoricals(dfs, ignore_order=ignore_order), name=dfs[0].name
)
elif isinstance(dfs[0], pd.MultiIndex):
first, rest = dfs[0], dfs[1:]
if all(
(isinstance(o, pd.MultiIndex) and o.nlevels >= first.nlevels)
for o in rest
):
arrays = [
concat([i._get_level_values(n) for i in dfs])
for n in range(first.nlevels)
]
return pd.MultiIndex.from_arrays(arrays, names=first.names)
to_concat = (first.values,) + tuple(k._values for k in rest)
new_tuples = np.concatenate(to_concat)
try:
return pd.MultiIndex.from_tuples(new_tuples, names=first.names)
except Exception:
return pd.Index(new_tuples)
return dfs[0].append(dfs[1:])
# Handle categorical index separately
dfs0_index = dfs[0].index
has_categoricalindex = isinstance(dfs0_index, pd.CategoricalIndex) or (
isinstance(dfs0_index, pd.MultiIndex)
and any(isinstance(i, pd.CategoricalIndex) for i in dfs0_index.levels)
)
if has_categoricalindex:
dfs2 = [df.reset_index(drop=True) for df in dfs]
ind = concat([df.index for df in dfs])
else:
dfs2 = dfs
ind = None
# Concatenate the partitions together, handling categories as needed
if (
isinstance(dfs2[0], pd.DataFrame)
if uniform
else any(isinstance(df, pd.DataFrame) for df in dfs2)
):
if uniform:
dfs3 = dfs2
cat_mask = dfs2[0].dtypes == "category"
else:
# When concatenating mixed dataframes and series on axis 1, Pandas
# converts series to dataframes with a single column named 0, then
# concatenates.
dfs3 = [
df
if isinstance(df, pd.DataFrame)
else df.to_frame().rename(columns={df.name: 0})
for df in dfs2
]
# pandas may raise a RuntimeWarning for comparing ints and strs
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
if filter_warning:
warnings.simplefilter("ignore", FutureWarning)
cat_mask = pd.concat(
[(df.dtypes == "category").to_frame().T for df in dfs3],
join=join,
**kwargs,
).any()
if cat_mask.any():
not_cat = cat_mask[~cat_mask].index
# this should be aligned, so no need to filter warning
out = pd.concat(
[df[df.columns.intersection(not_cat)] for df in dfs3],
join=join,
**kwargs,
)
temp_ind = out.index
for col in cat_mask.index.difference(not_cat):
# Find an example of categoricals in this column
for df in dfs3:
sample = df.get(col)
if sample is not None:
break
# Extract partitions, subbing in missing if needed
parts = []
for df in dfs3:
if col in df.columns:
parts.append(df[col])
else:
codes = np.full(len(df), -1, dtype="i8")
data = pd.Categorical.from_codes(
codes, sample.cat.categories, sample.cat.ordered
)
parts.append(data)
out[col] = union_categoricals(parts, ignore_order=ignore_order)
# Pandas resets index type on assignment if frame is empty
# https://github.com/pandas-dev/pandas/issues/17101
if not len(temp_ind):
out.index = temp_ind
out = out.reindex(columns=cat_mask.index)
else:
# pandas may raise a RuntimeWarning for comparing ints and strs
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
if filter_warning:
warnings.simplefilter("ignore", FutureWarning)
> out = pd.concat(dfs3, join=join, sort=False)
.venv/lib/python3.9/site-packages/dask/dataframe/backends.py:653:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = ([ posx posy energy ... posy_jittered energy_jittered timeStamps
0 -0.872030 -0.102163 -0.177901 ...00841e+09
19 -0.832794 0.573525 -1.026872 ... -1.060530 -0.286718 1.700841e+09
[10 rows x 7 columns]],)
kwargs = {'join': 'outer', 'sort': False}
@wraps(func)
def wrapper(*args, **kwargs):
if len(args) > num_allow_args:
warnings.warn(
msg.format(arguments=_format_argument_list(allow_args)),
FutureWarning,
stacklevel=find_stack_level(),
)
> return func(*args, **kwargs)
.venv/lib/python3.9/site-packages/pandas/util/_decorators.py:331:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
objs = [ posx posy energy ... posy_jittered energy_jittered timeStamps
0 -0.872030 -0.102163 -0.177901 .....700841e+09
19 -0.832794 0.573525 -1.026872 ... -1.060530 -0.286718 1.700841e+09
[10 rows x 7 columns]]
axis = 0, join = 'outer', ignore_index = False, keys = None, levels = None
names = None, verify_integrity = False, sort = False, copy = True
@deprecate_nonkeyword_arguments(version=None, allowed_args=["objs"])
def concat(
objs: Iterable[NDFrame] | Mapping[HashableT, NDFrame],
axis: Axis = 0,
join: str = "outer",
ignore_index: bool = False,
keys=None,
levels=None,
names=None,
verify_integrity: bool = False,
sort: bool = False,
copy: bool = True,
) -> DataFrame | Series:
"""
Concatenate pandas objects along a particular axis.
Allows optional set logic along the other axes.
Can also add a layer of hierarchical indexing on the concatenation axis,
which may be useful if the labels are the same (or overlapping) on
the passed axis number.
Parameters
----------
objs : a sequence or mapping of Series or DataFrame objects
If a mapping is passed, the sorted keys will be used as the `keys`
argument, unless it is passed, in which case the values will be
selected (see below). Any None objects will be dropped silently unless
they are all None in which case a ValueError will be raised.
axis : {0/'index', 1/'columns'}, default 0
The axis to concatenate along.
join : {'inner', 'outer'}, default 'outer'
How to handle indexes on other axis (or axes).
ignore_index : bool, default False
If True, do not use the index values along the concatenation axis. The
resulting axis will be labeled 0, ..., n - 1. This is useful if you are
concatenating objects where the concatenation axis does not have
meaningful indexing information. Note the index values on the other
axes are still respected in the join.
keys : sequence, default None
If multiple levels passed, should contain tuples. Construct
hierarchical index using the passed keys as the outermost level.
levels : list of sequences, default None
Specific levels (unique values) to use for constructing a
MultiIndex. Otherwise they will be inferred from the keys.
names : list, default None
Names for the levels in the resulting hierarchical index.
verify_integrity : bool, default False
Check whether the new concatenated axis contains duplicates. This can
be very expensive relative to the actual data concatenation.
sort : bool, default False
Sort non-concatenation axis if it is not already aligned when `join`
is 'outer'.
This has no effect when ``join='inner'``, which already preserves
the order of the non-concatenation axis.
.. versionchanged:: 1.0.0
Changed to not sort by default.
copy : bool, default True
If False, do not copy data unnecessarily.
Returns
-------
object, type of objs
When concatenating all ``Series`` along the index (axis=0), a
``Series`` is returned. When ``objs`` contains at least one
``DataFrame``, a ``DataFrame`` is returned. When concatenating along
the columns (axis=1), a ``DataFrame`` is returned.
See Also
--------
DataFrame.join : Join DataFrames using indexes.
DataFrame.merge : Merge DataFrames by indexes or columns.
Notes
-----
The keys, levels, and names arguments are all optional.
A walkthrough of how this method fits in with other tools for combining
pandas objects can be found `here
<https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html>`__.
It is not recommended to build DataFrames by adding single rows in a
for loop. Build a list of rows and make a DataFrame in a single concat.
Examples
--------
Combine two ``Series``.
>>> s1 = pd.Series(['a', 'b'])
>>> s2 = pd.Series(['c', 'd'])
>>> pd.concat([s1, s2])
0 a
1 b
0 c
1 d
dtype: object
Clear the existing index and reset it in the result
by setting the ``ignore_index`` option to ``True``.
>>> pd.concat([s1, s2], ignore_index=True)
0 a
1 b
2 c
3 d
dtype: object
Add a hierarchical index at the outermost level of
the data with the ``keys`` option.
>>> pd.concat([s1, s2], keys=['s1', 's2'])
s1 0 a
1 b
s2 0 c
1 d
dtype: object
Label the index keys you create with the ``names`` option.
>>> pd.concat([s1, s2], keys=['s1', 's2'],
... names=['Series name', 'Row ID'])
Series name Row ID
s1 0 a
1 b
s2 0 c
1 d
dtype: object
Combine two ``DataFrame`` objects with identical columns.
>>> df1 = pd.DataFrame([['a', 1], ['b', 2]],
... columns=['letter', 'number'])
>>> df1
letter number
0 a 1
1 b 2
>>> df2 = pd.DataFrame([['c', 3], ['d', 4]],
... columns=['letter', 'number'])
>>> df2
letter number
0 c 3
1 d 4
>>> pd.concat([df1, df2])
letter number
0 a 1
1 b 2
0 c 3
1 d 4
Combine ``DataFrame`` objects with overlapping columns
and return everything. Columns outside the intersection will
be filled with ``NaN`` values.
>>> df3 = pd.DataFrame([['c', 3, 'cat'], ['d', 4, 'dog']],
... columns=['letter', 'number', 'animal'])
>>> df3
letter number animal
0 c 3 cat
1 d 4 dog
>>> pd.concat([df1, df3], sort=False)
letter number animal
0 a 1 NaN
1 b 2 NaN
0 c 3 cat
1 d 4 dog
Combine ``DataFrame`` objects with overlapping columns
and return only those that are shared by passing ``inner`` to
the ``join`` keyword argument.
>>> pd.concat([df1, df3], join="inner")
letter number
0 a 1
1 b 2
0 c 3
1 d 4
Combine ``DataFrame`` objects horizontally along the x axis by
passing in ``axis=1``.
>>> df4 = pd.DataFrame([['bird', 'polly'], ['monkey', 'george']],
... columns=['animal', 'name'])
>>> pd.concat([df1, df4], axis=1)
letter number animal name
0 a 1 bird polly
1 b 2 monkey george
Prevent the result from including duplicate index values with the
``verify_integrity`` option.
>>> df5 = pd.DataFrame([1], index=['a'])
>>> df5
0
a 1
>>> df6 = pd.DataFrame([2], index=['a'])
>>> df6
0
a 2
>>> pd.concat([df5, df6], verify_integrity=True)
Traceback (most recent call last):
...
ValueError: Indexes have overlapping values: ['a']
Append a single row to the end of a ``DataFrame`` object.
>>> df7 = pd.DataFrame({'a': 1, 'b': 2}, index=[0])
>>> df7
a b
0 1 2
>>> new_row = pd.Series({'a': 3, 'b': 4})
>>> new_row
a 3
b 4
dtype: int64
>>> pd.concat([df7, new_row.to_frame().T], ignore_index=True)
a b
0 1 2
1 3 4
"""
op = _Concatenator(
objs,
axis=axis,
ignore_index=ignore_index,
join=join,
keys=keys,
levels=levels,
names=names,
verify_integrity=verify_integrity,
copy=copy,
sort=sort,
)
> return op.get_result()
.venv/lib/python3.9/site-packages/pandas/core/reshape/concat.py:381:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pandas.core.reshape.concat._Concatenator object at 0x7f189bbe1f70>
def get_result(self):
cons: Callable[..., DataFrame | Series]
sample: DataFrame | Series
# series only
if self._is_series:
sample = cast("Series", self.objs[0])
# stack blocks
if self.bm_axis == 0:
name = com.consensus_name_attr(self.objs)
cons = sample._constructor
arrs = [ser._values for ser in self.objs]
res = concat_compat(arrs, axis=0)
result = cons(res, index=self.new_axes[0], name=name, dtype=res.dtype)
return result.__finalize__(self, method="concat")
# combine as columns in a frame
else:
data = dict(zip(range(len(self.objs)), self.objs))
# GH28330 Preserves subclassed objects through concat
cons = sample._constructor_expanddim
index, columns = self.new_axes
df = cons(data, index=index, copy=self.copy)
df.columns = columns
return df.__finalize__(self, method="concat")
# combine block managers
else:
sample = cast("DataFrame", self.objs[0])
mgrs_indexers = []
for obj in self.objs:
indexers = {}
for ax, new_labels in enumerate(self.new_axes):
# ::-1 to convert BlockManager ax to DataFrame ax
if ax == self.bm_axis:
# Suppress reindexing on concat axis
continue
# 1-ax to convert BlockManager axis to DataFrame axis
obj_labels = obj.axes[1 - ax]
if not new_labels.equals(obj_labels):
indexers[ax] = obj_labels.get_indexer(new_labels)
mgrs_indexers.append((obj._mgr, indexers))
> new_data = concatenate_managers(
mgrs_indexers, self.new_axes, concat_axis=self.bm_axis, copy=self.copy
)
.venv/lib/python3.9/site-packages/pandas/core/reshape/concat.py:616:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
mgrs_indexers = [(BlockManager
Items: Index(['posx', 'posy', 'energy', 'posx_jittered', 'posy_jittered',
'energy_jittered', 'ti... step=1)
NumericBlock: [0 1 3 4 5 6], 6 x 10, dtype: float64
NumericBlock: slice(2, 3, 1), 1 x 10, dtype: float64, {})]
axes = [Index(['posx', 'posy', 'energy', 'posx_jittered', 'posy_jittered',
'energy_jittered', 'timeStamps'],
dtype='object'), RangeIndex(start=0, stop=20, step=1)]
concat_axis = 1, copy = True
def concatenate_managers(
mgrs_indexers, axes: list[Index], concat_axis: int, copy: bool
) -> Manager:
"""
Concatenate block managers into one.
Parameters
----------
mgrs_indexers : list of (BlockManager, {axis: indexer,...}) tuples
axes : list of Index
concat_axis : int
copy : bool
Returns
-------
BlockManager
"""
# TODO(ArrayManager) this assumes that all managers are of the same type
if isinstance(mgrs_indexers[0][0], ArrayManager):
return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy)
mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)
concat_plans = [
_get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers
]
concat_plan = _combine_concat_plans(concat_plans, concat_axis)
blocks = []
> for placement, join_units in concat_plan:
.venv/lib/python3.9/site-packages/pandas/core/internals/concat.py:205:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
plans = [<list_iterator object at 0x7f189b281310>, <list_iterator object at 0x7f189b281100>]
concat_axis = 1
def _combine_concat_plans(plans, concat_axis: int):
"""
Combine multiple concatenation plans into one.
existing_plan is updated in-place.
"""
if len(plans) == 1:
for p in plans[0]:
yield p[0], [p[1]]
elif concat_axis == 0:
offset = 0
for plan in plans:
last_plc = None
for plc, unit in plan:
yield plc.add(offset), [unit]
last_plc = plc
if last_plc is not None:
offset += last_plc.as_slice.stop
else:
# singleton list so we can modify it as a side-effect within _next_or_none
num_ended = [0]
def _next_or_none(seq):
retval = next(seq, None)
if retval is None:
num_ended[0] += 1
return retval
plans = list(map(iter, plans))
next_items = list(map(_next_or_none, plans))
while num_ended[0] != len(next_items):
if num_ended[0] > 0:
> raise ValueError("Plan shapes are not aligned")
E ValueError: Plan shapes are not aligned
.venv/lib/python3.9/site-packages/pandas/core/internals/concat.py:742: ValueError
Happened again (Py 3.10):
__________________ test_forward_fill_lazy_multiple_iterations __________________
[gw1] linux -- Python 3.10.13 /home/runner/work/sed/sed/.venv/bin/python
cls = <class '_pytest.runner.CallInfo'>
func = <function call_runtest_hook.<locals>.<lambda> at 0x7f8cfadf3f40>
when = 'call'
reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)
@classmethod
def from_call(
cls,
func: "Callable[[], TResult]",
when: "Literal['collect', 'setup', 'call', 'teardown']",
reraise: Optional[
Union[Type[BaseException], Tuple[Type[BaseException], ...]]
] = None,
) -> "CallInfo[TResult]":
"""Call func, wrapping the result in a CallInfo.
:param func:
The function to call. Called without arguments.
:param when:
The phase in which the function is called.
:param reraise:
Exception or exceptions that shall propagate if raised by the
function, instead of being wrapped in the CallInfo.
"""
excinfo = None
start = timing.time()
precise_start = timing.perf_counter()
try:
> result: Optional[TResult] = func()
.venv/lib/python3.10/site-packages/_pytest/runner.py:341:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> lambda: ihook(item=item, **kwds), when=when, reraise=reraise
)
.venv/lib/python3.10/site-packages/_pytest/runner.py:262:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <HookCaller 'pytest_runtest_call'>
kwargs = {'item': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = False
def __call__(self, **kwargs: object) -> Any:
"""Call the hook.
Only accepts keyword arguments, which should match the hook
specification.
Returns the result(s) of calling all registered plugins, see
:ref:`calling`.
"""
assert (
not self.is_historic()
), "Cannot directly call a historic hook - use call_historic instead."
self._verify_all_args_are_provided(kwargs)
firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
> return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
.venv/lib/python3.10/site-packages/pluggy/_hooks.py:493:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.config.PytestPluginManager object at 0x7f8d4f3137c0>
hook_name = 'pytest_runtest_call'
methods = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '/home/runner/work/sed/sed/.venv/lib/python3.10/...test.threadexception' from '/home/runner/work/sed/sed/.venv/lib/python3.10/site-packages/_pytest/threadexception.py'>>]
kwargs = {'item': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = False
def _hookexec(
self,
hook_name: str,
methods: Sequence[HookImpl],
kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
> return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
.venv/lib/python3.10/site-packages/pluggy/_manager.py:115:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '/home/runner/work/sed/sed/.venv/lib/python3.10/...test.threadexception' from '/home/runner/work/sed/sed/.venv/lib/python3.10/site-packages/_pytest/threadexception.py'>>]
caller_kwargs = {'item': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = False
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen,))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
res = hook_impl.function(*args)
if res is not None:
results.append(res)
if firstresult: # halt further impl calls
break
except BaseException as exc:
exception = exc
finally:
# Fast path - only new-style wrappers, no Result.
if only_new_style_wrappers:
if firstresult: # first result hooks return a single value
result = results[0] if results else None
else:
result = results
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
try:
if exception is not None:
teardown.throw(exception) # type: ignore[union-attr]
else:
teardown.send(result) # type: ignore[union-attr]
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close() # type: ignore[union-attr]
except StopIteration as si:
result = si.value
exception = None
continue
except BaseException as e:
exception = e
continue
_raise_wrapfail(teardown, "has second yield") # type: ignore[arg-type]
if exception is not None:
raise exception.with_traceback(exception.__traceback__)
else:
return result
# Slow path - need to support old-style wrappers.
else:
if firstresult: # first result hooks return a single value
outcome: Result[object | list[object]] = Result(
results[0] if results else None, exception
)
else:
outcome = Result(results, exception)
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
if isinstance(teardown, tuple):
try:
teardown[0].send(outcome)
_raise_wrapfail(teardown[0], "has second yield")
except StopIteration:
pass
else:
try:
if outcome._exception is not None:
teardown.throw(outcome._exception)
else:
teardown.send(outcome._result)
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close()
except StopIteration as si:
outcome.force_result(si.value)
continue
except BaseException as e:
outcome.force_exception(e)
continue
_raise_wrapfail(teardown, "has second yield")
> return outcome.get_result()
.venv/lib/python3.10/site-packages/pluggy/_callers.py:152:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pluggy._result.Result object at 0x7f8cfb5d8940>
def get_result(self) -> ResultType:
"""Get the result(s) for this hook call.
If the hook was marked as a ``firstresult`` only a single value
will be returned, otherwise a list of results.
"""
__tracebackhide__ = True
exc = self._exception
if exc is None:
return cast(ResultType, self._result)
else:
> raise exc.with_traceback(exc.__traceback__)
.venv/lib/python3.10/site-packages/pluggy/_result.py:114:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook_name = 'pytest_runtest_call'
hook_impls = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '/home/runner/work/sed/sed/.venv/lib/python3.10/...test.threadexception' from '/home/runner/work/sed/sed/.venv/lib/python3.10/site-packages/_pytest/threadexception.py'>>]
caller_kwargs = {'item': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = False
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen,))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
> res = hook_impl.function(*args)
.venv/lib/python3.10/site-packages/pluggy/_callers.py:77:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
item = <Function test_forward_fill_lazy_multiple_iterations>
def pytest_runtest_call(item: Item) -> None:
_update_current_test_var(item, "call")
try:
del sys.last_type
del sys.last_value
del sys.last_traceback
except AttributeError:
pass
try:
item.runtest()
except Exception as e:
# Store trace info to allow postmortem debugging
sys.last_type = type(e)
sys.last_value = e
assert e.__traceback__ is not None
# Skip *this* frame
sys.last_traceback = e.__traceback__.tb_next
> raise e
.venv/lib/python3.10/site-packages/_pytest/runner.py:177:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
item = <Function test_forward_fill_lazy_multiple_iterations>
def pytest_runtest_call(item: Item) -> None:
_update_current_test_var(item, "call")
try:
del sys.last_type
del sys.last_value
del sys.last_traceback
except AttributeError:
pass
try:
> item.runtest()
.venv/lib/python3.10/site-packages/_pytest/runner.py:169:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Function test_forward_fill_lazy_multiple_iterations>
def runtest(self) -> None:
"""Execute the underlying test function."""
> self.ihook.pytest_pyfunc_call(pyfuncitem=self)
.venv/lib/python3.10/site-packages/_pytest/python.py:1792:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <HookCaller 'pytest_pyfunc_call'>
kwargs = {'pyfuncitem': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = True
def __call__(self, **kwargs: object) -> Any:
"""Call the hook.
Only accepts keyword arguments, which should match the hook
specification.
Returns the result(s) of calling all registered plugins, see
:ref:`calling`.
"""
assert (
not self.is_historic()
), "Cannot directly call a historic hook - use call_historic instead."
self._verify_all_args_are_provided(kwargs)
firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
> return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
.venv/lib/python3.10/site-packages/pluggy/_hooks.py:493:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.config.PytestPluginManager object at 0x7f8d4f3137c0>
hook_name = 'pytest_pyfunc_call'
methods = [<HookImpl plugin_name='python', plugin=<module '_pytest.python' from '/home/runner/work/sed/sed/.venv/lib/python3.10/site-packages/_pytest/python.py'>>]
kwargs = {'pyfuncitem': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = True
def _hookexec(
self,
hook_name: str,
methods: Sequence[HookImpl],
kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
> return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
.venv/lib/python3.10/site-packages/pluggy/_manager.py:115:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook_name = 'pytest_pyfunc_call'
hook_impls = [<HookImpl plugin_name='python', plugin=<module '_pytest.python' from '/home/runner/work/sed/sed/.venv/lib/python3.10/site-packages/_pytest/python.py'>>]
caller_kwargs = {'pyfuncitem': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = True
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen,))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
res = hook_impl.function(*args)
if res is not None:
results.append(res)
if firstresult: # halt further impl calls
break
except BaseException as exc:
exception = exc
finally:
# Fast path - only new-style wrappers, no Result.
if only_new_style_wrappers:
if firstresult: # first result hooks return a single value
result = results[0] if results else None
else:
result = results
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
try:
if exception is not None:
teardown.throw(exception) # type: ignore[union-attr]
else:
teardown.send(result) # type: ignore[union-attr]
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close() # type: ignore[union-attr]
except StopIteration as si:
result = si.value
exception = None
continue
except BaseException as e:
exception = e
continue
_raise_wrapfail(teardown, "has second yield") # type: ignore[arg-type]
if exception is not None:
> raise exception.with_traceback(exception.__traceback__)
.venv/lib/python3.10/site-packages/pluggy/_callers.py:113:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook_name = 'pytest_pyfunc_call'
hook_impls = [<HookImpl plugin_name='python', plugin=<module '_pytest.python' from '/home/runner/work/sed/sed/.venv/lib/python3.10/site-packages/_pytest/python.py'>>]
caller_kwargs = {'pyfuncitem': <Function test_forward_fill_lazy_multiple_iterations>}
firstresult = True
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen,))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
> res = hook_impl.function(*args)
.venv/lib/python3.10/site-packages/pluggy/_callers.py:77:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pyfuncitem = <Function test_forward_fill_lazy_multiple_iterations>
@hookimpl(trylast=True)
def pytest_pyfunc_call(pyfuncitem: "Function") -> Optional[object]:
testfunction = pyfuncitem.obj
if is_async_function(testfunction):
async_warn_and_skip(pyfuncitem.nodeid)
funcargs = pyfuncitem.funcargs
testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
> result = testfunction(**testargs)
.venv/lib/python3.10/site-packages/_pytest/python.py:194:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def test_forward_fill_lazy_multiple_iterations() -> None:
"""test that a lazy forward fill works as expected with multiple iterations"""
t_df = df.copy()
t_df["energy"][5:35] = np.nan
t_dask_df = ddf.from_pandas(t_df, npartitions=N_PARTITIONS)
t_dask_df = forward_fill_lazy(t_dask_df, "energy", before="max", iterations=5)
t_df = t_df.ffill()
> pd.testing.assert_frame_equal(t_df, t_dask_df.compute())
tests/test_dfops.py:244:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Dask DataFrame Structure:
posx posy energy posx_jittered posy_jittered energy_jittered timeSt... ... ... ... ... ... ... ...
Dask Name: overlap, 11 graph layers
kwargs = {}
def compute(self, **kwargs):
"""Compute this dask collection
This turns a lazy Dask collection into its in-memory equivalent.
For example a Dask array turns into a NumPy array and a Dask dataframe
turns into a Pandas dataframe. The entire dataset must fit into memory
before calling this operation.
Parameters
----------
scheduler : string, optional
Which scheduler to use like "threads", "synchronous" or "processes".
If not provided, the default is to check the global settings first,
and then fall back to the collection defaults.
optimize_graph : bool, optional
If True [default], the graph is optimized before computation.
Otherwise the graph is run as is. This can be useful for debugging.
kwargs
Extra keywords to forward to the scheduler function.
See Also
--------
dask.base.compute
"""
> (result,) = compute(self, traverse=False, **kwargs)
.venv/lib/python3.10/site-packages/dask/base.py:315:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
traverse = False, optimize_graph = True, scheduler = None, get = None
args = (Dask DataFrame Structure:
posx posy energy posx_jittered posy_jittered energy_jittered timeS... ... ... ... ... ... ... ...
Dask Name: overlap, 11 graph layers,)
kwargs = {}
collections = [Dask DataFrame Structure:
posx posy energy posx_jittered posy_jittered energy_jittered timeS... ... ... ... ... ... ... ...
Dask Name: overlap, 11 graph layers]
repack = <function unpack_collections.<locals>.repack at 0x7f8cfabe0b80>
schedule = <function get at 0x7f8d4ca89630>
dsk = HighLevelGraph with 11 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f8cf9e1e9e0>
0. from_pandas-5c2a30228.... overlap-concat-d90c3a36af38e3a3b119d008a4e62e53
10. overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f
keys = [[('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 0), ('overlap-forward_fill_partition-07caaa479c4...on-07caaa479c4a37f343caac5b52dc0e6f', 4), ('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 5), ...]]
postcomputes = [(<function finalize at 0x7f8d1a762cb0>, ())]
def compute(
*args, traverse=True, optimize_graph=True, scheduler=None, get=None, **kwargs
):
"""Compute several dask collections at once.
Parameters
----------
args : object
Any number of objects. If it is a dask object, it's computed and the
result is returned. By default, python builtin collections are also
traversed to look for dask objects (for more information see the
``traverse`` keyword). Non-dask arguments are passed through unchanged.
traverse : bool, optional
By default dask traverses builtin python collections looking for dask
objects passed to ``compute``. For large collections this can be
expensive. If none of the arguments contain any dask objects, set
``traverse=False`` to avoid doing this traversal.
scheduler : string, optional
Which scheduler to use like "threads", "synchronous" or "processes".
If not provided, the default is to check the global settings first,
and then fall back to the collection defaults.
optimize_graph : bool, optional
If True [default], the optimizations for each collection are applied
before computation. Otherwise the graph is run as is. This can be
useful for debugging.
get : ``None``
Should be left to ``None`` The get= keyword has been removed.
kwargs
Extra keywords to forward to the scheduler function.
Examples
--------
>>> import dask
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> dask.compute(a, b)
(45, 4.5)
By default, dask objects inside python collections will also be computed:
>>> dask.compute({'a': a, 'b': b, 'c': 1})
({'a': 45, 'b': 4.5, 'c': 1},)
"""
collections, repack = unpack_collections(*args, traverse=traverse)
if not collections:
return args
schedule = get_scheduler(
scheduler=scheduler,
collections=collections,
get=get,
)
dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
keys, postcomputes = [], []
for x in collections:
keys.append(x.__dask_keys__())
postcomputes.append(x.__dask_postcompute__())
> results = schedule(dsk, keys, **kwargs)
.venv/lib/python3.10/site-packages/dask/base.py:600:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dsk = HighLevelGraph with 11 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f8cf9e1e9e0>
0. from_pandas-5c2a30228.... overlap-concat-d90c3a36af38e3a3b119d008a4e62e53
10. overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f
keys = [[('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 0), ('overlap-forward_fill_partition-07caaa479c4...on-07caaa479c4a37f343caac5b52dc0e6f', 4), ('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 5), ...]]
cache = None, num_workers = None
pool = <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f8cf9c87820>
kwargs = {}, thread = <_MainThread(MainThread, started 140244911975296)>
def get(
dsk: Mapping,
keys: Sequence[Hashable] | Hashable,
cache=None,
num_workers=None,
pool=None,
**kwargs,
):
"""Threaded cached implementation of dask.get
Parameters
----------
dsk: dict
A dask dictionary specifying a workflow
keys: key or list of keys
Keys corresponding to desired data
num_workers: integer of thread count
The number of threads to use in the ThreadPool that will actually execute tasks
cache: dict-like (optional)
Temporary storage of results
Examples
--------
>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}
>>> get(dsk, 'w')
4
>>> get(dsk, ['w', 'y'])
(4, 2)
"""
global default_pool
pool = pool or config.get("pool", None)
num_workers = num_workers or config.get("num_workers", None)
thread = current_thread()
with pools_lock:
if pool is None:
if num_workers is None and thread is main_thread:
if default_pool is None:
default_pool = ThreadPoolExecutor(CPU_COUNT)
atexit.register(default_pool.shutdown)
pool = default_pool
elif thread in pools and num_workers in pools[thread]:
pool = pools[thread][num_workers]
else:
pool = ThreadPoolExecutor(num_workers)
atexit.register(pool.shutdown)
pools[thread][num_workers] = pool
elif isinstance(pool, multiprocessing.pool.Pool):
pool = MultiprocessingPoolExecutor(pool)
> results = get_async(
pool.submit,
pool._max_workers,
dsk,
keys,
cache=cache,
get_id=_thread_get_id,
pack_exception=pack_exception,
**kwargs,
)
.venv/lib/python3.10/site-packages/dask/threaded.py:89:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
submit = <bound method ThreadPoolExecutor.submit of <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f8cf9c87820>>
num_workers = 4
dsk = {('from_pandas-5c2a30228a32043e8cc5a4d621b010eb', 0): posx posy energy ... posy_jittered energy_jitt...93e+09
39 -1.673833 0.745548 -0.965233 ... -1.245565 -0.536965 1.701693e+09
[10 rows x 7 columns], ...}
result = [[('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 0), ('overlap-forward_fill_partition-07caaa479c4...on-07caaa479c4a37f343caac5b52dc0e6f', 4), ('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 5), ...]]
cache = None, get_id = <function _thread_get_id at 0x7f8d4ca77880>
rerun_exceptions_locally = False
pack_exception = <function pack_exception at 0x7f8d4ca892d0>
raise_exception = <function reraise at 0x7f8d4cbe5750>, callbacks = ()
dumps = <function identity at 0x7f8d4cbe57e0>
loads = <function identity at 0x7f8d4cbe57e0>, chunksize = 1, kwargs = {}
result_flat = {('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 0), ('overlap-forward_fill_partition-07caaa479c4a...ion-07caaa479c4a37f343caac5b52dc0e6f', 4), ('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 5), ...}
results = {('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 0), ('overlap-forward_fill_partition-07caaa479c4a...ion-07caaa479c4a37f343caac5b52dc0e6f', 4), ('overlap-forward_fill_partition-07caaa479c4a37f343caac5b52dc0e6f', 5), ...}
_ = (), posttask_cbs = (), started_cbs = [], succeeded = False
def get_async(
submit,
num_workers,
dsk,
result,
cache=None,
get_id=default_get_id,
rerun_exceptions_locally=None,
pack_exception=default_pack_exception,
raise_exception=reraise,
callbacks=None,
dumps=identity,
loads=identity,
chunksize=None,
**kwargs,
):
"""Asynchronous get function
This is a general version of various asynchronous schedulers for dask. It
takes a ``concurrent.futures.Executor.submit`` function to form a more
specific ``get`` method that walks through the dask array with parallel
workers, avoiding repeat computation and minimizing memory use.
Parameters
----------
submit : function
A ``concurrent.futures.Executor.submit`` function
num_workers : int
The number of workers that task submissions can be spread over
dsk : dict
A dask dictionary specifying a workflow
result : key or list of keys
Keys corresponding to desired data
cache : dict-like, optional
Temporary storage of results
get_id : callable, optional
Function to return the worker id, takes no arguments. Examples are
`threading.current_thread` and `multiprocessing.current_process`.
rerun_exceptions_locally : bool, optional
Whether to rerun failing tasks in local process to enable debugging
(False by default)
pack_exception : callable, optional
Function to take an exception and ``dumps`` method, and return a
serialized tuple of ``(exception, traceback)`` to send back to the
scheduler. Default is to just raise the exception.
raise_exception : callable, optional
Function that takes an exception and a traceback, and raises an error.
callbacks : tuple or list of tuples, optional
Callbacks are passed in as tuples of length 5. Multiple sets of
callbacks may be passed in as a list of tuples. For more information,
see the dask.diagnostics documentation.
dumps: callable, optional
Function to serialize task data and results to communicate between
worker and parent. Defaults to identity.
loads: callable, optional
Inverse function of `dumps`. Defaults to identity.
chunksize: int, optional
Size of chunks to use when dispatching work. Defaults to 1.
If -1, will be computed to evenly divide ready work across workers.
See Also
--------
threaded.get
"""
chunksize = chunksize or config.get("chunksize", 1)
queue = Queue()
if isinstance(result, list):
result_flat = set(flatten(result))
else:
result_flat = {result}
results = set(result_flat)
dsk = dict(dsk)
with local_callbacks(callbacks) as callbacks:
_, _, pretask_cbs, posttask_cbs, _ = unpack_callbacks(callbacks)
started_cbs = []
succeeded = False
# if start_state_from_dask fails, we will have something
# to pass to the final block.
state = {}
try:
for cb in callbacks:
if cb[0]:
cb[0](dsk)
started_cbs.append(cb)
keyorder = order(dsk)
state = start_state_from_dask(dsk, cache=cache, sortkey=keyorder.get)
for _, start_state, _, _, _ in callbacks:
if start_state:
start_state(dsk, state)
if rerun_exceptions_locally is None:
rerun_exceptions_locally = config.get("rerun_exceptions_locally", False)
if state["waiting"] and not state["ready"]:
raise ValueError("Found no accessible jobs in dask")
def fire_tasks(chunksize):
"""Fire off a task to the thread pool"""
# Determine chunksize and/or number of tasks to submit
nready = len(state["ready"])
if chunksize == -1:
ntasks = nready
chunksize = -(ntasks // -num_workers)
else:
used_workers = -(len(state["running"]) // -chunksize)
avail_workers = max(num_workers - used_workers, 0)
ntasks = min(nready, chunksize * avail_workers)
# Prep all ready tasks for submission
args = []
for _ in range(ntasks):
# Get the next task to compute (most recently added)
key = state["ready"].pop()
# Notify task is running
state["running"].add(key)
for f in pretask_cbs:
f(key, dsk, state)
# Prep args to send
data = {
dep: state["cache"][dep] for dep in get_dependencies(dsk, key)
}
args.append(
(
key,
dumps((dsk[key], data)),
dumps,
loads,
get_id,
pack_exception,
)
)
# Batch submit
for i in range(-(len(args) // -chunksize)):
each_args = args[i * chunksize : (i + 1) * chunksize]
if not each_args:
break
fut = submit(batch_execute_tasks, each_args)
fut.add_done_callback(queue.put)
# Main loop, wait on tasks to finish, insert new ones
while state["waiting"] or state["ready"] or state["running"]:
fire_tasks(chunksize)
for key, res_info, failed in queue_get(queue).result():
if failed:
exc, tb = loads(res_info)
if rerun_exceptions_locally:
data = {
dep: state["cache"][dep]
for dep in get_dependencies(dsk, key)
}
task = dsk[key]
_execute_task(task, data) # Re-execute locally
else:
> raise_exception(exc, tb)
.venv/lib/python3.10/site-packages/dask/local.py:511:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
exc = ValueError('Plan shapes are not aligned')
tb = <traceback object at 0x7f8cf9c058c0>
def reraise(exc, tb=None):
if exc.__traceback__ is not tb:
raise exc.with_traceback(tb)
> raise exc
.venv/lib/python3.10/site-packages/dask/local.py:319:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
key = ('overlap-concat-4c96310036be4506e968f37270522ee3', 1)
task_info = ((<function _combined_parts at 0x7f8d1a795990>, ('overlap-prepend-4383310a36e1dbbcb37dd0b7d34bb6a7', 0), ('overlap-for....701693e+09
9 -0.116841 0.040652 -0.027516 ... -0.185009 1.212564 1.701693e+09
[10 rows x 7 columns]})
dumps = <function identity at 0x7f8d4cbe57e0>
loads = <function identity at 0x7f8d4cbe57e0>
get_id = <function _thread_get_id at 0x7f8d4ca77880>
pack_exception = <function pack_exception at 0x7f8d4ca892d0>
def execute_task(key, task_info, dumps, loads, get_id, pack_exception):
"""
Compute task and handle all administration
See Also
--------
_execute_task : actually execute task
"""
try:
task, data = loads(task_info)
> result = _execute_task(task, data)
.venv/lib/python3.10/site-packages/dask/local.py:224:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
arg = (<function _combined_parts at 0x7f8d1a795990>, ('overlap-prepend-4383310a36e1dbbcb37dd0b7d34bb6a7', 0), ('overlap-forward_fill_partition-368c025acf7abdef5a5dc8b1ed8db357', 1), None, 10, 0)
cache = {('overlap-forward_fill_partition-368c025acf7abdef5a5dc8b1ed8db357', 1): posx posy energy ... posy_j...1.701693e+09
9 -0.116841 0.040652 -0.027516 ... -0.185009 1.212564 1.701693e+09
[10 rows x 7 columns]}
dsk = None
def _execute_task(arg, cache, dsk=None):
"""Do the actual work of collecting data and executing a function
Examples
--------
>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> cache = {'x': 1, 'y': 2}
Compute tasks against a cache
>>> _execute_task((add, 'x', 1), cache) # Compute task in naive manner
2
>>> _execute_task((add, (inc, 'x'), 1), cache) # Support nested computation
3
Also grab data from cache
>>> _execute_task('x', cache)
1
Support nested lists
>>> list(_execute_task(['x', 'y'], cache))
[1, 2]
>>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
[[1, 2], [2, 1]]
>>> _execute_task('foo', cache) # Passes through on non-keys
'foo'
"""
if isinstance(arg, list):
return [_execute_task(a, cache) for a in arg]
elif istask(arg):
func, args = arg[0], arg[1:]
# Note: Don't assign the subtask results to a variable. numpy detects
# temporaries by their reference count and can execute certain
# operations in-place.
> return func(*(_execute_task(a, cache) for a in args))
.venv/lib/python3.10/site-packages/dask/core.py:119:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
prev_part = posx posy energy ... posy_jittered energy_jittered timeStamps
0 -1.312222 1.504660 1.977883 ..... 1.701693e+09
9 -0.116841 0.040652 -0.027516 ... -0.185009 1.212564 1.701693e+09
[10 rows x 7 columns]
current_part = posx posy energy ... posy_jittered energy_jittered timeStamps
10 0.427996 1.616292 1.052292 ...1.701693e+09
19 -0.408113 -1.394895 1.052292 ... -0.644395 -0.197647 1.701693e+09
[10 rows x 7 columns]
next_part = None, before = 10, after = 0
def _combined_parts(prev_part, current_part, next_part, before, after):
msg = (
"Partition size is less than overlapping "
"window size. Try using ``df.repartition`` "
"to increase the partition size."
)
if prev_part is not None and isinstance(before, Integral):
if prev_part.shape[0] != before:
raise NotImplementedError(msg)
if next_part is not None and isinstance(after, Integral):
if next_part.shape[0] != after:
raise NotImplementedError(msg)
parts = [p for p in (prev_part, current_part, next_part) if p is not None]
> combined = methods.concat(parts)
.venv/lib/python3.10/site-packages/dask/dataframe/rolling.py:55:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dfs = [ posx posy energy ... posy_jittered energy_jittered timeStamps
0 -1.312222 1.504660 1.977883 .....701693e+09
19 -0.408113 -1.394895 1.052292 ... -0.644395 -0.197647 1.701693e+09
[10 rows x 7 columns]]
axis = 0, join = 'outer', uniform = False, filter_warning = True
ignore_index = False, kwargs = {}
func = <function concat_pandas at 0x7f8d1a7953f0>
def concat(
dfs,
axis=0,
join="outer",
uniform=False,
filter_warning=True,
ignore_index=False,
**kwargs,
):
"""Concatenate, handling some edge cases:
- Unions categoricals between partitions
- Ignores empty partitions
Parameters
----------
dfs : list of DataFrame, Series, or Index
axis : int or str, optional
join : str, optional
uniform : bool, optional
Whether to treat ``dfs[0]`` as representative of ``dfs[1:]``. Set to
True if all arguments have the same columns and dtypes (but not
necessarily categories). Default is False.
ignore_index : bool, optional
Whether to allow index values to be ignored/dropped during
concatenation. Default is False.
ignore_order : bool, optional
Whether to ignore the order when doing the union of categoricals.
Default is False.
"""
if len(dfs) == 1:
return dfs[0]
else:
func = concat_dispatch.dispatch(type(dfs[0]))
> return func(
dfs,
axis=axis,
join=join,
uniform=uniform,
filter_warning=filter_warning,
ignore_index=ignore_index,
**kwargs,
)
.venv/lib/python3.10/site-packages/dask/dataframe/dispatch.py:63:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dfs = [ posx posy energy ... posy_jittered energy_jittered timeStamps
0 -1.312222 1.504660 1.977883 .....701693e+09
19 -0.408113 -1.394895 1.052292 ... -0.644395 -0.197647 1.701693e+09
[10 rows x 7 columns]]
axis = 0, join = 'outer', uniform = False, filter_warning = True
ignore_index = False, kwargs = {}, ignore_order = False
dfs0_index = RangeIndex(start=0, stop=10, step=1)
@concat_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
def concat_pandas(
dfs,
axis=0,
join="outer",
uniform=False,
filter_warning=True,
ignore_index=False,
**kwargs,
):
ignore_order = kwargs.pop("ignore_order", False)
if axis == 1:
return pd.concat(dfs, axis=axis, join=join, **kwargs)
# Support concatenating indices along axis 0
if isinstance(dfs[0], pd.Index):
if isinstance(dfs[0], pd.CategoricalIndex):
for i in range(1, len(dfs)):
if not isinstance(dfs[i], pd.CategoricalIndex):
dfs[i] = dfs[i].astype("category")
return pd.CategoricalIndex(
union_categoricals(dfs, ignore_order=ignore_order), name=dfs[0].name
)
elif isinstance(dfs[0], pd.MultiIndex):
first, rest = dfs[0], dfs[1:]
if all(
(isinstance(o, pd.MultiIndex) and o.nlevels >= first.nlevels)
for o in rest
):
arrays = [
concat([i._get_level_values(n) for i in dfs])
for n in range(first.nlevels)
]
return pd.MultiIndex.from_arrays(arrays, names=first.names)
to_concat = (first.values,) + tuple(k._values for k in rest)
new_tuples = np.concatenate(to_concat)
try:
return pd.MultiIndex.from_tuples(new_tuples, names=first.names)
except Exception:
return pd.Index(new_tuples)
return dfs[0].append(dfs[1:])
# Handle categorical index separately
dfs0_index = dfs[0].index
has_categoricalindex = isinstance(dfs0_index, pd.CategoricalIndex) or (
isinstance(dfs0_index, pd.MultiIndex)
and any(isinstance(i, pd.CategoricalIndex) for i in dfs0_index.levels)
)
if has_categoricalindex:
dfs2 = [df.reset_index(drop=True) for df in dfs]
ind = concat([df.index for df in dfs])
else:
dfs2 = dfs
ind = None
# Concatenate the partitions together, handling categories as needed
if (
isinstance(dfs2[0], pd.DataFrame)
if uniform
else any(isinstance(df, pd.DataFrame) for df in dfs2)
):
if uniform:
dfs3 = dfs2
cat_mask = dfs2[0].dtypes == "category"
else:
# When concatenating mixed dataframes and series on axis 1, Pandas
# converts series to dataframes with a single column named 0, then
# concatenates.
dfs3 = [
df
if isinstance(df, pd.DataFrame)
else df.to_frame().rename(columns={df.name: 0})
for df in dfs2
]
# pandas may raise a RuntimeWarning for comparing ints and strs
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
if filter_warning:
warnings.simplefilter("ignore", FutureWarning)
cat_mask = pd.concat(
[(df.dtypes == "category").to_frame().T for df in dfs3],
join=join,
**kwargs,
).any()
if cat_mask.any():
not_cat = cat_mask[~cat_mask].index
# this should be aligned, so no need to filter warning
out = pd.concat(
[df[df.columns.intersection(not_cat)] for df in dfs3],
join=join,
**kwargs,
)
temp_ind = out.index
for col in cat_mask.index.difference(not_cat):
# Find an example of categoricals in this column
for df in dfs3:
sample = df.get(col)
if sample is not None:
break
# Extract partitions, subbing in missing if needed
parts = []
for df in dfs3:
if col in df.columns:
parts.append(df[col])
else:
codes = np.full(len(df), -1, dtype="i8")
data = pd.Categorical.from_codes(
codes, sample.cat.categories, sample.cat.ordered
)
parts.append(data)
out[col] = union_categoricals(parts, ignore_order=ignore_order)
# Pandas resets index type on assignment if frame is empty
# https://github.com/pandas-dev/pandas/issues/17101
if not len(temp_ind):
out.index = temp_ind
out = out.reindex(columns=cat_mask.index)
else:
# pandas may raise a RuntimeWarning for comparing ints and strs
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
if filter_warning:
warnings.simplefilter("ignore", FutureWarning)
> out = pd.concat(dfs3, join=join, sort=False)
.venv/lib/python3.10/site-packages/dask/dataframe/backends.py:653:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = ([ posx posy energy ... posy_jittered energy_jittered timeStamps
0 -1.312222 1.504660 1.977883 ...01693e+09
19 -0.408113 -1.394895 1.052292 ... -0.644395 -0.197647 1.701693e+09
[10 rows x 7 columns]],)
kwargs = {'join': 'outer', 'sort': False}
@wraps(func)
def wrapper(*args, **kwargs):
if len(args) > num_allow_args:
warnings.warn(
msg.format(arguments=_format_argument_list(allow_args)),
FutureWarning,
stacklevel=find_stack_level(),
)
> return func(*args, **kwargs)
.venv/lib/python3.10/site-packages/pandas/util/_decorators.py:331:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
objs = [ posx posy energy ... posy_jittered energy_jittered timeStamps
0 -1.312222 1.504660 1.977883 .....701693e+09
19 -0.408113 -1.394895 1.052292 ... -0.644395 -0.197647 1.701693e+09
[10 rows x 7 columns]]
axis = 0, join = 'outer', ignore_index = False, keys = None, levels = None
names = None, verify_integrity = False, sort = False, copy = True
@deprecate_nonkeyword_arguments(version=None, allowed_args=["objs"])
def concat(
objs: Iterable[NDFrame] | Mapping[HashableT, NDFrame],
axis: Axis = 0,
join: str = "outer",
ignore_index: bool = False,
keys=None,
levels=None,
names=None,
verify_integrity: bool = False,
sort: bool = False,
copy: bool = True,
) -> DataFrame | Series:
"""
Concatenate pandas objects along a particular axis.
Allows optional set logic along the other axes.
Can also add a layer of hierarchical indexing on the concatenation axis,
which may be useful if the labels are the same (or overlapping) on
the passed axis number.
Parameters
----------
objs : a sequence or mapping of Series or DataFrame objects
If a mapping is passed, the sorted keys will be used as the `keys`
argument, unless it is passed, in which case the values will be
selected (see below). Any None objects will be dropped silently unless
they are all None in which case a ValueError will be raised.
axis : {0/'index', 1/'columns'}, default 0
The axis to concatenate along.
join : {'inner', 'outer'}, default 'outer'
How to handle indexes on other axis (or axes).
ignore_index : bool, default False
If True, do not use the index values along the concatenation axis. The
resulting axis will be labeled 0, ..., n - 1. This is useful if you are
concatenating objects where the concatenation axis does not have
meaningful indexing information. Note the index values on the other
axes are still respected in the join.
keys : sequence, default None
If multiple levels passed, should contain tuples. Construct
hierarchical index using the passed keys as the outermost level.
levels : list of sequences, default None
Specific levels (unique values) to use for constructing a
MultiIndex. Otherwise they will be inferred from the keys.
names : list, default None
Names for the levels in the resulting hierarchical index.
verify_integrity : bool, default False
Check whether the new concatenated axis contains duplicates. This can
be very expensive relative to the actual data concatenation.
sort : bool, default False
Sort non-concatenation axis if it is not already aligned when `join`
is 'outer'.
This has no effect when ``join='inner'``, which already preserves
the order of the non-concatenation axis.
.. versionchanged:: 1.0.0
Changed to not sort by default.
copy : bool, default True
If False, do not copy data unnecessarily.
Returns
-------
object, type of objs
When concatenating all ``Series`` along the index (axis=0), a
``Series`` is returned. When ``objs`` contains at least one
``DataFrame``, a ``DataFrame`` is returned. When concatenating along
the columns (axis=1), a ``DataFrame`` is returned.
See Also
--------
DataFrame.join : Join DataFrames using indexes.
DataFrame.merge : Merge DataFrames by indexes or columns.
Notes
-----
The keys, levels, and names arguments are all optional.
A walkthrough of how this method fits in with other tools for combining
pandas objects can be found `here
<https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html>`__.
It is not recommended to build DataFrames by adding single rows in a
for loop. Build a list of rows and make a DataFrame in a single concat.
Examples
--------
Combine two ``Series``.
>>> s1 = pd.Series(['a', 'b'])
>>> s2 = pd.Series(['c', 'd'])
>>> pd.concat([s1, s2])
0 a
1 b
0 c
1 d
dtype: object
Clear the existing index and reset it in the result
by setting the ``ignore_index`` option to ``True``.
>>> pd.concat([s1, s2], ignore_index=True)
0 a
1 b
2 c
3 d
dtype: object
Add a hierarchical index at the outermost level of
the data with the ``keys`` option.
>>> pd.concat([s1, s2], keys=['s1', 's2'])
s1 0 a
1 b
s2 0 c
1 d
dtype: object
Label the index keys you create with the ``names`` option.
>>> pd.concat([s1, s2], keys=['s1', 's2'],
... names=['Series name', 'Row ID'])
Series name Row ID
s1 0 a
1 b
s2 0 c
1 d
dtype: object
Combine two ``DataFrame`` objects with identical columns.
>>> df1 = pd.DataFrame([['a', 1], ['b', 2]],
... columns=['letter', 'number'])
>>> df1
letter number
0 a 1
1 b 2
>>> df2 = pd.DataFrame([['c', 3], ['d', 4]],
... columns=['letter', 'number'])
>>> df2
letter number
0 c 3
1 d 4
>>> pd.concat([df1, df2])
letter number
0 a 1
1 b 2
0 c 3
1 d 4
Combine ``DataFrame`` objects with overlapping columns
and return everything. Columns outside the intersection will
be filled with ``NaN`` values.
>>> df3 = pd.DataFrame([['c', 3, 'cat'], ['d', 4, 'dog']],
... columns=['letter', 'number', 'animal'])
>>> df3
letter number animal
0 c 3 cat
1 d 4 dog
>>> pd.concat([df1, df3], sort=False)
letter number animal
0 a 1 NaN
1 b 2 NaN
0 c 3 cat
1 d 4 dog
Combine ``DataFrame`` objects with overlapping columns
and return only those that are shared by passing ``inner`` to
the ``join`` keyword argument.
>>> pd.concat([df1, df3], join="inner")
letter number
0 a 1
1 b 2
0 c 3
1 d 4
Combine ``DataFrame`` objects horizontally along the x axis by
passing in ``axis=1``.
>>> df4 = pd.DataFrame([['bird', 'polly'], ['monkey', 'george']],
... columns=['animal', 'name'])
>>> pd.concat([df1, df4], axis=1)
letter number animal name
0 a 1 bird polly
1 b 2 monkey george
Prevent the result from including duplicate index values with the
``verify_integrity`` option.
>>> df5 = pd.DataFrame([1], index=['a'])
>>> df5
0
a 1
>>> df6 = pd.DataFrame([2], index=['a'])
>>> df6
0
a 2
>>> pd.concat([df5, df6], verify_integrity=True)
Traceback (most recent call last):
...
ValueError: Indexes have overlapping values: ['a']
Append a single row to the end of a ``DataFrame`` object.
>>> df7 = pd.DataFrame({'a': 1, 'b': 2}, index=[0])
>>> df7
a b
0 1 2
>>> new_row = pd.Series({'a': 3, 'b': 4})
>>> new_row
a 3
b 4
dtype: int64
>>> pd.concat([df7, new_row.to_frame().T], ignore_index=True)
a b
0 1 2
1 3 4
"""
op = _Concatenator(
objs,
axis=axis,
ignore_index=ignore_index,
join=join,
keys=keys,
levels=levels,
names=names,
verify_integrity=verify_integrity,
copy=copy,
sort=sort,
)
> return op.get_result()
.venv/lib/python3.10/site-packages/pandas/core/reshape/concat.py:381:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pandas.core.reshape.concat._Concatenator object at 0x7f8cf9d153c0>
def get_result(self):
cons: Callable[..., DataFrame | Series]
sample: DataFrame | Series
# series only
if self._is_series:
sample = cast("Series", self.objs[0])
# stack blocks
if self.bm_axis == 0:
name = com.consensus_name_attr(self.objs)
cons = sample._constructor
arrs = [ser._values for ser in self.objs]
res = concat_compat(arrs, axis=0)
result = cons(res, index=self.new_axes[0], name=name, dtype=res.dtype)
return result.__finalize__(self, method="concat")
# combine as columns in a frame
else:
data = dict(zip(range(len(self.objs)), self.objs))
# GH28330 Preserves subclassed objects through concat
cons = sample._constructor_expanddim
index, columns = self.new_axes
df = cons(data, index=index, copy=self.copy)
df.columns = columns
return df.__finalize__(self, method="concat")
# combine block managers
else:
sample = cast("DataFrame", self.objs[0])
mgrs_indexers = []
for obj in self.objs:
indexers = {}
for ax, new_labels in enumerate(self.new_axes):
# ::-1 to convert BlockManager ax to DataFrame ax
if ax == self.bm_axis:
# Suppress reindexing on concat axis
continue
# 1-ax to convert BlockManager axis to DataFrame axis
obj_labels = obj.axes[1 - ax]
if not new_labels.equals(obj_labels):
indexers[ax] = obj_labels.get_indexer(new_labels)
mgrs_indexers.append((obj._mgr, indexers))
> new_data = concatenate_managers(
mgrs_indexers, self.new_axes, concat_axis=self.bm_axis, copy=self.copy
)
.venv/lib/python3.10/site-packages/pandas/core/reshape/concat.py:616:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
mgrs_indexers = [(BlockManager
Items: Index(['posx', 'posy', 'energy', 'posx_jittered', 'posy_jittered',
'energy_jittered', 'ti... step=1)
NumericBlock: [0 1 3 4 5 6], 6 x 10, dtype: float64
NumericBlock: slice(2, 3, 1), 1 x 10, dtype: float64, {})]
axes = [Index(['posx', 'posy', 'energy', 'posx_jittered', 'posy_jittered',
'energy_jittered', 'timeStamps'],
dtype='object'), RangeIndex(start=0, stop=20, step=1)]
concat_axis = 1, copy = True
def concatenate_managers(
mgrs_indexers, axes: list[Index], concat_axis: int, copy: bool
) -> Manager:
"""
Concatenate block managers into one.
Parameters
----------
mgrs_indexers : list of (BlockManager, {axis: indexer,...}) tuples
axes : list of Index
concat_axis : int
copy : bool
Returns
-------
BlockManager
"""
# TODO(ArrayManager) this assumes that all managers are of the same type
if isinstance(mgrs_indexers[0][0], ArrayManager):
return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy)
mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)
concat_plans = [
_get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers
]
concat_plan = _combine_concat_plans(concat_plans, concat_axis)
blocks = []
> for placement, join_units in concat_plan:
.venv/lib/python3.10/site-packages/pandas/core/internals/concat.py:205:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
plans = [<list_iterator object at 0x7f8cf9d14a90>, <list_iterator object at 0x7f8cf9d172b0>]
concat_axis = 1
def _combine_concat_plans(plans, concat_axis: int):
"""
Combine multiple concatenation plans into one.
existing_plan is updated in-place.
"""
if len(plans) == 1:
for p in plans[0]:
yield p[0], [p[1]]
elif concat_axis == 0:
offset = 0
for plan in plans:
last_plc = None
for plc, unit in plan:
yield plc.add(offset), [unit]
last_plc = plc
if last_plc is not None:
offset += last_plc.as_slice.stop
else:
# singleton list so we can modify it as a side-effect within _next_or_none
num_ended = [0]
def _next_or_none(seq):
retval = next(seq, None)
if retval is None:
num_ended[0] += 1
return retval
plans = list(map(iter, plans))
next_items = list(map(_next_or_none, plans))
while num_ended[0] != len(next_items):
if num_ended[0] > 0:
> raise ValueError("Plan shapes are not aligned")
E ValueError: Plan shapes are not aligned
.venv/lib/python3.10/site-packages/pandas/core/internals/concat.py:742: ValueError
@steinnymir any ideas? I cant figure the cause. It also seems to fail onlt in 3.10
@steinnymir any ideas? I cant figure the cause. It also seems to fail onlt in 3.10
There is also a failed run on 3.9. I tried to reproduce locally with 3.8, but never got the error.
In workflow run https://github.com/OpenCOMPES/sed/actions/runs/6942029903 the test for forward_fill_lazy has been failing. Please investigate.