nipype / pydra

Pydra Dataflow Engine
https://nipype.github.io/pydra/

hashing functions/objects defined elsewhere #717

Closed. satra closed this issue 4 months ago.

satra commented 8 months ago

with current master, after the hashing changes were incorporated:

FAILED pydra_ml/tests/test_classifier.py::test_classifier - pydra.utils.hash.UnhashableError: Cannot hash object {'permute': True, 'model': (Pipeline(steps=[('...
FAILED pydra_ml/tests/test_classifier.py::test_regressor - pydra.utils.hash.UnhashableError: Cannot hash object {'permute': True, 'model': (Pipeline(steps=[('...

in pydra_ml almost everything was marked as type Any, with the old hashing function figuring things out. i saw this as something a general scripter would do. i can go and add stricter type annotations (i have tried this too - see below), but i wouldn't expect a general user to do that when importing a function from an arbitrary library.

what's the minimal thing one can do to fix this on the user side, so that arbitrary functions can be imported in pydra? i suspect this may actually require changes to the hashing. an approach may be to pickle the object and generate the byte stream, which seems like a sensible fallback in a local setting instead of raising UnhashableError.

in this particular case, the scikit-learn Pipeline object is being used, which is an arbitrarily nested object of objects; it's the input to this function. an equivalent consideration would be someone deciding to pass a pydra workflow as an input to a function.

https://github.com/nipype/pydra-ml/blob/b58ad3d488857716df74c5917d5ad11729c25258/pydra_ml/tasks.py#L129

def get_feature_importance(*,
                           permute: bool,
                           model: tuple[Pipeline, list, list],
                           gen_feature_importance: bool = True):
effigies commented 8 months ago

This shouldn't be tied to type annotations (apart from files) but to actual runtime types. Hashing the cloudpickle as a fallback for types we don't know how to hash seems reasonable.

effigies commented 8 months ago

Looking at the old function:

def hash_function(obj):
    """Generate hash of object."""
    return sha256(str(obj).encode()).hexdigest()

I see how that was so successful. :-) I do think it would be good to use something more like cloudpickle than str(). str() of many objects will be sensitive to changes in the interpreter session and insensitive to changes in contents.

effigies commented 8 months ago

Oh, and finally, the fastest way to enable hashing on a currently-unhashable object would be:

import cloudpickle as cp
from sklearn.pipeline import Pipeline
from pydra.utils.hash import register_serializer, Cache

@register_serializer
def bytes_repr_Pipeline(obj: Pipeline, cache: Cache):
    # dispatch on the runtime type: serialize any Pipeline input with
    # cloudpickle and feed the resulting bytes to the hasher
    yield cp.dumps(obj)
satra commented 8 months ago

thanks @effigies for the register code. that helps.

in this particular instance str(obj) works because a scikit-learn Pipeline is reproducible from its repr:

Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))])

but a general pipeline could indeed use the cloudpickle.

should i leave this open for adding a fallback option?

ps. also we still haven't solved function hashing in general across putatively similar environments (say same versions of libraries installed in different operating system environments) - but that's a different challenge.

satra commented 8 months ago

actually, the register code by itself doesn't work, as the object is in a dict and perhaps there is no recursion there:

{'permute': True, 'model': (Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))]), [0, 2, 3], [1, 10, 12]), 
'gen_feature_importance': False, 
'_func': b'\x80\x05\x95-\x00\x00\x00\x00\x00\x00\x00\x8c\x0epydra_ml.tasks\x94\x8c\x16get_feature_importance\x94\x93\x94.'}

also, i just injected that registration in pydra_ml rather than pydra, which i think is the right thing to do.

effigies commented 8 months ago

There should be recursion. We built it that way.

satra commented 8 months ago

hmmm. this whole thing goes to hash_object

(Pdb) u
> /Users/satra/software/nipype/pydra/pydra/utils/hash.py(63)hash_function()
-> return hash_object(obj).hex()
(Pdb) obj
{'permute': True, 'model': (Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))]), [0, 2, 3, 4, 5, 6, 7, 8, 9, 11, 13, 16, 18, 19, 20, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 38, 39, 40, 41, 42, 43, 44, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 65, 67, 68, 69, 70, 72, 73, 74, 76, 77, 79, 80, 81, 82, 83, 84, 86, 87, 88, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 104, 105, 106, 109, 110, 111, 112, 113, 114, 115, 116, 117, 119, 120, 121, 122, 123, 124, 125, 126, 128, 129, 130, 131, 133, 135, 136, 137, 138, 139, 141, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 158, 160, 161, 162, 163, 164, 166, 167, 168, 169, 171, 173, 174, 176, 177, 178, 180, 181, 182, 183, 184, 185, 186, 187, 189, 190, 191, 192, 193, 195, 197, 198, 199, 200, 201, 202, 203, 204, 206, 207, 208, 209, 212, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 225, 226, 227, 228, 229, 230, 232, 234, 236, 237, 238, 240, 241, 242, 243, 244, 245, 246, 248, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 265, 266, 267, 269, 270, 271, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 299, 300, 302, 303, 304, 305, 306, 307, 309, 311, 312, 313, 314, 315, 316, 317, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 331, 332, 333, 334, 335, 336, 338, 339, 341, 342, 343, 344, 346, 347, 349, 351, 352, 355, 357, 359, 360, 361, 362, 363, 365, 366, 367, 368, 369, 370, 371, 373, 374, 375, 376, 377, 378, 379, 380, 381, 383, 384, 386, 387, 388, 390, 392, 393, 394, 395, 396, 397, 398, 399, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 415, 418, 419, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 433, 435, 436, 437, 438, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456, 459, 460, 461, 462, 464, 467, 469, 470, 472, 474, 475, 476, 477, 478, 479, 480, 481, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493, 494, 495, 496, 497, 498, 499, 501, 502, 503, 505, 506, 507, 508, 509, 510, 511, 513, 514, 517, 520, 521, 522, 523, 524, 526, 527, 528, 529, 530, 531, 532, 533, 534, 535, 536, 537, 539, 540, 541, 542, 543, 544, 545, 547, 548, 549, 550, 551, 552, 553, 554, 555, 556, 557, 558, 559, 561, 563, 565, 568], [1, 10, 12, 14, 15, 17, 21, 31, 37, 45, 46, 64, 66, 71, 75, 78, 85, 89, 90, 102, 103, 107, 108, 118, 127, 132, 134, 140, 142, 157, 159, 165, 170, 172, 175, 179, 188, 194, 196, 205, 210, 211, 213, 224, 231, 233, 235, 239, 247, 249, 250, 263, 264, 268, 272, 283, 298, 301, 308, 310, 318, 319, 330, 337, 340, 345, 348, 350, 353, 354, 356, 358, 364, 372, 382, 385, 389, 391, 400, 401, 412, 413, 414, 416, 417, 420, 421, 432, 434, 439, 457, 458, 463, 465, 466, 468, 471, 473, 482, 500, 504, 512, 515, 516, 518, 519, 525, 538, 546, 560, 562, 564, 566, 567]), 'gen_feature_importance': False, '_func': b'\x80\x05\x95-\x00\x00\x00\x00\x00\x00\x00\x8c\x0epydra_ml.tasks\x94\x8c\x16get_feature_importance\x94\x93\x94.'}
satra commented 8 months ago

