Thanks @itamarst! I've tried out `persist_with_trace` followed by `compute_with_trace`, and that seems to work. However, in some cases, errors like the following occur:
```
.venv/lib/python3.7/site-packages/eliot/dask.py:99: in persist_with_trace
    optimized = optimize(*args, optimizations=[_add_logging])
.venv/lib/python3.7/site-packages/dask/base.py:368: in optimize
    dsk = collections_to_dsk(collections, **kwargs)
.venv/lib/python3.7/site-packages/dask/base.py:214: in collections_to_dsk
    groups = {k: (opt(dsk, keys), keys) for k, (dsk, keys) in groups.items()}
.venv/lib/python3.7/site-packages/dask/base.py:214: in <dictcomp>
    groups = {k: (opt(dsk, keys), keys) for k, (dsk, keys) in groups.items()}
.venv/lib/python3.7/site-packages/eliot/dask.py:129: in _add_logging
    if not callable(value) and value in keys:
.venv/lib/python3.7/site-packages/pandas/core/ops/__init__.py:1561: in f
    other = _align_method_FRAME(self, other, axis=None)
.venv/lib/python3.7/site-packages/pandas/core/ops/__init__.py:1458: in _align_method_FRAME
    right = to_series(right)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

right = ('from_pandas-ef0c9d9245c84d0ca89a3f9c1d2c0349', 0)

    def to_series(right):
        msg = (
            "Unable to coerce to Series, length must be {req_len}: " "given {given_len}"
        )
        if axis is not None and left._get_axis_name(axis) == "index":
            if len(left.index) != len(right):
                raise ValueError(
                    msg.format(req_len=len(left.index), given_len=len(right))
                )
            right = left._constructor_sliced(right, index=left.index)
        else:
            if len(left.columns) != len(right):
                raise ValueError(
>                   msg.format(req_len=len(left.columns), given_len=len(right))
                )
E               ValueError: Unable to coerce to Series, length must be 4: given 2
```
In the trace above, `right` looks like a Dask key rather than an actual value. Could this be an unhandled case in the optimization step?
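A likely explanation can be reproduced without Dask or pandas. At `eliot/dask.py:129` the check `value in keys` runs against what appears to be a plain list of keys, and list membership compares `value` with each element using `==`. When `value` is a DataFrame stored as a literal in the graph, pandas' `__eq__` tries to align the key tuple against the frame's columns and raises instead of returning `False`. The sketch below uses a hypothetical `FrameLike` stand-in (not pandas) that mimics only that one behaviour:

```python
# HYPOTHETICAL stand-in for a pandas DataFrame with 4 columns. It only
# mimics the relevant behaviour: comparing the frame with a tuple of the
# wrong length raises ValueError instead of returning False.
class FrameLike:
    def __init__(self, n_columns):
        self.columns = list(range(n_columns))

    def __eq__(self, other):
        if isinstance(other, tuple) and len(other) != len(self.columns):
            raise ValueError(
                "Unable to coerce to Series, length must be "
                f"{len(self.columns)}: given {len(other)}"
            )
        return NotImplemented


# A Dask key is a (name, partition index) tuple.
keys = [("from_pandas-ef0c9d9245c84d0ca89a3f9c1d2c0349", 0)]
value = FrameLike(4)  # a literal partition stored in the graph

try:
    value in keys  # list membership compares value with each key via ==
except ValueError as exc:
    error = str(exc)
else:
    error = None

print(error)  # Unable to coerce to Series, length must be 4: given 2
```

This is why the key tuple shows up as `right` inside pandas: it is the other operand of the `==` that `in` performed.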
I will try to create a minimal example to reproduce this. The error above occurred in our test suite, and is probably not very helpful.
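This example produces the same error:

```python
import dask.dataframe as dd
import pandas as pd
from eliot.dask import persist_with_trace

# Wrap an empty pandas DataFrame as a single-partition Dask DataFrame.
df = pd.DataFrame()
df = dd.from_pandas(df, npartitions=1)

# Raises the same ValueError as the traceback above.
persist_with_trace(df)
```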
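One way to avoid calling `==` on graph literals, sketched here as a hypothetical helper (`is_key_reference` is not part of eliot or Dask, and this is not necessarily how the fix was actually implemented), is to build a flat set of the graph's keys up front and test membership against it, treating unhashable values as literals:

```python
# HYPOTHETICAL helper (not part of eliot or Dask): decide whether a value
# found in a task graph refers to another key, without calling == on
# arbitrary objects such as pandas DataFrames.
def is_key_reference(value, key_set):
    """Return True if `value` is one of the keys in `key_set` (a set)."""
    if callable(value):
        # Functions inside a task tuple are never key references.
        return False
    try:
        # Set lookup hashes `value` first and only falls back to == for
        # entries with an equal hash.
        return value in key_set
    except TypeError:
        # Unhashable values (DataFrames, lists, dicts, ...) cannot be keys.
        return False


# Stand-in for a DataFrame: defining __eq__ without __hash__ makes it
# unhashable, like pandas.DataFrame, and its __eq__ must never run here.
class FrameLike:
    def __eq__(self, other):
        raise ValueError("== should not be called on a graph literal")


keys = {("from_pandas-ef0c9d9245c84d0ca89a3f9c1d2c0349", 0)}
print(is_key_reference(("from_pandas-ef0c9d9245c84d0ca89a3f9c1d2c0349", 0), keys))  # True
print(is_key_reference(FrameLike(), keys))  # False
```

Since Dask keys (strings and tuples) must be hashable anyway, a set works as the lookup structure, and it turns each check from a linear scan into a hash lookup.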
Thank you! I will see if I can fix that.
Try now?
Looking good, @itamarst! All tests passed on my end. Thanks for taking the time to fix this on such short notice!
OK, I just released 1.12.0 with this improvement.
Sponsorship message: If you're ever interested in Eliot or Dask-related consulting, I'm happy to talk, and if you're packaging your Python code into Docker images you might also be interested in my production-ready template: https://pythonspeed.com/products/pythoncontainer/
Thank you!
Fixes #446.
@mdwint could you try this out and see if it solves your problem, and that the logs look as you'd expect?