A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
Since we changed output_json to a CLOB column in DB2, ordering by it is no longer allowed:
result = conn.execute(
    sa.select(self.tasks_table.c.output_json)
    .where(self.tasks_table.c.stage == stage.name)
    .where(self.tasks_table.c.in_transactionschema.in_([False]))
    # DB2 rejects ORDER BY on CLOB columns, so this line now fails:
    .order_by(self.tasks_table.c.output_json)
).all()
It might also make sense to add a test for get_stage_hash(), since we use it actively in projects where one pipeline is fed from the input layer of another to get better control over the filtering code.
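A rough shape such a test could take (purely illustrative: the store and stage fixtures and the exact get_stage_hash() signature are assumptions, not the library's actual API):

# Illustrative sketch only: "store", "stage", and get_stage_hash(stage) are
# assumed names; the point is that the hash must be deterministic, which is
# exactly what breaks if it depends on the row order returned by the database.
def test_get_stage_hash_is_stable(store, stage):
    first = store.get_stage_hash(stage)
    second = store.get_stage_hash(stage)
    assert first == second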
The problem to be solved is that we need a stable hash over the output_json results that is independent of the row order returned by the query. We can either change the ordering to use our structure hashes, or simply do the sorting in pandas. Two sketches of these options follow.
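A minimal sketch of the first option, assuming a non-CLOB hash column exists on the tasks table (the column name output_json_hash is hypothetical, not the actual schema):

# Order by a short hash column instead of the CLOB-typed output_json column.
result = conn.execute(
    sa.select(self.tasks_table.c.output_json)
    .where(self.tasks_table.c.stage == stage.name)
    .where(self.tasks_table.c.in_transactionschema.in_([False]))
    .order_by(self.tasks_table.c.output_json_hash)
).all()

And a minimal sketch of the second option: drop the ORDER BY entirely and sort the fetched values in pandas before hashing (the sha256 hashing here only illustrates the idea and is not the library's actual hashing scheme):

import hashlib

import pandas as pd

# Fetch without ORDER BY, since DB2 cannot order by the CLOB column.
rows = conn.execute(
    sa.select(self.tasks_table.c.output_json)
    .where(self.tasks_table.c.stage == stage.name)
    .where(self.tasks_table.c.in_transactionschema.in_([False]))
).all()

# Sort client-side in pandas so the resulting hash does not depend on the
# row order returned by the database.
df = pd.DataFrame(rows, columns=["output_json"]).sort_values("output_json")
stage_hash = hashlib.sha256("".join(df["output_json"]).encode("utf-8")).hexdigest()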