this branch has the changes: https://github.com/nipype/pydra-ml/pull/59

and i'm just running this test to work through the changes:

pytest --pdb pydra_ml/tests/test_classifier.py::test_classifier
satra commented 8 months ago

actually, never mind. the env still seemed to have pydra 0.22. checking with 0.23a now.

satra commented 8 months ago

the issue persists.

tclose commented 8 months ago

I think we probably need to provide a better debugging experience when hashing fails. Has the registration worked properly or is there an error inside the registered function, @satra?

satra commented 8 months ago

at least in pdb, when i try to run the node, it raises UnhashableError, and the exception surfaces in the hash_object function, which receives the entire dictionary object (see https://github.com/nipype/pydra/issues/717#issuecomment-1774109928).

since the Pipeline object is an input in other places, i do think the registration is working, i.e. i can make things break by inserting an error into the registration function.

effigies commented 8 months ago

The code is: https://github.com/nipype/pydra/blob/0e3d33cd782d88c5ad0f3f2c6d7a07c8eba8b160/pydra/utils/hash.py#L66-L78

With the raise ... from construct, the default behavior should be to see the chain of exceptions. Are we catching this exception somewhere else and re-raising it without context?

effigies commented 8 months ago

@satra I can't reproduce. I'm getting an entirely different error on your PR:

$ pydra_ml/tests/test_classifier.py F
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> captured stderr >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
100%|██████████| 114/114 [00:00<00:00, 273.40it/s]
100%|██████████| 114/114 [00:00<00:00, 282.79it/s]
100%|██████████| 114/114 [00:00<00:00, 271.81it/s]
100%|██████████| 114/114 [00:00<00:00, 268.36it/s]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

tmpdir = local('/tmp/pytest-of-chris/pytest-10/test_classifier0')

    def test_classifier(tmpdir):
        clfs = [
            ("sklearn.neural_network", "MLPClassifier", {"alpha": 1, "max_iter": 1000}),
            [
                ["sklearn.impute", "SimpleImputer"],
                ["sklearn.preprocessing", "StandardScaler"],
                ["sklearn.naive_bayes", "GaussianNB", {}],
            ],
        ]
        csv_file = os.path.join(os.path.dirname(__file__), "data", "breast_cancer.csv")
        inputs = {
            "filename": csv_file,
            "x_indices": range(10),
            "target_vars": ("target",),
            "group_var": None,
            "n_splits": 2,
            "test_size": 0.2,
            "clf_info": clfs,
            "permute": [True, False],
            "gen_feature_importance": False,
            "gen_permutation_importance": False,
            "permutation_importance_n_repeats": 5,
            "permutation_importance_scoring": "accuracy",
            "gen_shap": True,
            "nsamples": 15,
            "l1_reg": "aic",
            "plot_top_n_shap": 16,
            "metrics": ["roc_auc_score", "accuracy_score"],
        }
        wf = gen_workflow(inputs, cache_dir=tmpdir)
>       results = run_workflow(wf, "cf", {"n_procs": 1})

pydra_ml/tests/test_classifier.py:38: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
pydra_ml/classifier.py:175: in run_workflow
    sub(runnable=wf)
../pydra-tmp/pydra/engine/submitter.py:42: in __call__
    self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
../../../mambaforge/envs/pydra-dev/lib/python3.11/asyncio/base_events.py:653: in run_until_complete
    return future.result()
../pydra-tmp/pydra/engine/submitter.py:71: in submit_from_call
    await self.expand_runnable(runnable, wait=True, rerun=rerun)
../pydra-tmp/pydra/engine/submitter.py:128: in expand_runnable
    await asyncio.gather(*futures)
../pydra-tmp/pydra/engine/helpers.py:586: in load_and_run_async
    await task._run(submitter=submitter, rerun=rerun, **kwargs)
../pydra-tmp/pydra/engine/core.py:1237: in _run
    result.output = self._collect_outputs()
../pydra-tmp/pydra/engine/core.py:1365: in _collect_outputs
    val_out = val.get_value(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = LazyOutField(name='feature_importance', field='feature_importance', type=pydra.engine.specs.StateArray[typing.List[typing.Any]], splits=frozenset({(('ml_wf.clf_info',), ('ml_wf.permute',))}), cast_from=None)
wf = <pydra.engine.core.Workflow object at 0x7f4873b5efd0>, state_index = None

    def get_value(
        self, wf: "pydra.Workflow", state_index: ty.Optional[int] = None
    ) -> ty.Any:
        """Return the value of a lazy field.

        Parameters
        ----------
        wf : Workflow
            the workflow the lazy field references
        state_index : int, optional
            the state index of the field to access

        Returns
        -------
        value : Any
            the resolved value of the lazy-field
        """
        from ..utils.typing import TypeParser  # pylint: disable=import-outside-toplevel

        node = getattr(wf, self.name)
        result = node.result(state_index=state_index)
        if result is None:
>           raise RuntimeError(
                f"Could not find results of '{node.name}' node in a sub-directory "
                f"named '{node.checksum}' in any of the cache locations:\n"
                + "\n".join(str(p) for p in set(node.cache_locations))
            )
E           RuntimeError: Could not find results of 'feature_importance' node in a sub-directory named 'FunctionTask_553735ecdd5564cc0b0913c68a4fa342' in any of the cache locations:
E           /tmp/pytest-of-chris/pytest-10/test_classifier0

../pydra-tmp/pydra/engine/specs.py:1012: RuntimeError

Can move this comment to https://github.com/nipype/pydra-ml/pull/59 if you'd prefer.

satra commented 8 months ago

that's the error, and if you pdb it, move up one slot in the stack, and try to run the node variable with node(), it gives an UnhashableError. the fact that it can't find the results suggests something is off with hashing. we could move the comment there, but that test works with 0.22 and doesn't even work with the changes in that PR on 0.23. i released version 0.6 yesterday pinning to 0.22. so your call on where we should discuss it.

effigies commented 8 months ago

I really can't reproduce this:

>>>>>>>>>>>>>>>>>>>>>>> PDB post_mortem (IO-capturing turned off) >>>>>>>>>>>>>>>>>>>>>>>>
> /home/chris/Projects/nipype/pydra-tmp/pydra/engine/specs.py(1012)get_value()
-> raise RuntimeError(
(Pdb) node.checksum
'FunctionTask_5223246ff6af31c41f6fd9a6373f0f6e'
(Pdb) node.result()
(Pdb) node()
[Result(output=Output(feature_importance=[]), runtime=None, errored=False), Result(output=Output(feature_importance=[]), runtime=None, errored=False)]
(Pdb) node.result()
[Result(output=Output(feature_importance=[]), runtime=None, errored=False), Result(output=Output(feature_importance=[]), runtime=None, errored=False)]

Attempting to look at node() from any other stack level failed. I don't know why it's failing, I don't know why I'm getting something different from you. I've tried with the current master and with 0.23a. I can run hash_function(node.inputs.model):

(Pdb) hash_function(node.inputs.model)
'7fff85b269abee56eed987e50e4d203f'
(Pdb) node.inputs.model
StateArray((Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))]), [0, 2, 3, 4, 5, 6, 7, 8, 9, 11, 13, 16, 18, 19, 20, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 38, 39, 40, 41, 42, 43, 44, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 65, 67, 68, 69, 70, 72, 73, 74, 76, 77, 79, 80, 81, 82, 83, 84, 86, 87, 88, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 104, 105, 106, 109, 110, 111, 112, 113, 114, 115, 116, 117, 119, 120, 121, 122, 123, 124, 125, 126, 128, 129, 130, 131, 133, 135, 136, 137, 138, 139, 141, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 158, 160, 161, 162, 163, 164, 166, 167, 168, 169, 171, 173, 174, 176, 177, 178, 180, 181, 182, 183, 184, 185, 186, 187, 189, 190, 191, 192, 193, 195, 197, 198, 199, 200, 201, 202, 203, 204, 206, 207, 208, 209, 212, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 225, 226, 227, 228, 229, 230, 232, 234, 236, 237, 238, 240, 241, 242, 243, 244, 245, 246, 248, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 265, 266, 267, 269, 270, 271, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 299, 300, 302, 303, 304, 305, 306, 307, 309, 311, 312, 313, 314, 315, 316, 317, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 331, 332, 333, 334, 335, 336, 338, 339, 341, 342, 343, 344, 346, 347, 349, 351, 352, 355, 357, 359, 360, 361, 362, 363, 365, 366, 367, 368, 369, 370, 371, 373, 374, 375, 376, 377, 378, 379, 380, 381, 383, 384, 386, 387, 388, 390, 392, 393, 394, 395, 396, 397, 398, 399, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 415, 418, 419, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 433, 435, 436, 437, 438, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456, 459, 460, 461, 462, 464, 467, 469, 470, 472, 474, 475, 476, 477, 478, 479, 480, 481, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493, 494, 495, 496, 497, 498, 499, 501, 502, 503, 505, 506, 507, 508, 509, 510, 511, 513, 514, 517, 520, 521, 522, 523, 524, 526, 527, 528, 529, 530, 531, 532, 533, 534, 535, 536, 537, 539, 540, 541, 542, 543, 544, 545, 547, 548, 549, 550, 551, 552, 553, 554, 555, 556, 557, 558, 559, 561, 563, 565, 568], [1, 10, 12, 14, 15, 17, 21, 31, 37, 45, 46, 64, 66, 71, 75, 78, 85, 89, 90, 102, 103, 107, 108, 118, 127, 132, 134, 140, 142, 157, 159, 165, 170, 172, 175, 179, 188, 194, 196, 205, 210, 211, 213, 224, 231, 233, 235, 239, 247, 249, 250, 263, 264, 268, 272, 283, 298, 301, 308, 310, 318, 319, 330, 337, 340, 345, 348, 350, 353, 354, 356, 358, 364, 372, 382, 385, 389, 391, 400, 401, 412, 413, 414, 416, 417, 420, 421, 432, 434, 439, 457, 458, 463, 465, 466, 468, 471, 473, 482, 500, 504, 512, 515, 516, 518, 519, 525, 538, 546, 560, 562, 564, 566, 567]), (Pipeline(steps=[('std', StandardScaler()),
                ('MLPClassifier', MLPClassifier(alpha=1, max_iter=1000))]), [0, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 15, 16, 17, 19, 20, 21, 22, 24, 25, 26, 28, 29, 30, 33, 35, 37, 38, 39, 40, 42, 43, 45, 46, 47, 48, 49, 50, 51, 54, 55, 58, 59, 60, 61, 62, 65, 68, 71, 72, 73, 74, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 87, 88, 89, 90, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 161, 162, 163, 164, 165, 166, 167, 169, 170, 171, 172, 173, 174, 175, 177, 178, 179, 180, 181, 182, 183, 184, 185, 187, 188, 189, 190, 191, 193, 194, 195, 196, 197, 198, 199, 200, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 228, 229, 230, 231, 233, 234, 235, 236, 237, 238, 239, 241, 242, 243, 246, 247, 248, 249, 250, 252, 253, 254, 255, 257, 258, 260, 261, 262, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 278, 279, 281, 283, 284, 285, 286, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 299, 301, 302, 304, 306, 307, 308, 309, 310, 312, 313, 314, 316, 317, 318, 319, 320, 321, 322, 323, 325, 326, 327, 328, 329, 330, 331, 332, 333, 337, 338, 339, 340, 341, 343, 344, 345, 346, 348, 349, 350, 352, 353, 354, 355, 356, 357, 358, 361, 362, 363, 364, 365, 367, 368, 369, 370, 371, 372, 373, 375, 377, 378, 379, 380, 381, 382, 383, 385, 386, 387, 388, 389, 390, 391, 392, 393, 394, 395, 396, 397, 398, 400, 401, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 416, 418, 419, 421, 424, 425, 429, 430, 431, 432, 433, 435, 437, 438, 439, 441, 442, 446, 447, 449, 450, 451, 453, 454, 455, 459, 462, 463, 464, 465, 466, 467, 470, 471, 472, 473, 474, 475, 477, 479, 481, 482, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493, 494, 496, 497, 498, 499, 501, 502, 503, 505, 506, 507, 508, 510, 511, 512, 513, 514, 515, 516, 517, 518, 519, 520, 521, 522, 523, 525, 527, 528, 529, 530, 531, 533, 535, 537, 538, 541, 542, 543, 544, 545, 546, 547, 548, 549, 550, 551, 552, 553, 554, 556, 559, 561, 563, 564, 566, 568], [1, 4, 14, 18, 23, 27, 31, 32, 34, 36, 41, 44, 52, 53, 56, 57, 63, 64, 66, 67, 69, 70, 75, 86, 91, 107, 108, 124, 137, 160, 168, 176, 186, 192, 201, 202, 227, 232, 240, 244, 245, 251, 256, 259, 263, 277, 280, 282, 287, 298, 300, 303, 305, 311, 315, 324, 334, 335, 336, 342, 347, 351, 359, 360, 366, 374, 376, 384, 399, 402, 415, 417, 420, 422, 423, 426, 427, 428, 434, 436, 440, 443, 444, 445, 448, 452, 456, 457, 458, 460, 461, 468, 469, 476, 478, 480, 495, 500, 504, 509, 524, 526, 532, 534, 536, 539, 540, 555, 557, 558, 560, 562, 565, 567]))
