PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

inconsistent hashing with more than one object #16185

Open · onlyjsmith opened 1 day ago

onlyjsmith commented 1 day ago

Bug summary

EDIT: I should point out that I got to this after finding the Prefect cache wasn't being hit when I thought it should be.

```python
import pandas as pd
from prefect.utilities.hashing import hash_objects
from pydantic import BaseModel

class Config(BaseModel):
    active: bool = False

config = Config()
data = pd.DataFrame({"value": [True]})

print(f"data                 : {hash_objects(data)}")
print(f"config               : {hash_objects(config)}")
print(f"data, config         : {hash_objects(data, config)}")
```

Version info

Version:             3.1.5
API version:         0.8.4
Python version:      3.12.2
Git commit:          3c06654e
Built:               Mon, Dec 2, 2024 6:57 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud
Pydantic version:    2.10.2

Additional context

[screenshot: SCR-20241203-mwyr-2]
zzstoatzz commented 1 day ago

hi @onlyjsmith - thank you for the issue! interesting, will take a look

zzstoatzz commented 1 day ago

just to document the process, i'm noticing that the hash of multiple objects is stable within the same process, but not between

```python
# /// script
# dependencies = ["pandas", "prefect"]
# ///
import pathlib

import pandas as pd
from pydantic import BaseModel

from prefect.utilities.hashing import hash_objects


class Config(BaseModel):
    active: bool = False


config = Config()
data = pd.DataFrame({"value": [True]})

w = 20

# Save the first hash
first_combined_hash = hash_objects(data, config)
assert first_combined_hash is not None

print("\nWithin same run:")
print(f"{'data':<{w}}: {hash_objects(data)}")
print(f"{'config':<{w}}: {hash_objects(config)}")
print(f"{'data, config (1)':<{w}}: {first_combined_hash}")
print(f"{'data, config (2)':<{w}}: {hash_objects(data, config)}")

# This will pass because it's within the same run
assert hash_objects(data, config) == hash_objects(data, config)

hash_file = pathlib.Path("previous_hash.txt")
if hash_file.exists():
    previous_hash = hash_file.read_text().strip()
    print(f"\nPrevious run's hash : {previous_hash}")
    print(f"This run's hash     : {first_combined_hash}")
    if previous_hash != first_combined_hash:
        print("❌ Hashes differ between runs!")
    else:
        print("✅ Hashes match between runs!")

hash_file.write_text(first_combined_hash)
```
zzstoatzz commented 1 day ago

@onlyjsmith ok!

The issue appears to occur because DataFrame objects can't be directly JSON serialized, so the hash falls back to cloudpickle, which includes elements that are non-deterministic between runs.

in the short term, you can use the to_dict method to dump the df:

```python
data_dict = data.to_dict(orient="split")
combined_hash = hash_objects(data_dict, config)
```
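
(For context, since this came up via caching: a minimal sketch of wiring that workaround into a task's `cache_key_fn`. The task and function names below are illustrative, not from this thread.)

```python
# Sketch only: use the to_dict workaround as a Prefect task cache key.
# cache_key_fn receives the task run context and the call's parameters
# and returns a string key; the names here are illustrative.
import pandas as pd
from prefect import flow, task
from prefect.utilities.hashing import hash_objects


def dataframe_cache_key(context, parameters) -> str | None:
    # Convert the DataFrame into a plain dict so hash_objects can serialize it
    # as JSON deterministically instead of falling back to cloudpickle.
    data_dict = parameters["data"].to_dict(orient="split")
    return hash_objects(data_dict, parameters["config"])


@task(cache_key_fn=dataframe_cache_key)
def compute(data: pd.DataFrame, config: dict) -> float:
    return float(data["value"].sum())


@flow
def pipeline() -> None:
    df = pd.DataFrame({"value": [1.0, 2.0]})
    compute(df, config={"active": False})


if __name__ == "__main__":
    pipeline()
```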

I'll look more into what exactly is changing between runs here

repro:

```python
# /// script
# dependencies = ["pandas", "prefect"]
# ///
import pathlib
import sys

import cloudpickle
import pandas as pd
from pydantic import BaseModel

from prefect.utilities.hashing import JSONSerializer, hash_objects


class Config(BaseModel):
    active: bool = False


config = Config()
data = pd.DataFrame({"value": [True]})

hash_file = pathlib.Path("previous_hash.txt")

# Check for --reset flag
if len(sys.argv) > 1 and sys.argv[1] == "--reset":
    if hash_file.exists():
        hash_file.unlink()
        print("Previous hash file deleted.")

# Let's test each serialization method directly
print("\nTesting serialization methods:")

try:
    serializer = JSONSerializer(dumps_kwargs={"sort_keys": True})
    json_bytes = serializer.dumps((data, config))
    print("✅ JSON serialization succeeded")
except Exception as e:
    print(f"❌ JSON serialization failed: {e}")

try:
    pickle_bytes = cloudpickle.dumps((data, config))
    print("✅ Cloudpickle serialization succeeded")
except Exception as e:
    print(f"❌ Cloudpickle serialization failed: {e}")

w = 20

# Convert DataFrame to a deterministic format before hashing
data_dict = data.to_dict(orient="split")
first_combined_hash = hash_objects(data_dict, config)
assert first_combined_hash is not None

print("\nWithin same run:")
print(f"{'data':<{w}}: {hash_objects(data)}")
print(f"{'config':<{w}}: {hash_objects(config)}")
print(f"{'data, config (1)':<{w}}: {first_combined_hash}")
print(f"{'data, config (2)':<{w}}: {hash_objects(data, config)}")

# This will pass because it's within the same run
assert hash_objects(data, config) == hash_objects(data, config)

if hash_file.exists():
    previous_hash = hash_file.read_text().strip()
    print(f"\nPrevious run's hash : {previous_hash}")
    print(f"This run's hash     : {first_combined_hash}")
    if previous_hash != first_combined_hash:
        print("❌ Hashes differ between runs!")
    else:
        print("✅ Hashes match between runs!")

hash_file.write_text(first_combined_hash)
```

```
» uv run repros/16185.py
Reading inline script metadata from `repros/16185.py`

Testing serialization methods:
❌ JSON serialization failed: Unable to serialize unknown type:
✅ Cloudpickle serialization succeeded

Within same run:
data                : 7bb5352e4148cf86417c819f588eb33e
config              : 74053ae609dd793e29f12548541a58ef
data, config (1)    : 8fdf53aec8d6839e51ef9cb8ac1c22af
data, config (2)    : 1e41378dbf1e4b949458487b5bf6bd60

Previous run's hash : 8fdf53aec8d6839e51ef9cb8ac1c22af
This run's hash     : 8fdf53aec8d6839e51ef9cb8ac1c22af
✅ Hashes match between runs!
```

these docs may be useful to you

onlyjsmith commented 8 hours ago

@zzstoatzz thanks for the rapid reply and for digging in. The docs link is helpful, but can I check if using a DataFrame as a cache key would be viewed as a good approach (if it was reliable)?

We take as input a geospatial polygon (as a GeoPandas GeoDataFrame) and use it to calculate a deterministic set of values for that area. The simplest would be to just pass the GeoDataFrame into the function and have it hit the cache if it can.

zzstoatzz commented 51 minutes ago

> can I check if using a DataFrame as a cache key would be viewed as a good approach (if it was reliable)?

I think it depends on when you want to invalidate the cache. The simplest strategy in my mind would be something like the to_dict snippet above: if the dataframe coming in is different from last time, data_dict will also be different, i.e. dataframes with novel values would invalidate your cache.
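
To make that concrete for the GeoDataFrame case, a rough sketch (it assumes geopandas and is not from this thread): serialize the geometry to a stable text form first and hash that, so identical polygons give the same key and new geometries change it.

```python
# Rough sketch (assumes geopandas is installed; not an official Prefect recipe):
# reduce the GeoDataFrame to a stable string before hashing so the key only
# changes when the geometry/data actually change.
import geopandas as gpd
from shapely.geometry import Polygon

from prefect.utilities.hashing import hash_objects


def geodataframe_cache_key(gdf: gpd.GeoDataFrame) -> str | None:
    # GeoJSON is plain text, so hash_objects can hash it via JSON
    # instead of falling back to cloudpickle.
    return hash_objects(gdf.to_json())


aoi = gpd.GeoDataFrame(
    geometry=[Polygon([(0, 0), (1, 0), (1, 1)])], crs="EPSG:4326"
)
print(geodataframe_cache_key(aoi))  # same polygon -> same key across runs
```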