pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Better error message when executing subflow and input tasks not found in cache #187

Open windiana42 opened 5 months ago

windiana42 commented 5 months ago

When we execute only a stage or individual tasks as a subflow, all inputs produced outside the subflow must be found in the cache. We should log which cached inputs we depend on, and the error message raised when one is missing should explain the problem and help to fix it.

sequential.py:28

    results[task] = task.run(
        inputs={
            in_id: results[in_t]
            for in_id, in_t in task.input_tasks.items()
            if in_t in results
        },
        run_context=run_context,
        config_context=config_context,
    )

Whenever the filter "if in_t in results" drops an entry from task.input_tasks, I suggest emitting a logger.info() message stating that we depend on the cache for those specific input tasks. I also suggest catching the CacheError exception here and re-raising it with a nicer message. The message should say that the error is most likely caused by executing a subflow without first having executed all tasks it needs as inputs, and it should repeat the list of tasks that might be to blame. It should also recommend rerunning the full flow once before continuing to develop with subflows.
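A minimal sketch of what this could look like, written so that it runs without pipedag installed. The CacheError class, DemoTask, run_task_with_cache_hint, and the run_fn callback are all hypothetical stand-ins introduced for illustration; in the real code the try/except would wrap the task.run(...) call shown above and catch the library's own CacheError.

```python
import logging

logger = logging.getLogger(__name__)


class CacheError(Exception):
    """Stand-in for the library's CacheError so the sketch is self-contained."""


class DemoTask:
    """Hypothetical minimal task: a name plus a mapping of input id -> task."""

    def __init__(self, name, input_tasks=None):
        self.name = name
        self.input_tasks = input_tasks or {}

    def __repr__(self):
        return self.name


def run_task_with_cache_hint(task, results, run_fn):
    """Run `task`, log cache-backed inputs, and enrich a CacheError.

    `run_fn(task, inputs)` is an assumed stand-in for task.run(inputs=...).
    """
    inputs = {}
    cached_inputs = []  # input tasks that were not executed in this subflow
    for in_id, in_t in task.input_tasks.items():
        if in_t in results:
            inputs[in_id] = results[in_t]
        else:
            cached_inputs.append(in_t)

    if cached_inputs:
        logger.info(
            "Task %r relies on cached outputs of input tasks outside the "
            "subflow: %s",
            task,
            ", ".join(repr(t) for t in cached_inputs),
        )

    try:
        return run_fn(task, inputs)
    except CacheError as e:
        if not cached_inputs:
            raise  # nothing was filtered out, so keep the original error
        names = ", ".join(repr(t) for t in cached_inputs)
        raise CacheError(
            f"Failed to load cached inputs for task {task!r}. This is most "
            f"likely caused by executing a subflow without first having "
            f"executed all tasks it needs as inputs. Tasks that might be to "
            f"blame: {names}. Consider rerunning the full flow once before "
            f"continuing with subflows."
        ) from e
```

Chaining with "raise ... from e" keeps the original CacheError and its traceback available as __cause__, so the nicer message adds context without hiding the underlying failure.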

A typical error is that stages are reordered but the development code running individual stages is not up-to-date.