satra commented 8 months ago

thanks @effigies for trying. i also don't know why we are seeing different outcomes. i'll dig in some more tomorrow morning, but you are getting the error that it couldn't find the results, right? i get that too, and then when i try to run the node, it crashes. i also don't know why that error surfaces only with 0.23; if you have 0.22 + 0.6, that error doesn't surface. however, a lot has changed between 0.22 and 0.23.

effigies commented 8 months ago

you are getting the error that it couldn't find the results right?

Correct, but running node() in the debugger seems to work. So at a guess, what's happening is that we're getting one hash when the nodes are run (with split state) and a different one after combining. Or possibly we are failing to perform a re-run that aggregates the results of the two sub-tasks before attempting to look up the aggregated result.

djarecka commented 8 months ago

@satra - I can try to reproduce and debug as well, but can you share your python version? and you're running this locally on your osx, right?

satra commented 8 months ago

i have tried with python 3.10 and 3.11 (same errors) on osx 14 (m1 chip).

djarecka commented 8 months ago

@satra - do you expect wf.fit_clf.lzout.model to be the same with each run?

satra commented 8 months ago

i don't think so. it's based on the clf_info (a split) and the split indices (a nested split).

djarecka commented 8 months ago

so it has a random component?

satra commented 8 months ago

I would say it has dynamic/generative components, more so than random. But there are random generators.

