DAGWorks-Inc / hamilton

Hamilton helps data scientists and engineers define testable, modular, self-documenting dataflows, that encode lineage/tracing and metadata. Runs and scales everywhere python does.
https://hamilton.dagworks.io/en/latest/
BSD 3-Clause Clear License
1.88k stars 125 forks source link

Caching a polars dataframe into parquet fails #1240

Open poldpold opened 1 day ago

poldpold commented 1 day ago

Current behavior

When trying to cache a node whose output is a polars DataFrame, an exception is raised.

Stack Traces

********************************************************************************
>[post-node-execute] hello [test_module.hello()] encountered an error          <
> Node inputs:
{}
********************************************************************************
Traceback (most recent call last):
  File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py", line 318, in execute_lifecycle_for_node
    __adapter.call_all_lifecycle_hooks_sync(
  File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/lifecycle/base.py", line 915, in call_all_lifecycle_hooks_sync
    getattr(adapter, hook_name)(**kwargs)
  File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/adapter.py", line 1446, in post_node_execute
    self.result_store.set(
  File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/stores/file.py", line 71, in set
    saver = saver_cls(path=str(materialized_path.absolute()))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: PolarsParquetWriter.__init__() got an unexpected keyword argument 'path'
-------------------------------------------------------------------
{
    "name": "TypeError",
    "message": "PolarsParquetWriter.__init__() got an unexpected keyword argument 'path'",
    "stack": "---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[4], line 1
----> 1 dr.execute(final_vars=[\"hello\"])

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:637, in Driver.execute(self, final_vars, overrides, display_graph, inputs)
    635     error_execution = e
    636     error_telemetry = telemetry.sanitize_error(*sys.exc_info())
--> 637     raise e
    638 finally:
    639     if self.adapter.does_hook(\"post_graph_execute\", is_async=False):

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:623, in Driver.execute(self, final_vars, overrides, display_graph, inputs)
    614     self.adapter.call_all_lifecycle_hooks_sync(
    615         \"pre_graph_execute\",
    616         run_id=run_id,
   (...)
    620         overrides=overrides,
    621     )
    622 try:
--> 623     outputs = self.__raw_execute(
    624         _final_vars, overrides, display_graph, inputs=inputs, _run_id=run_id
    625     )
    626     if self.adapter.does_method(\"do_build_result\", is_async=False):
    627         # Build the result if we have a result builder
    628         outputs = self.adapter.call_lifecycle_method_sync(
    629             \"do_build_result\", outputs=outputs
    630         )

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:834, in Driver.__raw_execute(self, final_vars, overrides, display_graph, inputs, _fn_graph, _run_id)
    832     return results
    833 except Exception as e:
--> 834     raise e

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:825, in Driver.__raw_execute(self, final_vars, overrides, display_graph, inputs, _fn_graph, _run_id)
    823 results = None
    824 try:
--> 825     results = self.graph_executor.execute(
    826         function_graph,
    827         final_vars,
    828         overrides if overrides is not None else {},
    829         inputs if inputs is not None else {},
    830         run_id,
    831     )
    832     return results
    833 except Exception as e:

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:175, in DefaultGraphExecutor.execute(self, fg, final_vars, overrides, inputs, run_id)
    173 memoized_computation = dict()  # memoized storage
    174 nodes = [fg.nodes[node_name] for node_name in final_vars if node_name in fg.nodes]
--> 175 fg.execute(nodes, memoized_computation, overrides, inputs, run_id=run_id)
    176 outputs = {
    177     # we do this here to enable inputs to also be used as outputs
    178     # putting inputs into memoized before execution doesn't work due to some graphadapter assumptions.
    179     final_var: memoized_computation.get(final_var, inputs.get(final_var))
    180     for final_var in final_vars
    181 }  # only want request variables in df.
    182 del memoized_computation  # trying to cleanup some memory

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/graph.py:1099, in FunctionGraph.execute(self, nodes, computed, overrides, inputs, run_id)
   1097     run_id = str(uuid.uuid4())
   1098 inputs = graph_functions.combine_config_and_inputs(self.config, inputs)
-> 1099 return graph_functions.execute_subdag(
   1100     nodes=nodes,
   1101     inputs=inputs,
   1102     adapter=self.adapter,
   1103     computed=computed,
   1104     overrides=overrides,
   1105     run_id=run_id,
   1106 )

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py:250, in execute_subdag(nodes, inputs, adapter, computed, overrides, run_id, task_id)
    247     if final_var_node.user_defined:
    248         # from the top level, we don't know if this UserInput is required. So mark as optional.
    249         dep_type = node.DependencyType.OPTIONAL
--> 250     dfs_traverse(final_var_node, dep_type)
    251 return computed

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py:222, in execute_subdag.<locals>.dfs_traverse(node_, dependency_type)
    215         result = adapter.call_lifecycle_method_sync(
    216             \"do_remote_execute\",
    217             node=node_,
    218             execute_lifecycle_for_node=execute_lifecycle_for_node_partial,
    219             **kwargs,
    220         )
    221     else:
--> 222         result = execute_lifecycle_for_node_partial(**kwargs)
    224 computed[node_.name] = result
    225 # > pruning the graph
    226 # This doesn't narrow it down to the entire space of the graph
    227 # E.G. if something is not needed by this current execution due to
    228 # the selection of nodes to run it might not prune everything.
    229 # to do this we'd need to first determine all nodes on the path, then prune
    230 # We may also want to use a reference counter for slightly cleaner/more efficient memory management

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py:318, in execute_lifecycle_for_node(__node_, __adapter, __run_id, __task_id, **__kwargs)
    314 if not pre_node_execute_errored and __adapter.does_hook(
    315     \"post_node_execute\", is_async=False
    316 ):
    317     try:
--> 318         __adapter.call_all_lifecycle_hooks_sync(
    319             \"post_node_execute\",
    320             run_id=__run_id,
    321             node_=__node_,
    322             kwargs=__kwargs,
    323             success=success,
    324             error=error,
    325             result=result,
    326             task_id=__task_id,
    327         )
    328     except Exception:
    329         message = create_error_message(__kwargs, __node_, \"[post-node-execute]\")

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/lifecycle/base.py:915, in LifecycleAdapterSet.call_all_lifecycle_hooks_sync(self, hook_name, **kwargs)
    909 \"\"\"Calls all the lifecycle hooks in this group, by hook name (stage)
    910 
    911 :param hook_name: Name of the hooks to call
    912 :param kwargs: Keyword arguments to pass into the hook
    913 \"\"\"
    914 for adapter in self.sync_hooks.get(hook_name, []):
--> 915     getattr(adapter, hook_name)(**kwargs)

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/adapter.py:1446, in HamiltonCacheAdapter.post_node_execute(self, run_id, node_, result, success, error, task_id, **future_kwargs)
   1444 result_missing = not self.result_store.exists(data_version)
   1445 if result_missing or materialized_path_missing:
-> 1446     self.result_store.set(
   1447         data_version=data_version,
   1448         result=result,
   1449         saver_cls=saver_cls,
   1450         loader_cls=loader_cls,
   1451     )
   1452     self._log_event(
   1453         run_id=run_id,
   1454         node_name=node_name,
   (...)
   1458         value=data_version,
   1459     )

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/stores/file.py:71, in FileResultStore.set(self, data_version, result, saver_cls, loader_cls)
     68 if saver_cls is not None:
     69     # materialized_path
     70     materialized_path = self._materialized_path(data_version, saver_cls)
---> 71     saver = saver_cls(path=str(materialized_path.absolute()))
     72     loader = loader_cls(path=str(materialized_path.absolute()))
     73 else:

TypeError: PolarsParquetWriter.__init__() got an unexpected keyword argument 'path'"
}

Steps to replicate behavior

Write and run a jupyter notebook with the following cells:

%load_ext hamilton.plugins.jupyter_magic
%%cell_to_module -m test_module --display --rebuild-drivers

import polars as pl
from hamilton.function_modifiers import cache

@cache(format="parquet")
def hello() -> pl.DataFrame:
    return pl.DataFrame({"a": [1,2]})
from hamilton import driver
import test_module

dr = (
    driver
    .Builder()
    .with_config({})
    .with_modules(test_module)
    .with_cache(path=".")
    .build()
)
dr.execute(final_vars=["hello"])

Library & System Information

python=3.11.8, sf-hamilton=1.81.0 and 1.83.2, polars=1.10.0

Expected behavior

I expected the node output to be persisted to disk in a parquet format.

skrawcz commented 1 day ago

@poldpold thanks for the issue! Will take a look.

skrawcz commented 1 day ago

@poldpold if you could try installing my fix and giving it a go please: Assuming you use SSH: pip install "git+ssh://git@github.com/dagworks-inc/hamilton.git@fix_1240"

poldpold commented 10 hours ago

@skrawcz, this works great, thank you! Looking forward to seeing it in an upcoming release!

skrawcz commented 1 hour ago

@skrawcz, this works great, thank you! Looking forward to seeing it in an upcoming release!

it will be out this week.