Also reminder to revert changes in https://github.com/kedro-org/kedro-plugins/pull/591 after this is resolved
The outcome for this ticket is to investigate the root cause and propose a solution to fix it.
Potential causes:
Tested with:
scikit-learn==1.4.1.post1
numpy==1.26.4
Things explored so far:
scikit-learn validates the input data (https://github.com/scikit-learn/scikit-learn/blob/941acc419b8e7bec86fdc6b27ab3c4703022f140/sklearn/utils/validation.py#L1099), converting it to a numpy array and then setting array.flags.writeable = True (https://github.com/scikit-learn/scikit-learn/blob/941acc419b8e7bec86fdc6b27ab3c4703022f140/sklearn/utils/_array_api.py#L712), which fails with ValueError: cannot set WRITEABLE flag to True of this array.
For the failing array OWNDATA is False, meaning the array does not own the memory it uses but borrows it from another object (https://numpy.org/doc/stable/reference/generated/numpy.ndarray.flags.html). Its flags:
C_CONTIGUOUS : True
F_CONTIGUOUS : True
OWNDATA : False
WRITEABLE : True
ALIGNED : True
WRITEBACKIFCOPY : False
numpy refuses to set the WRITEABLE attribute if the original array has WRITEABLE=False, or if the numpy array shares memory with another object which is not WRITEABLE.
Here the numpy array is created straight from the provided input, and the error happens only for pandas.core.series.Series; the same conversion works fine for pandas.core.frame.DataFrame.
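This rule is easy to demonstrate in isolation. A minimal sketch with plain numpy (no kedro or scikit-learn involved), showing that a view borrowing a read-only buffer cannot be made writeable:
import numpy as np

base = np.arange(5)
base.flags.writeable = False  # freeze the array that owns the buffer

view = base[:]  # a view: OWNDATA is False, memory is borrowed from `base`
print(view.flags.owndata)  # False

try:
    view.flags.writeable = True
except ValueError as err:
    # numpy refuses because the borrowed buffer itself is read-only
    print(err)  # cannot set WRITEABLE flag to True of this array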
After further investigation, it was found that the problem appears after the object is retrieved from SharedMemoryDataset. In the example below we convert a pandas.core.series.Series to a numpy array and then set WRITEABLE=True, which works fine; but after the object is saved to SharedMemoryDataset and retrieved again, the OWNDATA flag becomes False and changing WRITEABLE raises the error.
from pathlib import Path

import numpy
import pandas as pd

# ParallelRunnerManager and SharedMemoryDataset come from kedro's
# parallel-runner internals; the exact import path depends on the kedro version

input_path = Path.cwd() / "data"
y_train = pd.read_csv(input_path / "02_intermediate" / "y_train.csv")
# converting to series
y_train = y_train.stack()
print(type(y_train))
test_y = numpy.asarray(y_train, order=None, dtype=None)
print(test_y.flags)
test_y.flags.writeable = True
manager = ParallelRunnerManager()
manager.start()
dataset = SharedMemoryDataset(manager=manager)
dataset._save(y_train)
out = dataset._load()
print(type(out))
test_y = numpy.asarray(out, order=None, dtype=None)
print(test_y.flags)
test_y.flags.writeable = True
Output:
A further plan is to investigate what's happening in the SharedMemoryDataset, whether this behaviour is expected, and why it only affects pandas.core.series.Series.
In earlier scikit-learn versions (<= 1.4.0) the following step is absent, so the error does not occur:
# With an input pandas dataframe or series, we know we can always make the
# resulting array writeable:
# - if copy=True, we have already made a copy so it is fine to make the
# array writeable
# - if copy=False, the caller is telling us explicitly that we can do
# in-place modifications
# See https://pandas.pydata.org/docs/dev/user_guide/copy_on_write.html#read-only-numpy-arrays
# for more details about pandas copy-on-write mechanism, that is enabled by
# default in pandas 3.0.0.dev.
if _is_pandas_df_or_series(array_orig) and hasattr(array, "flags"):
array.flags.writeable = True
The test below confirms that the problem is in SharedMemoryDataset: exactly the same example as above, but with a plain MemoryDataset, works fine.
from pathlib import Path

import numpy
import pandas as pd

from kedro.io import MemoryDataset

input_path = Path.cwd() / "data"
y_train = pd.read_csv(input_path / "02_intermediate" / "y_train.csv")
# converting to series
y_train = y_train.stack()
print(type(y_train))
test_y = numpy.asarray(y_train, order=None, dtype=None)
print(test_y.flags)
test_y.flags.writeable = True
dataset = MemoryDataset()
dataset._save(y_train)
out = dataset._load()
print(type(out))
test_y = numpy.asarray(out, order=None, dtype=None)
print(test_y.flags)
test_y.flags.writeable = True
Further tests excluded the kedro code base: the actual problem happens when using multiprocessing.managers.BaseManager inside the ParallelRunner. We register MemoryDataset to be used with multiprocessing.managers.BaseManager as follows:
class ParallelRunnerManager(SyncManager):
"""``ParallelRunnerManager`` is used to create shared ``MemoryDataset``
objects as default data sets in a pipeline.
"""
ParallelRunnerManager.register("MemoryDataset", MemoryDataset)
When running, ParallelRunner places the MemoryDataset into shared memory and returns a proxy of the MemoryDataset object. See:
https://docs.python.org/3/library/multiprocessing.shared_memory.html
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.managers.BaseManager
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.managers.BaseProxy
After we retrieve a dataset from the MemoryDataset proxy object, we get this error when setting WRITEABLE=True:
from multiprocessing.managers import BaseManager
from pathlib import Path

import numpy
import pandas as pd

from kedro.io import MemoryDataset
class MyManager(BaseManager): pass
MyManager.register("MemoryDataset", MemoryDataset, exposed=('_save', '_load'))
def main():
input_path = Path.cwd() / "data"
y_train = pd.read_csv(input_path / "02_intermediate" / "y_train.csv")
y_train = y_train.stack()
print(type(y_train))
test_y = numpy.asarray(y_train, order=None, dtype=None)
print(test_y.flags)
test_y.flags.writeable = True
manager = MyManager()
manager.start()
dataset = manager.MemoryDataset()
dataset._save(y_train)
out = dataset._load()
print(type(out))
test_y_out = numpy.asarray(out, order=None, dtype=None)
print(test_y_out.flags)
    test_y_out.flags.writeable = True


if __name__ == "__main__":
    main()
The reason for the above is that numpy does not allow arrays backed by a read-only buffer to be made writeable. A possible reason why the behaviour differs between pd.DataFrame and pd.Series is that the numpy.asarray() conversion happens differently, so that in the pd.DataFrame case we get a copy of the underlying data.
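A fresh copy, by contrast, always owns its buffer and can therefore always be made writeable, which is what the workaround below relies on. A minimal sketch contrasting np.array (which copies by default) with np.asarray (which avoids copying where it can):
import numpy as np
import pandas as pd

s = pd.Series([1.0, 2.0, 3.0])

alias = np.asarray(s)  # no copy guaranteed: may share memory with the Series
copied = np.array(s)   # np.array copies by default

print(np.shares_memory(alias, np.asarray(s)))  # True: repeated asarray aliases
print(np.shares_memory(copied, alias))         # False: the copy has its own buffer
copied.flags.writeable = True                  # always fine on a fresh copy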
Thus, making a copy of the pd.Series object loaded from the MemoryDataset solves the problem:
import copy
from multiprocessing.managers import BaseManager
from pathlib import Path

import numpy
import pandas as pd

from kedro.io import MemoryDataset
class MyManager(BaseManager): pass
MyManager.register("MemoryDataset", MemoryDataset, exposed=('_save', '_load'))
input_path = Path.cwd() / "data"
y_train = pd.read_csv(input_path / "02_intermediate" / "y_train.csv")
y_train = y_train.stack()
print(type(y_train))
test_y = numpy.asarray(y_train, order=None, dtype=None)
print(test_y.flags)
test_y.flags.writeable = True
manager = MyManager()
manager.start()
dataset = manager.MemoryDataset()
dataset._save(y_train)
out = copy.deepcopy(dataset._load())
print(type(out))
test_y_out = numpy.asarray(out, order=None, dtype=None)
print(test_y_out.flags)
test_y_out.flags.writeable = True
So a solution that might work for us is to modify the part where we retrieve data from the catalog before calling the node function, here:
# modified kedro.runner _run_node_sequential; requires `import copy` and
# `import pandas as pd` at module level
def _run_node_sequential(
node: Node,
catalog: DataCatalog,
hook_manager: PluginManager,
session_id: str | None = None,
) -> Node:
inputs = {}
for name in node.inputs:
hook_manager.hook.before_dataset_loaded(dataset_name=name, node=node)
data = catalog.load(name)
        # workaround: deep-copy pandas Series so the object passed to the node
        # owns its memory and can be made writeable downstream
        if isinstance(data, pd.Series):
inputs[name] = copy.deepcopy(data)
else:
inputs[name] = data
hook_manager.hook.after_dataset_loaded(
dataset_name=name, data=inputs[name], node=node
)
Tested this locally and it works.
Summary: the behaviour arises from the interaction of kedro's shared-memory proxying with the newer scikit-learn versions, and all libraries seem valid on their side as well. @noklam, @ankatiyar, @merelcht, @astrojuanlu, I need your thoughts here on whether we want to apply the suggested fix, though it might take time to follow through all my comments above.
@ElenaKhaustova Can you point to the changes that you have made?
I wonder if there is anything we can report upstream, creating an example that strips away the kedro-related context. From what I've read the problem is not a bug in pandas or numpy; rather, scikit-learn added a validation step that updates the flag. So maybe we should report this upstream to scikit-learn.
Searching for this error (cannot set WRITEABLE flag to True of this array) turns up tons of reports everywhere; some are library compatibility issues.
Is this a scikit-learn problem? It seems from your latest comment that you can reproduce the same issue even with just SharedMemoryDataset and numpy. Can you also point me to the change that works?
These are the changes: https://github.com/kedro-org/kedro/issues/3674#issuecomment-2045291676 I'll open a draft PR as well for better visibility.
Yes, we can strip away the kedro-related context by creating a fake MemoryDataset with save and load methods. We might try to report this, though it doesn't seem like a bug on their side either. I can create a fake dataset and add the scikit-learn logic to showcase the error if we want to report it to them.
Oh sorry, I didn't notice that was the change. This reminds me of something. If you check MemoryDataset:
if copy_mode == "deepcopy":
copied_data = copy.deepcopy(data)
elif copy_mode == "copy":
copied_data = data.copy()
elif copy_mode == "assign":
copied_data = data
We already have something like this; maybe we just need to update _infer_copy_mode (a sketch of such a tweak follows the function below)?
def _infer_copy_mode(data: Any) -> str:
"""Infers the copy mode to use given the data type.
Args:
data: The data whose type will be used to infer the copy mode.
Returns:
One of "copy", "assign" or "deepcopy" as the copy mode to use.
"""
try:
import pandas as pd
except ImportError: # pragma: no cover
pd = None # type: ignore[assignment] # pragma: no cover
try:
import numpy as np
except ImportError: # pragma: no cover
np = None # type: ignore[assignment] # pragma: no cover
if pd and isinstance(data, pd.DataFrame) or np and isinstance(data, np.ndarray):
copy_mode = "copy"
elif type(data).__name__ == "DataFrame":
copy_mode = "assign"
else:
copy_mode = "deepcopy"
return copy_mode
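For concreteness, a hypothetical sketch of that tweak (an assumption, not an adopted fix; as it turns out below, it would not help here): the condition inside _infer_copy_mode would also treat pd.Series as copyable via its .copy() method:
    # hypothetical tweak: let MemoryDataset copy pd.Series via .copy() too
    if (
        pd
        and isinstance(data, (pd.DataFrame, pd.Series))
        or np
        and isinstance(data, np.ndarray)
    ):
        copy_mode = "copy"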
Here is the draft pr: https://github.com/kedro-org/kedro/pull/3795/files
The problem is that we have to make the copy after the data is retrieved from the catalog (after load()), but _infer_copy_mode is applied inside the dataset before that, so updating it doesn't change anything; we cannot make the fix there.
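To see why a copy made inside the dataset cannot help: the BaseManager proxy pickles whatever _load() returns and ships it to the calling process, so the object the node receives is whatever unpickling produces, regardless of any copy made before pickling. A small sketch of that round trip, with plain pickle standing in for the proxy transport:
import copy
import pickle

import numpy as np
import pandas as pd

y = pd.DataFrame({"a": [1.0, 2.0]}).stack()  # a Series, as in the repro
server_side = copy.deepcopy(y)  # the copy a manager-side dataset would make
client_side = pickle.loads(pickle.dumps(server_side))  # what the proxy ships

# any copy made before pickling is irrelevant on this side of the pipe
print(np.asarray(client_side).flags)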
Example with the kedro logic stripped away, reproducing the error:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.managers import BaseManager
import traceback
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
class MemoryDataset:
def __init__(self):
self._ds = None
def save(self, ds):
self._ds = ds
def load(self):
return self._ds
def train_model(dataset: MemoryDataset) -> LinearRegression:
regressor = LinearRegression()
X_train, y_train = dataset.load()
try:
regressor.fit(X_train, y_train)
except Exception as _:
print(traceback.format_exc())
return regressor
class MyManager(BaseManager):
pass
MyManager.register("MemoryDataset", MemoryDataset, exposed=("save", "load"))
def main():
rng = np.random.default_rng()
n_samples = 1000
X_train = pd.DataFrame(rng.random((n_samples, 4)), columns=list('ABCD'))
y_train = pd.Series(rng.random(n_samples))
futures = set()
manager = MyManager()
manager.start()
dataset = manager.MemoryDataset()
dataset.save((X_train, y_train))
with ProcessPoolExecutor(max_workers=1) as pool:
futures.add(pool.submit(train_model, dataset))
if __name__ == "__main__":
main()
Looks like this is mostly an upstream bug and there's little we can do about it. Unfortunately this means that ParallelRunner is mostly broken for a good chunk of basic use cases. Removing this from our sprints for now.
I can reproduce this issue with the SequentialRunner.
(Rough) steps:
1. Create a project from the spaceflights-pandas-viz starter.
2. Persist X_test and y_test by adding them to the catalog:

X_test:
  type: pickle.PickleDataset
  filepath: data/05_model_input/X_test.pkl

y_test:
  type: pickle.PickleDataset
  filepath: data/05_model_input/y_test.pkl

3. $ kedro run --to-outputs=X_test,y_test
4. $ kedro run --from-nodes=evaluate_model_node
Full traceback:
It's funny because the array is WRITEABLE already anyway.
❯ python -m pdb -m kedro run --from-nodes=evaluate_model_node --params mlflow_run_id=4cba849c8f2d403887e95dbef1091142 --runner=SequentialRunner
> /Users/juan_cano/Projects/QuantumBlackLabs/kedro-mlflow-playground/spaceflights-mlflow/.venv/lib/python3.11/site-packages/kedro/__main__.py(1)<module>()
-> """Entry point when invoked with python -m kedro.""" # pragma: no cover
(Pdb) b /Users/juan_cano/Projects/QuantumBlackLabs/kedro-mlflow-playground/spaceflights-mlflow/.venv/lib/python3.11/site-packages/sklearn/utils/validation.py:1107
Breakpoint 1 at /Users/juan_cano/Projects/QuantumBlackLabs/kedro-mlflow-playground/spaceflights-mlflow/.venv/lib/python3.11/site-packages/sklearn/utils/validation.py:1107
(Pdb) c
[06/04/24 11:31:57] INFO Using `conf/logging.yml` as logging configuration. You can change __init__.py:249
this by setting the KEDRO_LOGGING_CONFIG environment variable
accordingly.
[06/04/24 11:32:01] INFO Kedro project spaceflights-mlflow session.py:324
INFO Registering new custom resolver: 'km.random_name' mlflow_hook.py:65
INFO The 'tracking_uri' key in mlflow.yml is relative kedro_mlflow_config.py:260
('server.mlflow_(tracking|registry)_uri = mlflow_runs').
It is converted to a valid uri:
'file:///Users/juan_cano/Projects/QuantumBlackLabs/kedro-
mlflow-playground/spaceflights-mlflow/mlflow_runs'
[06/04/24 11:32:08] INFO Logging extra metadata to MLflow hooks.py:13
INFO Using synchronous mode for loading and saving data. Use the sequential_runner.py:64
--async flag for potential performance gains.
https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_p
ipeline.html#load-and-save-asynchronously
INFO Loading data from regressor (MlflowModelTrackingDataset)... data_catalog.py:508
INFO Loading data from X_test (PickleDataset)... data_catalog.py:508
INFO Loading data from y_test (PickleDataset)... data_catalog.py:508
INFO Running node: evaluate_model_node: node.py:361
evaluate_model([regressor;X_test;y_test]) -> [metrics]
> /Users/juan_cano/Projects/QuantumBlackLabs/kedro-mlflow-playground/spaceflights-mlflow/.venv/lib/python3.11/site-packages/sklearn/utils/validation.py(1107)check_array()
-> array.flags.writeable = True
(Pdb) p array.flags
C_CONTIGUOUS : False
F_CONTIGUOUS : True
OWNDATA : False
WRITEABLE : True
ALIGNED : True
WRITEBACKIFCOPY : False
Seems to have nothing to do with Kedro:
import pickle
from sklearn.metrics import mean_absolute_error
with open("_data/X_test.pkl", "rb") as fh:
X_test = pickle.load(fh)
with open("_data/y_test.pkl", "rb") as fh:
y_test = pickle.load(fh)
with open("_data/regressor.pickle", "rb") as fh:
regressor = pickle.load(fh)
y_pred = regressor.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
Attaching the contents of _data.
And my uv pip freeze:
And Python version:
$ python -VV
Python 3.11.5 (main, Aug 24 2023, 15:09:45) [Clang 14.0.3 (clang-1403.0.22.14.1)]
Seems to have nothing to do with Kedro:
That's sad it's still there in the 1.5.0 version. Maybe we can open one more issue on their side, since it is a completely different example causing the same behaviour? There's a PR which can mitigate the problem but not solve it completely, since there's an ongoing conversation about whether setting writeable=True is correct in general: https://github.com/scikit-learn/scikit-learn/issues/28824
Maybe we can open one more issue on their side since it is a completely different example causing the same behaviour?
I'd love to do it myself but I prefer to focus on other things, if you have a moment feel free!
Done: https://github.com/scikit-learn/scikit-learn/issues/29182
I confirm https://github.com/scikit-learn/scikit-learn/pull/29018 fixes this issue.
Description
Flagged by failing CI on kedro-docker: https://github.com/kedro-org/kedro-plugins/issues/558. Basically, scikit-learn (which is a dependency of the spaceflights-* starters) had a new release on 16th Feb, https://pypi.org/project/scikit-learn/1.4.1.post1/, which doesn't play well with the ParallelRunner.
Context
Stacktrace
Related https://github.com/scikit-learn/scikit-learn/pull/28348
Steps to Reproduce
kedro run --runner=ParallelRunner