djarecka commented 8 months ago

yes, of course, sorry, I didn't think long enough about the workflow and your answer... for a moment I was confused about why my checksums kept changing...

effigies commented 8 months ago

Would it help debugging to add a random seed that could be fixed?

djarecka commented 7 months ago

I think it's not using a random seed, and yet the hash is still changing

djarecka commented 7 months ago

@tclose - I've created this branch for testing: https://github.com/djarecka/pydra-ml/tree/newpydra_test

I removed some tasks that are not needed to reproduce the error and also simplified the splitters part. I kept the commented-out parts just so it's easy to see what has been removed from the original code. The error I'm getting is:

E           RuntimeError: Could not find results of 'feature_importance' node in a sub-directory named 'FunctionTask_3384430a0229b2b6770771fae01902aa' in any of the cache locations:

I removed the connections to the downstream nodes, but I kept ("feature_importance", wf.feature_importance.lzout.feature_importance) in the workflow output so the error still appears, since this is a problem after the task is run, when the results have to be collected.

You can see that this task's hash changes not only between the time it is run and the time the results are collected, but also between runs of the workflow. I understand that since random_state is set to 0, this should not be the case.

djarecka commented 7 months ago

@tclose, @satra: it looks like everything is fine if I remove ["sklearn.impute", "SimpleImputer"] from the pipeline, i.e. remove this line. Any idea why this element of the pipeline can cause the problem?

