Urban-Analytics-Technology-Platform / popgetter

https://popgetter.readthedocs.io/en/latest/
Apache License 2.0
5 stars 1 forks source link

Create centralised way to publish production versions of data to Azure #123

Open penelopeysm opened 1 month ago

penelopeysm commented 1 month ago

Right now, our production data is in the Azure blob storage container https://popgetter.blob.core.windows.net/popgetter-dagster-test/test_2 and one of us will populate this by setting ENV=prod and running all the Dagster pipelines locally :)

I think it's useful to have a single, centralised, way to generate all production data and upload it to another Azure blob storage container (that has a less testy name :-)). There are several benefits of this:

  1. Reproducibility — It is clear which data is being uploaded and how it is being generated.
  2. Handles top level countries.txt file cleanly — The CLI uses this file to determine which countries are present as it cannot traverse the Azure directory structure. Right now the file is being manually generated, which can easily lead to inconsistencies between what it says and the actual data that is tehre
  3. Statelessness — The pipeline should wipe the entire blob storage container before re-uploading everything. That way we don't end up with some data updated and others not (which would be bad if e.g. the metadata schema is changed).
  4. Continuous deployment — The pipeline can be automatically triggered by new versions/releases on GitHub.

I can throw together a quick Dockerfile for this and maybe investigate running this on GitHub Actions / Azure!

GHA has usage limits (https://docs.github.com/en/actions/learn-github-actions/usage-limits-billing-and-administration); in particular, "each job in a workflow can run for up to 6 hours of execution time" so it is not a deployment method that will scale well if we have many countries to run. I think for what we have now (BE + NI) it is still workable.

penelopeysm commented 1 month ago

Installing fiona on a Docker image on an M1 Mac is not fun 🫠

Edit: apt-get update && apt-get install -y libgdal-dev g++ takes care of the fiona dependencies

penelopeysm commented 1 month ago

Next problem: From the CLI the way to run all partitions of a job is to do dagster job backfill -j <job_name> --noprompt, but:

Not sure how to get the sensors to run. I got them to run by doing dagster-daemon run after the assets have been materialised, and that's basically the same as doing dagster dev offline, but this is finicky because there's no indication when the sensors have finished running. It doesn't seem like there's a dagster sensor run command, you can only turn them on/off and then the daemon is the one that's actually responsible for running them.

Oh, if that wasn't complicated enough: dagster job runs stuff in the background, so there's no obvious way of telling when it's done, too.

I don't know if it is possible and/or makes more sense to trigger the asset materialisations using Python code. If this is the case, we could in theory parse the DAG, inspect the asset to check whether it's partitioned or not, and then run it accordingly. This seems like something that would be really useful upstream, but I think it is also a lot of work.

penelopeysm commented 4 weeks ago

Code to run a job without the issue above!!!!!!

# import the top level defs thing from popgetter
from . import defs
import time
from dagster import materialize, DagsterInstance, DynamicPartitionsDefinition

job_name = "job"   # Replace with whatever
job = defs.get_job_def(job_name)

# Required for persisting outputs in $DAGSTER_HOME/storage
instance = DagsterInstance.get()

dependency_list = job._graph_def._dependencies

all_assets = {node_handle.name: definition
              for node_handle, definition in
              job._asset_layer.assets_defs_by_node_handle.items()}

def find_materialisable_asset_names(dep_list, done_asset_names) -> set[str]:
    materialisable_asset_names = set()

    for asset, dep_dict in dep_list.items():
        if asset.name in done_asset_names:
            continue

        if all(dep.node in done_asset_names for dep in dep_dict.values()):
            materialisable_asset_names.add(asset.name)

    return materialisable_asset_names

print("--------------------")
materialised_asset_names = set()
while len(materialised_asset_names) < len(all_assets):
    time.sleep(0.5)
    asset_names_to_materialise = find_materialisable_asset_names(dependency_list, materialised_asset_names)

    if len(asset_names_to_materialise) == 0:
        print("No more assets to materialise")
        break

    asset_name_to_materialise = asset_names_to_materialise.pop()
    asset_to_materialise = all_assets.get(asset_name_to_materialise)

    print(f"Materialising: {asset_name_to_materialise}")

    partitions_def = asset_to_materialise.partitions_def

    # https://docs.dagster.io/_apidocs/execution#dagster.materialize -- note
    # that the `assets` keyword argument needs to include upstream assets as
    # well. We use `selection` to specify the asset that is actually being
    # materialised.
    if partitions_def is None:
        # Unpartitioned
        materialize(assets=[asset_to_materialise,
                            *(all_assets.get(k) for k in materialised_asset_names)],
                    selection=[asset_to_materialise],
                    instance=instance)
        materialised_asset_names.add(asset_name_to_materialise)

    else:
        # Partitioned
        if type(partitions_def) != DynamicPartitionsDefinition:
            raise NotImplementedError("Non-dynamic partitions not implemented yet")
        partition_names = instance.get_dynamic_partitions(partitions_def.name)

        for partition in partition_names:
            materialize(assets=[asset_to_materialise,
                                *(all_assets.get(k) for k in materialised_asset_names)],
                        selection=[asset_to_materialise],
                        partition_key=partition,
                        instance=instance)
        materialised_asset_names.add(asset_name_to_materialise)
penelopeysm commented 4 weeks ago

Progress!!!!!!!

Run locally with:

git fetch
git checkout origin/automated-publishing

# activate your venv
# optionally, launch `dagster dev` so that you can watch this go on in the web UI

set -a; source .env; set +a
POPGETTER_COUNTRIES=bel,gb_nir python -m popgetter.run all

Or use the Dockerfile:

git fetch
git checkout origin/automated-publishing
export SAS_TOKEN=(whatever it should be)
docker build -t popgetter-prod . --build-arg "SAS_TOKEN=$SAS_TOKEN"
docker run -it popgetter-prod

Yet to figure out: