wildlife-dynamics / ecoscope-workflows

An extensible task specification and compiler for local and distributed workflows.

Split apply combine #45

Open cisaacstern opened 1 week ago

cisaacstern commented 1 week ago

Closes #28

cisaacstern commented 1 week ago

@walljcg here's a start on how we can leverage lithops map reduce (as demonstrated in https://github.com/wildlife-dynamics/ecoscope-workflows/issues/28#issuecomment-2183694749) in the context of ecoscope workflows/tasks. The core task is here:

https://github.com/wildlife-dynamics/ecoscope-workflows/blob/6e416b6a67cdc3f999b27882d4339705ff8e0ff8/ecoscope_workflows/tasks/parallelism/_lithops.py#L8-L38

And here is an example of what it might look like compiled into a runnable workflow:

https://github.com/wildlife-dynamics/ecoscope-workflows/blob/6e416b6a67cdc3f999b27882d4339705ff8e0ff8/examples/dags/time_density_map_reduce.script_lithops.py#L79-L104

💡 Note that this does not yet run; it's more like one step above pseudocode. Among other things, of course, we don't actually need to split-apply-combine for time density maps in this way; I'm just using that as a toy example here.

In terms of where a script like this could run: locally, it can parallelize across Python processes. In the cloud, we could package this script itself as a "launcher" serverless function on GCP Cloud Run, which, once it reaches the lithops map-reduce section, would spawn additional Cloud Run functions. The "launcher" function (running this script) would then wait for all of the parallelized tasks to complete, gather the result, and send it back to wherever we need it (maybe a database API call to the server).
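To make the "launcher" idea a bit more concrete, here is a minimal sketch of what the lithops section of such a script does (the function names and data below are invented for illustration; they are not the actual ecoscope-workflows task names):

# Hypothetical sketch of the map-reduce step inside the "launcher" script.
# `apply_time_density` and `combine_results` stand in for whatever the compiled
# workflow actually generates; they are not real ecoscope-workflows names.
from lithops import FunctionExecutor


def apply_time_density(group):
    # "apply": runs once per group, each invocation in its own Cloud Run
    # function (or a local process when using the localhost backend)
    return {"group": group, "result": None}


def combine_results(results):
    # "combine": the reducer gathers all per-group outputs into one object
    return list(results)


if __name__ == "__main__":
    groups = ["subject-a", "subject-b", "subject-c"]  # the "split" step's output
    executor = FunctionExecutor()  # backend/storage/runtime come from the lithops config
    futures = executor.map_reduce(apply_time_density, groups, combine_results)
    combined = executor.get_result(futures)
    # the launcher then sends `combined` back to wherever it needs to go,
    # e.g. a database API call to the server
    print(combined)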

Note also that this pattern (launching lithops from another cloud function) is conceptually almost identical to the Lithops Airflow Operator (which we may also want to use in the future for more complex DAGs, but "deploy lithops from a cloud function" is definitely lower latency for "easy/small" DAGs, as we've discussed).

I've also iterated a bit on the workflow YAML spec design, so we can represent the map-reduce operation there with something like this:

https://github.com/wildlife-dynamics/ecoscope-workflows/blob/6e416b6a67cdc3f999b27882d4339705ff8e0ff8/examples/compilation-specs/time-density.yaml#L39-L55

To make this more legible, I'm experimenting with borrowing the GitHub Actions data structure of:

- name: "a human readable name"
  id: |
    # a unique id. we were using the task names for this before,
    # but to support reuse of the same task within a workflow, we'll need an `id`
  uses:  # the action name in github, or for us, the task importable reference
  from: |
    # i'm not actually using this here, and i don't think it's part of GitHub Actions,
    # but i thought this might be a nice way to include the github or pypi path for
    # extension task packages, which could be dynamically installed in a venv at compile
    # time (like pre-commit). https://github.com/moradology/venvception is a nice little
    # package that does this (ephemeral venvs), that a former collaborator wrote for our
    # last project.
  with:
    # kwargs to pass to the task
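For illustration, a filled-in entry in this shape could look something like the following (the task reference and kwargs below are invented for the sake of example, not copied from the actual time-density spec):

- name: "Calculate time density"
  id: calculate_time_density_0  # unique within the workflow, so a task can appear twice
  uses: ecoscope_workflows.tasks.analysis.calculate_time_density  # hypothetical importable reference
  with:
    pixel_size: 250.0  # illustrative kwargs forwarded to the task
    crs: "ESRI:102022"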
cisaacstern commented 3 days ago

Based on the current working example here, some observations/thoughts re: performance and optimization (all of the below is based on local testing; I have not run on Cloud Run yet):

  • About 4 whole seconds are for calling calculate_time_density itself (I have not profiled further within that call yet)

Next steps:

atmorling commented 2 days ago

Just throwing this up here as a data point: [etd_range profile visualization]

This loop and the calls to intersect1d are the bulk of the time: https://github.com/wildlife-dynamics/ecoscope/blob/6e7dd1ea782c2eef07d7bf794be76e7d4a077042/ecoscope/analysis/UD/etd_range.py#L191-L198

cisaacstern commented 2 days ago

Wow @atmorling this is a heroic insight!! If the cost is largely numpy, then it should be optimizable!

Can you share how you generated this profile image? I am still a n00b when it comes to this stuff

atmorling commented 2 days ago

This is just the result of running cProfile on a test script, visualized via SnakeViz, specifically:

python -m cProfile -o etd.prof test_etd.py
python -m snakeviz etd.prof

I took another look at this and there's a sort inside intersect1d, and we call intersect1d a lot (ncalls in the below image)

[loop-times profile view, from the same visualization as above]
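As a standalone illustration of why that dominates (this is just a generic demonstration of looping np.intersect1d, not the actual etd_range code):

# Generic demonstration (not the ecoscope code): np.intersect1d sorts its inputs
# on every call, so calling it inside a loop pays an O(n log n) sort per iteration.
import cProfile

import numpy as np

rng = np.random.default_rng(0)
a = rng.integers(0, 1_000_000, size=200_000)
chunks = [rng.integers(0, 1_000_000, size=5_000) for _ in range(1_000)]


def looped_intersect():
    for chunk in chunks:
        np.intersect1d(a, chunk)  # re-sorts internally on each call


# profiled the same way as above, this shows ncalls=1000 for intersect1d,
# with the cumulative time dominated by the sorting it does internally
cProfile.run("looped_intersect()", sort="cumulative")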

cisaacstern commented 2 days ago

Took a stab at jit-compiling that loop, so far to no avail 😄 https://github.com/wildlife-dynamics/ecoscope/pull/193

cisaacstern commented 2 days ago

A few quick notes on build / deploy / call commands:

Build the runtime (container) with:

$ lithops git:(split-apply-combine) ✗ LITHOPS_CONFIG_FILE=../../.ecoscope-workflows-tmp/.lithops_config \
lithops runtime build -b gcp_cloudrun -f Dockerfile.cloudrun ecoscope-workflows-runtime

Deploy it with:

$ lithops git:(split-apply-combine) ✗ LITHOPS_CONFIG_FILE=../../.ecoscope-workflows-tmp/.lithops_config \
lithops runtime deploy -b gcp_cloudrun ecoscope-workflows-runtime

Call the script with:

$ ecoscope-workflows git:(split-apply-combine) LITHOPS_CONFIG_FILE=.ecoscope-workflows-tmp/.lithops_config \      
ECOSCOPE_WORKFLOWS_TMP=gs://ecoscope-workflows-tmp \             
python3 examples/dags/time_density_map_reduce.script_lithops.py

All with config:

# .ecoscope-workflows-tmp/.lithops_config

lithops:
  backend: gcp_cloudrun
  storage: gcp_storage

gcp:
  region: us-central1
  credentials_path: <path to service account json>

gcp_cloudrun:
  runtime: us.gcr.io/ecoscope-poc/ecoscope-workflows-runtime
  runtime_cpu: 2
  runtime_memory: 1000