satra commented 7 months ago

it's perhaps a function of how it gets checksummed. a Pipeline object is a composable object, but pydra has no direct control over what that pipeline object looks like; that's completely a user specification. i don't know why removing the imputer helps, though, and it wouldn't be a solution for this particular problem anyway. i'm assuming you are testing in new cache locations each time. you could also try sending a deepcopy of the Pipeline object, in case that object is somehow not threadsafe.

djarecka commented 7 months ago

yes, I'm testing in new locations.

but just to be clear, I still keep other things in the pipeline... At the beginning I thought the problem was just with the Pipeline object, but since it sometimes worked I started removing elements one by one...

I'm wondering what the best solution for hashing the pipeline would be?

tclose commented 4 months ago

Just picking this up now, what do these Pipeline objects look like @satra? When you say that they are supplied by the user, are they completely arbitrary? Where do the pipelines in the test come from?

If they are arbitrary and have stochastic elements, then there isn't much that we can do, is there? We need the user to supply a hasher that can pick out the stable attributes and make a hash from them, don't we?

satra commented 4 months ago

@tclose - the original hash (sha256(str(obj).encode()).hexdigest()) should work on Pipeline, as the string representation of a Pipeline is quite deterministic. see: https://github.com/nipype/pydra/issues/717#issuecomment-1774109928

