owid / etl

A compute graph for loading and transforming OWID's data
https://docs.owid.io/projects/etl
MIT License

🎉 etl d profile CLI for profiling steps #2843

Closed · Marigold closed this 1 week ago

Marigold commented 1 week ago

How to review

Simple test (try replacing --mem with --cpu):

etl d profile garden/biodiversity/2024-01-25/cherry_blossom --mem

More interesting test for surface temperature

etl d profile meadow/climate/2023-12-20/surface_temperature --mem

You can run this directly on the staging server with


ssh owid@staging-site-profile-cli "cd etl && poetry run etl d profile meadow/climate/2023-12-20/surface_temperature --mem"
owidbot commented 1 week ago
Quick links (staging server): Site Admin · Wizard

Login: ssh owid@staging-site-profile-cli

chart-diff: ✅ No charts for review.
data-diff: ✅ No differences found
```diff
= Dataset garden/climate/2023-12-20/surface_temperature
  = Table surface_temperature
```
Legend: +New  ~Modified  -Removed  =Identical
Hint: Run this locally with etl diff REMOTE data/ --include yourdataset --verbose --snippet

Automatically updated datasets matching _weekly_wildfires|excess_mortality|covid|fluid|flunet|country_profile|garden/ihme_gbd/2019/gbd_risk_ are not included

Edited: 2024-06-19 19:53:24 UTC Execution time: 16.84 seconds

Marigold commented 1 week ago

Tried running it for the modified surface temperature step, and it shows ~10 GB less memory usage (not great, but it's an improvement...)

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    52    189.1 MiB    189.1 MiB           1   def run(dest_dir: str) -> None:
    53                                             # Activates the usage of the global context. Using this option can enhance the performance
    54                                             # of initializing objects in single-threaded applications.
    55    189.1 MiB      0.0 MiB           1       pyproj.set_use_global_context(True)  # type: ignore
    56
    57                                             #
    58                                             # Load inputs.
    59                                             #
    60                                             # Retrieve snapshot.
    61    190.3 MiB      1.3 MiB           1       snap = paths.load_snapshot("surface_temperature.gz")
    62
    63                                             # Read surface temperature data from snapshot
    64  16081.3 MiB  15891.0 MiB           1       da = _load_data_array(snap)
    65
    66                                             # Read the shapefile to extract country information
    67   8063.3 MiB  -8018.1 MiB           1       snap_geo = paths.load_snapshot("world_bank.zip")
    68   8063.3 MiB      0.0 MiB           1       shapefile_name = "WB_countries_Admin0_10m/WB_countries_Admin0_10m.shp"
    69
    70                                             # Check if the shapefile exists in the ZIP archive
    71   8213.4 MiB      1.5 MiB           2       with zipfile.ZipFile(snap_geo.path, "r"):
    72                                                 # Construct the correct path for Geopandas
    73   8064.7 MiB      0.0 MiB           1           file_path = f"zip://{snap_geo.path}!/{shapefile_name}"
    74
    75                                                 # Read the shapefile directly from the ZIP archive
    76   8213.4 MiB    148.6 MiB           1           shapefile = _load_shapefile(file_path)
    77
    78                                             #
    79                                             # Process data.
    80                                             #
    81
    82                                             # Initialize an empty dictionary to store the country-wise average temperature.
    83   8213.4 MiB      0.0 MiB           1       temp_country = {}
    84
    85                                             # Add Global mean temperature
    86   8213.4 MiB      0.0 MiB           1       weights = np.cos(np.deg2rad(da.latitude))
    87   8213.4 MiB      0.0 MiB           1       weights.name = "weights"
    88   8213.5 MiB      0.1 MiB           1       clim_month_weighted = da.weighted(weights)
    89   8213.1 MiB     -0.3 MiB           1       global_mean = clim_month_weighted.mean(["longitude", "latitude"])
    90   8213.1 MiB      0.0 MiB           1       temp_country["World"] = global_mean
    91
    92                                             # Initialize a list to keep track of small countries where temperature data extraction fails.
    93   8213.1 MiB      0.0 MiB           1       small_countries = []
    94
    95                                             # Iterate over each row in the shapefile data.
    96  21120.7 MiB -894285.8 MiB         252       for i in tqdm(range(shapefile.shape[0])):
    97                                                 # Extract the data for the current row.
    98  21120.7 MiB -1152523.6 MiB         251           geometry = shapefile.iloc[i]["geometry"]
    99  21120.7 MiB -1152523.6 MiB         251           country_name = shapefile.iloc[i]["WB_NAME"]
   100
   101  21120.7 MiB -1152523.6 MiB         251           try:
   102                                                     # Clip to the bounding box for the country's shape to significantly improve performance.
   103  21120.7 MiB -1152523.6 MiB         251               xmin, ymin, xmax, ymax = geometry.bounds
   104  21576.0 MiB -1143158.7 MiB         251               clip = da.rio.clip_box(minx=xmin, miny=ymin, maxx=xmax, maxy=ymax)
   105
   106                                                     # Clip data to the country's shape.
   107                                                     # NOTE: if memory is an issue, we could use `from_disk=True` arg
   108  21120.6 MiB -1009634.0 MiB         221               clip = clip.rio.clip([mapping(geometry)], shapefile.crs)
   109
   110                                                     # Calculate weights based on latitude to account for area distortion in latitude-longitude grids.
   111  21120.6 MiB -832724.3 MiB         195               weights = np.cos(np.deg2rad(clip.latitude))
   112  21120.6 MiB -832724.3 MiB         195               weights.name = "weights"
   113
   114                                                     # Apply the weights to the clipped temperature data.
   115  21120.6 MiB -832724.3 MiB         195               clim_month_weighted = clip.weighted(weights)
   116
   117                                                     # Calculate the weighted mean temperature for the country.
   118  21120.7 MiB -833667.1 MiB         195               country_weighted_mean = clim_month_weighted.mean(dim=["longitude", "latitude"]).values
   119
   120                                                     # Store the calculated mean temperature in the dictionary with the country's name as the key.
   121  21120.7 MiB -833702.4 MiB         195               temp_country[country_name] = country_weighted_mean
   122
   123                                                     # Clean up the memory
   124  21120.7 MiB -833702.4 MiB         195               del clip
   125  21120.7 MiB -833702.4 MiB         195               del weights
   126  21120.7 MiB -833702.4 MiB         195               del clim_month_weighted
   127
   128  16602.2 MiB -331564.2 MiB          56           except (NoDataInBounds, OneDimensionalRaster):
   129  16602.2 MiB -121167.4 MiB         112               log.info(
   130  16602.2 MiB -60583.7 MiB          56                   f"No data was found in the specified bounds for {country_name}."
   131                                                     )  # If an error occurs (usually due to small size of the country), add the country's name to the small_countries list.
   132  16602.2 MiB -60583.7 MiB          56               small_countries.append(shapefile.iloc[i]["WB_NAME"])
   133
   134                                             # Log information about countries for which temperature data could not be extracted.
   135  18846.1 MiB  -2274.6 MiB           2       log.info(
   136  18846.1 MiB      0.0 MiB           1           f"It wasn't possible to extract temperature data for {len(small_countries)} small countries as they are too small for the resolution of the Copernicus data."
   137                                             )
   138
   139                                             # Define the start and end dates
   140  18846.2 MiB      0.1 MiB           1       start_time = da["time"].min().dt.date.astype(str).item()
   141  18846.2 MiB      0.0 MiB           1       end_time = da["time"].max().dt.date.astype(str).item()
   142
   143                                             # Generate a date range from start_time to end_time with monthly frequency
   144  18846.5 MiB      0.3 MiB           1       month_middles = pd.date_range(start=start_time, end=end_time, freq="MS") + pd.offsets.Day(14)
   145
   146                                             # month_starts is a DateTimeIndex object; you can convert it to a list if needed
   147  16205.9 MiB  -2640.6 MiB           1       month_starts_list = month_middles.tolist()
   148
   149                                             # df of temperatures for each country
   150  16205.9 MiB      0.0 MiB           1       df_temp = pd.DataFrame(temp_country)
   151  16205.9 MiB      0.0 MiB           1       df_temp["time"] = month_starts_list
   152
   153  16206.3 MiB      0.4 MiB           1       melted_df = df_temp.melt(id_vars=["time"], var_name="country", value_name="temperature_2m")
   154
   155                                             # Create a new table and ensure all columns are snake-case and add relevant metadata.
   156  16206.5 MiB      0.2 MiB           1       tb = Table(melted_df, short_name=paths.short_name, underscore=True)
   157  16206.5 MiB      0.0 MiB           1       tb = tb.set_index(["time", "country"], verify_integrity=True)
   158
   159  16206.5 MiB      0.0 MiB           1       tb["temperature_2m"].metadata.origins = [snap.metadata.origin]
   160                                             #
   161                                             # Save outputs.
   162                                             #
   163                                             # Create a new meadow dataset with the same metadata as the snapshot.
   164  16211.9 MiB      5.4 MiB           1       ds_meadow = create_dataset(dest_dir, tables=[tb], check_variables_metadata=True, default_metadata=snap.metadata)
   165
   166                                             # Save changes in the new meadow dataset.
   167  16217.5 MiB      5.5 MiB           1       ds_meadow.save()
Marigold commented 1 week ago

@veronikasamborska1994 this is the profiling CLI I was talking about. I tested it on the surface temperature step and made some naive optimizations based on it (mostly deleting objects with del ..., which Python does automatically anyway, but making it explicit sometimes helps). If the output above is right, peak memory should be around ~22 GB compared to ~30 GB. I didn't change the logic, so perhaps we could try merging it and see whether it fixes the nightly ETL run?
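
To make the del pattern concrete, here is a minimal sketch (not the actual diff; aggregate_items and load_large_array are made-up names for illustration). The point is to drop references to the large per-country intermediates as soon as the aggregated value is stored, so they can be freed before the next iteration allocates its own copies:

```python
import numpy as np


def aggregate_items(names, load_large_array):
    """Aggregate one large 2-D array per item, keeping only one alive at a time."""
    results = {}
    for name in names:
        arr = load_large_array(name)  # large temporary, e.g. a clipped raster
        # Latitude-style weights; the values are purely illustrative.
        weights = np.cos(np.deg2rad(np.linspace(-60.0, 60.0, arr.shape[0])))
        results[name] = float(np.average(arr.mean(axis=1), weights=weights))
        # Drop the references now rather than at the end of the iteration, so the
        # big arrays can be reclaimed before the next item is loaded.
        del arr, weights
    return results
```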

Marigold commented 1 week ago

> could be potentially useful to maybe log the maximum memory/cpu load at the very end but if it's not super straightforward to add I wouldn't bother!

That's a good idea, I'll see if I can make it work in the next iteration.
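
If it helps for a later iteration, one simple way to do the memory part (just a sketch with a made-up function name, not necessarily how it will actually be implemented) is to read the process's peak RSS and accumulated CPU time from resource.getrusage at the very end of the run:

```python
import resource
import sys


def log_peak_usage() -> None:
    """Print peak memory and total CPU time for the current process (illustrative sketch)."""
    usage = resource.getrusage(resource.RUSAGE_SELF)
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    peak_mib = usage.ru_maxrss / 1024 if sys.platform.startswith("linux") else usage.ru_maxrss / 2**20
    cpu_seconds = usage.ru_utime + usage.ru_stime
    print(f"peak memory: {peak_mib:,.0f} MiB, cpu time: {cpu_seconds:.1f}s")
```

Tracking CPU load over time (rather than total CPU time) would need periodic sampling, e.g. with psutil in a background thread, which the sketch above doesn't attempt.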