dataverbinders / nl-open-data

A flexible Python ETL toolkit for data warehousing, based on Dask, Prefect and the pydata stack
https://dkapitan.github.io/nl-open-data
MIT License

Ensure a dataset is uploaded once it is converted #67

Closed · galamit86 closed this issue 3 years ago

galamit86 commented 3 years ago

Currently, when running a statline-bq flow over multiple datasets, the upload_to_gcs task starts only once all datasets have been converted to parquet. This is caused by listing files_parquet as one of the upstream_tasks dependencies, like so:

```python
gcs_folders = upload_to_gcs.map(
    dir=pq_dir,
    source=unmapped(source),
    odata_version=odata_versions,
    id=ids,
    config=unmapped(config),
    gcp_env=unmapped(gcp_env),
    upstream_tasks=[files_parquet, col_desc_files, go_nogo],
)
```

This was done because the upload targets a folder, making it a non-data dependency. It cannot simply be removed, as doing so would let the upload_to_gcs task run before the files have been converted.

This leads to situations where, if one of 10 datasets is significantly larger, the other 9 are not uploaded until it completes. If an error occurs in the meantime (e.g. the VM shuts down), all of the completed conversions are unnecessarily lost.

This dependency should be more nuanced, prioritising the upload of each dataset as soon as it is ready.
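One possible direction, sketched below, is to replace the reduce-style upstream_tasks entry with a per-dataset data dependency: if the conversion task returned each dataset's parquet folder, the mapped upload would only wait for its own dataset. This is a minimal, hypothetical sketch in Prefect 0.x terms, not the actual statline-bq code; the task names, signatures and paths are illustrative.

```python
# Minimal sketch: per-dataset data dependency instead of a reduce-style
# upstream_tasks list. Task names and signatures are hypothetical.
from prefect import Flow, task, unmapped


@task
def convert_to_parquet(dataset_id: str) -> str:
    # ... convert one dataset and return the folder holding its parquet files
    return f"/tmp/parquet/{dataset_id}"


@task
def upload_to_gcs(pq_folder: str, bucket: str) -> str:
    # ... upload the contents of pq_folder and return the GCS destination
    return f"gs://{bucket}/{pq_folder.rsplit('/', 1)[-1]}"


with Flow("convert-then-upload") as flow:
    ids = ["83583NED", "84799NED", "83765NED"]
    # Mapped conversion: one task run per dataset.
    pq_folders = convert_to_parquet.map(ids)
    # Because pq_folders is passed as a mapped *data* dependency, each upload
    # waits only for its own conversion, not for the whole list to finish.
    gcs_folders = upload_to_gcs.map(pq_folders, bucket=unmapped("my-bucket"))
```

With this shape, a finished conversion can be uploaded immediately, regardless of how long the remaining conversions take.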

galamit86 commented 3 years ago

Correction: upload_to_gcs can in fact start before all datasets have finished converting, but a single large dataset still delays it significantly. This may be because no workers are free to run the upload tasks.

The main issue still stands: find a way to prioritise upload_to_gcs for an already converted dataset over converting the remaining datasets.
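If worker starvation is indeed the cause, one knob to experiment with is the executor's parallelism when running the flow. A small sketch, assuming Prefect 0.x; the import path, executor choice and worker count are assumptions, not the project's configuration:

```python
# Sketch: run the flow with more local Dask workers so an upload task for a
# finished dataset is less likely to be starved by long-running conversions.
from prefect.executors import LocalDaskExecutor

flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=8))
```

More workers only mitigate the symptom, though; the per-dataset dependency above addresses the ordering itself.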

galamit86 commented 3 years ago

Effectively closed by #86