pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, SQLAlchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Add validation tasks to a Stage which get access to multiple schemas at once (last committed schema of this instance or another instance for this Stage) #181

Closed · windiana42 closed this issue 5 months ago

windiana42 commented 5 months ago

Due to schema swapping, we are in a good position, just before committing a stage, to compare the current prepared schema with the last committed version of this stage, either of this pipeline instance or of another pipeline instance.

Interface options are numerous. It could look like this:

@second_stage_version(input_type=sa.Table)
def my_validator(tbls_transaction_schema: dict[str, sa.Alias], tbls_compare_schema: dict[str, sa.Alias]):
  # has access to autoloaded tables of both schemas
  return dag.Table(compare_result_table, name="_cmp_%%")

with Stage("stage_2"):
    lazy_2 = lazy_task_2(lazy_1, b)
    lazy_3 = lazy_task_3(lazy_2)
    eager = eager_task(lazy_1, b)
    cmp3 = my_validator(lazy_3)  # we could limit the dictionaries to only include tables referenced in arguments
    post_cmp3 = post_process(cmp3)  # this could be a normal task
    my_validator()  # by default, all tables of both schemas are reflected so the validator could also report about which tables are missing or unexpected

We can define that in the case of @second_stage_version(input_type=None), the decorated function only receives schema names as values in the dictionary:

def get_store():
  return ConfigContext.get().store.table_store

@second_stage_version(input_type=None)
def my_validator(tbls_transaction_schema: dict[str, str], tbls_compare_schema: dict[str, str]):
  store = get_store()
  for table_name in tbls_compare_schema.keys():
    schema1 = tbls_transaction_schema[table_name]
    schema2 = tbls_compare_schema[table_name]
    assert store.execute(f"""
        SELECT (SELECT count(*) FROM "{schema1}"."{table_name}")
             - (SELECT count(*) FROM "{schema2}"."{table_name}") AS diff
    """).fetchone()[0] == 0

Some users might even want to prevent the screening of tables in the schema altogether. In case the user provides explicit table names, they could still be provided as dict[table_name, schema_name]. However, when no tables are provided, we would simply have the schema names:

@second_stage_version(input_type=None, disable_reflect_schema=True)
def my_validator(tbls_transaction_schema: str, tbls_compare_schema: str):
  ...
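
For illustration, even with reflection disabled, such a validator could discover tables itself via SQLAlchemy's inspector (a sketch under the assumption that the decorator passes plain schema names as proposed above):

import sqlalchemy as sa

@second_stage_version(input_type=None, disable_reflect_schema=True)
def my_validator(tbls_transaction_schema: str, tbls_compare_schema: str):
  engine = ConfigContext.get().store.table_store.engine
  insp = sa.inspect(engine)
  # compare the sets of tables in both schemas without pipedag reflecting them
  assert set(insp.get_table_names(schema=tbls_transaction_schema)) == set(
    insp.get_table_names(schema=tbls_compare_schema)
  )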

It is easy to define syntactic sugar for engine access in user space:

def get_engine():
  return ConfigContext.get().store.table_store.engine
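
Such a helper keeps ad-hoc SQL in validators terse (a sketch; the schema and table names are placeholders):

import sqlalchemy as sa

with get_engine().connect() as conn:
  # run an arbitrary check against the committed schema
  n = conn.execute(sa.text('SELECT count(*) FROM "some_schema"."some_table"')).scalar_one()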

In case we want to compare against another instance_id, we need to consider the situation where the two instances can only be accessed via separate ConfigContext objects or engines. Another problem is that a ConfigContext does not include any reference to the underlying PipedagConfig. Thus we could move this setup to wiring code. At runtime, we would pass on the alternative ConfigContext as well:

@second_stage_version(input_type=None)
def my_validator(tbls_transaction_schema: dict[str, str], tbls_compare_schema: dict[str, str], cmp_ctx: ConfigContext):
  store1 = get_store()
  store2 = cmp_ctx.store.table_store
  for table_name in tbls_compare_schema.keys():
    schema1 = tbls_transaction_schema[table_name]
    schema2 = tbls_compare_schema[table_name]
    cnt1 = store1.execute(f'SELECT count(*) AS c FROM "{schema1}"."{table_name}"').fetchone()[0]
    cnt2 = store2.execute(f'SELECT count(*) AS c FROM "{schema2}"."{table_name}"').fetchone()[0]
    assert cnt1 == cnt2

pipedag_config = PipedagConfig.default
cmp_cfg = pipedag_config.get(cmp_instance_id)
with Flow() as f:
  with Stage("stage_2"):
      lazy_2 = lazy_task_2(lazy_1, b)
      lazy_3 = lazy_task_3(lazy_2)
      eager = eager_task(lazy_1, b)
      cmp3 = my_validator(lazy_3, cmp_cfg)  # we could limit the dictionaries to only include tables referenced in arguments
      post_cmp3 = post_process(cmp3)  # this could be a normal task
      my_validator(cmp_cfg)  # by default, all tables of both schemas are reflected so the validator could also report about which tables are missing or unexpected
f.run(config=pipedag_config.get(instance_id))

Pipedag would allow zero or one ConfigContext objects within the args or kwargs of any stage-validator task. If one is passed, pipedag would use it for listing and autoloading tables of the compare schema, and it would pass it on to the task as an additional parameter.
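
A minimal sketch of how this detection could work in wiring/wrapper code (illustration only, not pipedag internals; the helper name is hypothetical):

from pydiverse.pipedag.context import ConfigContext

def _split_off_config_context(args: tuple):
  # allow zero or one ConfigContext among the task arguments
  ctxs = [a for a in args if isinstance(a, ConfigContext)]
  if len(ctxs) > 1:
    raise TypeError("at most one ConfigContext may be passed to a stage validator")
  rest = tuple(a for a in args if not isinstance(a, ConfigContext))
  return rest, (ctxs[0] if ctxs else None)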

@second_stage_version can be used for building validation tasks that run before schema swapping. It could also be used for copying data from one instance to another. This is useful for basing filtered mini/midi pipelines on a full pipeline, which keeps them in sync and avoids having to implement all filtering techniques for all kinds of sourcing techniques. Furthermore, one might think about what to do with blobs. It would be nice to integrate them, but if they complicate the task interface significantly, it might be nicer to only do this when explicitly activated:

@second_stage_version(input_type=None, add_blobs=True)
def my_validator(tbls_transaction_schema: dict[str, str], transaction_blobs: dict[str, Any], tbls_compare_schema: dict[str, str], compare_blobs: dict[str, Any]):
  ...

Currently, blob stores don't support discovery of all blobs within a stage. We could also say blobs are only added whenever a blob reference is passed to a stage validator:

@second_stage_version(input_type=None, disable_reflect_schema=True)
def my_validator(tbls_transaction_schema: str, transaction_blobs: dict[str, Any], tbls_compare_schema: str, compare_blobs: dict[str, Any]):
  assert transaction_blobs == compare_blobs  # assuming deep comparison

with Stage("stage_2"):
    blob = blob_task()
    cmp = my_validator(blob)  # blob reference is explicitly passed
windiana42 commented 5 months ago

https://github.com/Quantco/datajudge might be a package that nicely fits with the features of this PR

windiana42 commented 5 months ago

@nicolasmueller @NMAC427 @DominikZuercherQC any feedback is welcome, especially on two points: firstly, the decorator name @stage_validator, since it can also be used for filtered copying from another instance; and secondly, the use of dictionaries even in the case of just referencing one table: my_validator(tbl1) -> my_validator({tbl1.name: schema1}, {tbl1.name: schema2}).

windiana42 commented 5 months ago

In the internal naming contest, the name @second_stage_version won over @stage_validator (second place went to @second_source). The reason is that this decorator can also nicely be used for filtered copying from one pipeline instance to another. Furthermore, we had trouble with @external_input because it suggested a relation to the "external" in ExternalTableReference. We also had trouble with @second_input because it sounded too narrow (table level rather than stage level). We thought about putting "stage" in the name, but @second_stage sounds like a different stage within the same flow. A downside of @second_stage_version is that it sounds similar to the @materialize(version=...) parameter. Furthermore, it sounds like one could freely pick a version. However, if we implement multiple cache slots in the future and allow pinning some cache slots with a name (similar to the active cache slot that people can use for exploration), then picking a pinned version is indeed possible.

windiana42 commented 5 months ago

It is useful that there is a natural ordering of @materialize tasks before any @second_stage_version tasks. This ordering should be superseded by explicit dependencies transported via arguments. With this in mind, there may be a general wish to schedule a task that executes at the end of a stage. This could be ensured with a with GroupNode() block that can be configured to be a "barrier": all tasks before the block must be completed before the first task within the group starts, and all tasks within the group must be finished before any task after it continues:

@materialize
def _cleanup_schema(s: Stage):
  store = ConfigContext.get().store.table_store
  schema = store.get_schema(s.current_name)
  store.execute(f"""DROP TABLE "{schema}"."_tmp_table" """)
  # engine = store.engine is also available

def cleanup_schema(s: Stage):
  with GroupNode(ordering_barrier=True):
    _cleanup_schema(s)

with Stage("s") as s:
  a = normal_task()
  cleanup_schema(s)

This feature can be combined with #180 such that GroupNode can be configured to act as order defining barrier as well as drawing a box in visualization with configurable style.
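
Combined, this could look roughly as follows (the label/style parameters are assumptions based on the #180 proposal; only ordering_barrier is discussed here):

with GroupNode(label="stage cleanup", style_tag="cleanup", ordering_barrier=True):
  _cleanup_schema(s)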

windiana42 commented 5 months ago

Combined with #103, we could enable feeding reflected tables, or just the schema name, into a check or cleanup @materialize task as follows:

@materialize(input_type=sa.Table)
def _check_schema(tbls: dict[str, sa.Alias]):
  # check tables
  assert sorted(tbls.keys()) == ["a"]
  # check columns
  assert [c.name for c in tbls["a"].c] == ["x", "y", "z"]

def check_schema(tbls):
  with GroupNode(ordering_barrier=True):
    _check_schema(tbls)

with Stage("s") as s:
  a = normal_task()
  check_schema(s.tbls)

Similar but smaller syntactic sugar might be Stage.schema returning a lazy schema reference:

class Stage:
  @property
  def schema(self):
    @materialize
    def get_schema(stage: Stage):
      return ConfigContext.get().store.table_store.get_schema(stage.current_name)
    return get_schema(self)

We might need to adapt our JSON (de)serializer to support Schema classes.
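
A minimal sketch of such Schema support, assuming a Schema class with name/prefix/suffix attributes and a type-tagging convention (both are illustrative assumptions, not the actual pipedag serializer):

import json
from dataclasses import dataclass

@dataclass(frozen=True)
class Schema:
  name: str
  prefix: str = ""
  suffix: str = ""

def schema_to_json(o):
  # encode a Schema object as a tagged dict
  if isinstance(o, Schema):
    return {"__pipedag_type__": "schema", "name": o.name, "prefix": o.prefix, "suffix": o.suffix}
  raise TypeError(f"not JSON serializable: {o!r}")

def schema_from_json(d):
  # decode tagged dicts back into Schema objects
  if d.get("__pipedag_type__") == "schema":
    return Schema(d["name"], d["prefix"], d["suffix"])
  return d

s = json.dumps(Schema("stage_2", prefix="db_"), default=schema_to_json)
assert json.loads(s, object_hook=schema_from_json) == Schema("stage_2", prefix="db_")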

windiana42 commented 5 months ago

#103 could also be combined with @second_stage_version tasks:

@second_stage_version(input_type=sa.Table)
def check_produced_tables_with_other_schema(my_tbls, cmp_tbls):
  # receives the same tables from both schemas and not all tables that exist in cmp_schema
  ...

with Stage("s") as s:
  a = normal_task()
  check_produced_tables_with_other_schema(s.tbls)
windiana42 commented 5 months ago

For @materialize, I like that it is a verb that I want applied to my task function: "materialize my task". Thus @input_stage_versions could be a verb phrase that tells: "input stage versions to my task".
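
Under that name, the first example above would read (a sketch, assuming the interface otherwise stays as proposed):

@input_stage_versions(input_type=sa.Table)
def my_validator(tbls_transaction_schema: dict[str, sa.Alias], tbls_compare_schema: dict[str, sa.Alias]):
  ...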

windiana42 commented 5 months ago

Implemented by #188