hence the question of how to associate that hashing function with the Pipeline object. even @djarecka's experiments could be subsumed if i could associate that hash, and then changes in the imputer objects should not have an effect. but in general it doesn't seem to be calling the registered function whenever a Pipeline object is present.

satra commented 4 months ago

more generally, debugging improvements could help here.

tclose commented 4 months ago

more generally, debugging improvements could help here.

I have implemented debugging improvements on this issue in #698, which detect that the hash actually changes during the execution of the task. I dug into it with the debugger and can confirm that your Pipeline bytes_repr method is being used @satra.

I have narrowed things down a bit and it is a bit strange. It turns out that the hash of a Pipeline with a SimpleImputer step isn't stable between deepcopies of the Pipeline object, i.e. hash_function(pipeline) != hash_function(deepcopy(pipeline)). This is despite cloudpickle being used to create the hashes. Further to this, pipeline != deepcopy(pipeline), which seems a little unfair.

When a task is run a deepcopy of the inputs is stored away to guard against inner states of objects being updated while the task runs, however, this assumes that the hash is stable across deepcopies. What would cause a deepcopy to be different from the original?... (will keep digging)

tclose commented 4 months ago

Actually, the deepcopy inequality is just because the Imputer classes don't implement an __eq__ method. I'm not sure why the cloudpickle would be different though; I was thinking there might be a function being set as an attribute somewhere, but I couldn't find it in the scikit-learn code anywhere.

