pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Test _trigger_deferred_table_store_ops branch `started_ops_end = max(...)+1` #59

Closed by windiana42 1 year ago

windiana42 commented 1 year ago

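In general, testing for deferred table store ops can be expanded. These ops are currently tested with real project code, but they should also be covered by unit tests in this repo. In particular, a test should distinguish the following two variants of the `else` branch: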

    def _trigger_deferred_table_store_ops(self, stage_id: int):
        if (
            stage_id in self._deferred_table_store_ops
            and len(self._deferred_table_store_ops[stage_id]) > 0
        ):
            if len(self._deferred_table_store_op_threads.get(stage_id, {})) == 0:
                self._deferred_table_store_op_threads[stage_id] = {}
                started_ops_end = 0
            else:
                started_ops_end = max(
                    self._deferred_table_store_op_threads[stage_id].keys()
                ) + 1

vs.

    def _trigger_deferred_table_store_ops(self, stage_id: int):
        if (
            stage_id in self._deferred_table_store_ops
            and len(self._deferred_table_store_ops[stage_id]) > 0
        ):
            if len(self._deferred_table_store_op_threads.get(stage_id, {})) == 0:
                self._deferred_table_store_op_threads[stage_id] = {}
                started_ops_end = 0
            else:
                started_ops_end = max(
                    self._deferred_table_store_op_threads[stage_id].keys()
                )
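
A minimal sketch of what such a unit test could look like. It does not use the real pipedag class: `started_ops_end` below is a hypothetical stand-in that mirrors only the index computation from the two snippets above. It also assumes that the keys of `_deferred_table_store_op_threads[stage_id]` are op indices and that the result is used as the start index for ops that still need a thread. Neither assumption is confirmed by the snippets.

```python
def started_ops_end(threads_by_stage: dict, stage_id: int, *, plus_one: bool) -> int:
    """Hypothetical stand-in for the index computation shown above.

    plus_one=True mirrors the first variant (max(...) + 1),
    plus_one=False mirrors the second variant (max(...)).
    """
    started = threads_by_stage.get(stage_id, {})
    if len(started) == 0:
        return 0
    last_started = max(started.keys())
    return last_started + 1 if plus_one else last_started


def test_started_ops_end_skips_already_started_ops():
    # Assumption: ops 0..2 of stage 7 already have threads, ops 3 and 4 are pending.
    threads = {7: {0: "t0", 1: "t1", 2: "t2"}}
    ops = ["op0", "op1", "op2", "op3", "op4"]

    # With +1, only the pending ops are selected.
    start = started_ops_end(threads, 7, plus_one=True)
    assert ops[start:] == ["op3", "op4"]

    # Without +1, the last started op (op2) is selected a second time.
    start = started_ops_end(threads, 7, plus_one=False)
    assert ops[start:] == ["op2", "op3", "op4"]


def test_started_ops_end_is_zero_for_fresh_stage():
    # No threads recorded for the stage yet: start from the first op.
    assert started_ops_end({}, 7, plus_one=True) == 0
    assert started_ops_end({7: {}}, 7, plus_one=True) == 0
```

A test against the real method would need to call `_trigger_deferred_table_store_ops` twice for the same stage and check that no op is started twice; the setup for that depends on code not shown in this issue.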

NMAC427 commented 1 year ago

Resolved by #84