wildlife-dynamics / ecoscope-workflows

An extensible task specification and compiler for local and distributed workflows.
BSD 3-Clause "New" or "Revised" License
0 stars 1 forks source link

POC: Split-Apply-Combine using lithops #28

Open walljcg opened 2 weeks ago

walljcg commented 2 weeks ago

We want to be able to perform split-apply-combine operations on (geo)pandas dataframes in a distributed / scalable way both on a single multi-core machine and in the cloud. It looks like lithops may take care of the architecture for this already (https://github.com/lithops-cloud/lithops). But we need to research how to generalize the use of lithops across tasks may need both the local/cloud split/apply/combine paradigm.

cisaacstern commented 1 week ago

In an initial exploration session today I'm encouraged by what I'm seeing with lithops so far. 👍

Using the lithops.LocalhostExecutor, I've adapted this official map reduce example into a script (see below) that generates EcoMaps in parallel using python multiprocessing (mediated by lithops).

The script (click to expand) ```python # mr.py import sys import time from importlib import resources import geopandas as gpd import lithops from ecoscope_workflows.tasks.results import draw_ecomap def get_source_table() -> gpd.GeoDataFrame: path = resources.files("ecoscope_workflows.tasks.analysis") / "calculate-time-density.example-return.parquet" return gpd.read_parquet(path) def split_source_table(source_table: gpd.GeoDataFrame, n: int): # TODO actually split source_table into groups here return [source_table for _ in range(n)] def map_function(geodataframe: gpd.GeoDataFrame) -> str: ecomap_kws = { "static": False, "height": 1000, "width": 1500, "search_control": True, "title": "Great Map", "title_kws": {}, "tile_layers": [], "north_arrow_kws": {}, "add_gdf_kws": {}, } return draw_ecomap(geodataframe, **ecomap_kws) def reduce_function(results): ecomaps = [] for ecomap in results: ecomaps.append(ecomap) return ecomaps if __name__ == "__main__": n = int(sys.argv[1]) print (f"Splitting source table into {n} parts") source_table = get_source_table() fexec = lithops.LocalhostExecutor(log_level="CRITICAL") start = time.time() fexec.map_reduce(map_function, split_source_table(source_table, n), reduce_function, spawn_reducer=0) fexec.get_result() elapsed = time.time() - start print(f"Elapsed time for generating {n} ecomaps: {round(elapsed)}s") ```

Initial results:

➜ python3 mr.py 1
Splitting source table into 1 parts
Elapsed time for generating 1 ecomaps: 12s

➜ python3 mr.py 2
Splitting source table into 2 parts
Elapsed time for generating 2 ecomaps: 12s

➜ python3 mr.py 5
Splitting source table into 5 parts
Elapsed time for generating 5 ecomaps: 13s

➜ python3 mr.py 15
Splitting source table into 15 parts
Elapsed time for generating 15 ecomaps: 28s

So wall time is basically flat from 1 -> 5 maps, and then once I get to 15, it starts going up, because I've crossed the threshold of the number of cores I have on my laptop (8), and the scheduler can't run everything in parallel.

Next steps which I'll work on later this weekend:

walljcg commented 1 week ago

This looks great Charles and indeed very promising. I'm curious about the spawn_reducer parameter - doesn't setting that to zero effectively then not wait for any results to be returned before executing the reduce_function()?

cisaacstern commented 1 week ago

Good question, I definitely don't 100% grok this option yet, particularly how it behaves in a local context, but my basic understanding (which could be wrong) was that this option determines when the container / process that will eventually execute the reduction is started, not when it is called. For sure worth looking into this further, which I will do.