To work with these "poorly behaved" types in general, perhaps what we should do is replace the inputs with the deepcopied version for the task run, and then replace them with the original afterwards. We could also perhaps check to see if the deepcopied version doesn't equal the original and raise a warning.

tclose commented 4 months ago

In that branch I have changed the way that TaskBase._modify_inputs() is defined so the actual original inputs are returned instead of a deepcopy, and that resolves the initial problem you ran into. However, there is another deepcopy in TaskBase.checksum_states() which causes problems with a downstream node, and it is a bit too late at night here to start trying to unpick it. Can we get away with using the original objects instead of a deepcopy of them?

See https://github.com/nipype/pydra/blob/1720ba614e640d02f851cc711586f29ee4c7f40f/pydra/engine/core.py#L271-L314

tclose commented 4 months ago

Actually, I couldn't help myself and had another look, and if I'm reading it right, I think a straight copy should be sufficient instead of a deepcopy. So I have pushed those changes to my branch, and the test seems to get past that point and error somewhere else.

satra commented 4 months ago

@tclose - thanks so much for digging into this. setting the deepcopy issue aside, if you use the original hasher sha256(str(obj).encode()).hexdigest() instead of the cloudpickle one, does it work? i'm hoping str(deepcopy(obj)) == str(obj) in this case, and this would be a situation where the repr of the Pipeline object is sufficient for a check.

to the question of deepcopies inside pydra: we did not want the functions wrapped by tasks changing their inputs, which many functions are prone to doing, especially with mutable things like dictionaries and lists. and since split/combine also operate on the inputs and could get arbitrary inputs and functions, deepcopies seemed safest to prevent the mutation. i.e. functions can do whatever they want, but what we send into a function cannot be mutated in pydra.

tclose commented 4 months ago

Hi @satra, I have done that and created a PR onto your PR: https://github.com/nipype/pydra-ml/pull/61. Both tests pass now. Note that the tests also pass using the cloudpickle bytes_repr if you use my Pydra branch from https://github.com/nipype/pydra/pull/698, once you add the other change in that PR.

satra commented 4 months ago

thanks @tclose for helping debug this.

satra commented 4 months ago

regarding the deepcopy changes, i'll leave it to you and other developers to review and add.

djarecka commented 4 months ago

thanks @tclose for finding and fixing the issue! the deepcopy issue is concerning...