i2mint / meshed

Link functions up into callable objects
https://i2mint.github.io/meshed/
MIT License

Local+Cloud Scheduler Flexibility POC #65

Closed thorwhalen closed 1 year ago

thorwhalen commented 1 year ago

For proper context, first read a bit of "Node computation flexibility".

What we want here is some specialized tools that introduce flexibility into a DAG's scheduling, namely the ability to assign workers to tasks/jobs. More concretely, we want to be able to tell the (local) DAG that it must send some of its tasks (represented by "func nodes") to be computed in the cloud, via an HTTP web service API.
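To make "assigning workers to tasks" concrete, here is a minimal sketch that does not use meshed at all. The `run`, `call_locally` and `mk_pool_worker` names are hypothetical, the two toy functions only mirror the input names used in the example test, and a thread pool stands in for the cloud worker.

```python
import inspect
from concurrent.futures import ThreadPoolExecutor


def cost(impressions, cost_per_impression):
    return impressions * cost_per_impression


def clicks(impressions, click_per_impression):
    return impressions * click_per_impression


# (output name, function) pairs, already listed in a valid execution order.
NODES = [("cost", cost), ("clicks", clicks)]


def call_locally(func, kwargs):
    """Default worker: call the function in the current thread."""
    return func(**kwargs)


def mk_pool_worker(pool):
    """Worker that runs the function elsewhere (here: a pool thread) and waits."""
    def worker(func, kwargs):
        return pool.submit(func, **kwargs).result()
    return worker


def run(nodes, inputs, workers=None):
    """Compute every node, dispatching each one to the worker assigned to its output name."""
    workers = workers or {}
    scope = dict(inputs)
    for out, func in nodes:
        # Bind arguments by name, from the inputs and the outputs computed so far.
        kwargs = {name: scope[name] for name in inspect.signature(func).parameters}
        scope[out] = workers.get(out, call_locally)(func, kwargs)
    return scope


inputs = dict(impressions=1000, cost_per_impression=0.02, click_per_impression=0.3)
local_result = run(NODES, inputs)
with ThreadPoolExecutor(max_workers=1) as pool:
    # Only the "cost" node is sent to the stand-in remote worker.
    hybrid_result = run(NODES, inputs, workers={"cost": mk_pool_worker(pool)})
```

The point of the sketch is only that the choice of worker is a per-node setting, separate from the functions and from the wiring between them.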

In this first version, we will:

Example code

So, taking it with a grain of salt please, here's what it might look like:

import i2
from meshed import code_to_dag

def find_funcs(dag, func_outs):
    # Select the functions of the func nodes whose output name is in func_outs
    return list(dag.find_funcs(lambda x: x.out in func_outs))

def mk_dag_with_ws_funcs(dag, ws_funcs):
    # Return a dag whose selected functions are replaced by their web-service binders
    return dag.ch_funcs(ws_funcs)

@code_to_dag
def mk_hybrid_dag():
    funcs_to_cloudify = find_funcs(dag, func_ids_to_cloudify)
    ws_app = mk_web_service(funcs_to_cloudify)
    ws_funcs = py_binder_funcs(ws_app, funcs_to_cloudify)
    ws_dag = mk_dag_with_ws_funcs(dag, ws_funcs)

assert str(i2.Sig(mk_hybrid_dag)) == '(dag, func_ids_to_cloudify)'

mk_hybrid_dag.dot_digraph()
(image: rendering of mk_hybrid_dag.dot_digraph())

Note that I've already given the code (or at least proposed code) for two of the functions mk_hybrid_dag needs. The remaining ones are mk_web_service, which makes the web service (with py2http), and py_binder_funcs, which makes the Python binders to the needed APIs.
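For illustration only, here is a standard-library sketch of what those two remaining functions could do. It is not the py2http/http2py implementation: the routing scheme (POST to `/<function name>` with JSON keyword arguments), the `host` and `port` parameters, and the dict returned by `py_binder_funcs` are all assumptions.

```python
import inspect
import json
import threading
from http.client import HTTPConnection
from http.server import BaseHTTPRequestHandler, HTTPServer


def mk_web_service(funcs, host="127.0.0.1", port=0):
    """Hypothetical stand-in: serve each function at POST /<function name>."""
    routes = {f.__name__: f for f in funcs}

    class Handler(BaseHTTPRequestHandler):
        def do_POST(self):
            func = routes.get(self.path.strip("/"))
            if func is None:
                self.send_error(404)
                return
            length = int(self.headers.get("Content-Length", 0))
            kwargs = json.loads(self.rfile.read(length) or b"{}")
            body = json.dumps(func(**kwargs)).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, *args):  # keep the demo quiet
            pass

    # port=0 lets the OS pick a free port; it is read back from server_address.
    return HTTPServer((host, port), Handler)


def py_binder_funcs(ws_app, funcs):
    """Hypothetical stand-in: map each function name to a local callable that calls the service."""
    host, port = ws_app.server_address[:2]

    def mk_binder(func):
        def binder(**kwargs):
            conn = HTTPConnection(host, port)
            try:
                conn.request(
                    "POST",
                    f"/{func.__name__}",
                    body=json.dumps(kwargs),
                    headers={"Content-Type": "application/json"},
                )
                return json.loads(conn.getresponse().read())
            finally:
                conn.close()

        # Keep the original name and signature visible to introspection.
        binder.__name__ = func.__name__
        binder.__signature__ = inspect.signature(func)
        return binder

    return {f.__name__: mk_binder(f) for f in funcs}


def cost(impressions, cost_per_impression):
    return impressions * cost_per_impression


ws_app = mk_web_service([cost])
threading.Thread(target=ws_app.serve_forever, daemon=True).start()
ws_funcs = py_binder_funcs(ws_app, [cost])
remote_cost = ws_funcs["cost"](impressions=1000, cost_per_impression=0.02)
ws_app.shutdown()
ws_app.server_close()
```

Copying the signature onto the binder matters because a DAG that wires nodes by argument name presumably needs the binder to look like the function it replaces.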

Also note that mk_hybrid_dag doesn't return the ws_app object here, but it probably should (or ws_app should be added as an attribute of ws_dag so it's available), since for a demo you'd want to take that object and use it to launch the service.

Example test

from meshed.examples import online_marketing_funcs as funcs
from meshed import DAG

# The parameters
dag = DAG(funcs)
funcs_ids_to_cloudify = ['cost', 'revenue']
input_dict = dict(
    impressions=1000, 
    cost_per_impression=0.02, 
    click_per_impression=0.3, 
    sales_per_click=0.05, 
    revenue_per_sale=100
)

# Calling mk_hybrid_dag
ws_dag = mk_hybrid_dag(dag, funcs_ids_to_cloudify)

# Testing ws_dag
#
#   Here, we assume we can find the ws_app as an attribute of ws_dag 
#   (could also have mk_hybrid_dag return a (ws_dag, ws_app) tuple instead)
# 
#   We also assume we can launch the webservice with launch_webservice, which should 
#   have the effect of running the webservice in a separate thread.
#   I wrote stuff for that in py2http/http2py; I think there should still be traces of 
#   it there.
with launch_webservice(ws_dag.ws_app):

    dag_result = dag(**input_dict)
    ws_dag_result = ws_dag(**input_dict)
    assert dag_result == ws_dag_result
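
As a rough idea of what launch_webservice could look like, here is a sketch that assumes ws_app is an http.server-style server object (with serve_forever, shutdown and server_close). What py2http actually produces may well differ, so treat the `Ping` handler and the whole shape as hypothetical.

```python
import threading
from contextlib import contextmanager
from http.client import HTTPConnection
from http.server import BaseHTTPRequestHandler, HTTPServer


@contextmanager
def launch_webservice(ws_app):
    """Serve ws_app in a background thread for the duration of the with-block."""
    thread = threading.Thread(target=ws_app.serve_forever, daemon=True)
    thread.start()
    try:
        yield ws_app
    finally:
        ws_app.shutdown()      # stop the serve_forever loop
        ws_app.server_close()  # release the listening socket
        thread.join()


class Ping(BaseHTTPRequestHandler):
    """Toy handler, only here so that there is something to serve."""

    def do_GET(self):
        body = b"pong"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # keep the demo quiet
        pass


with launch_webservice(HTTPServer(("127.0.0.1", 0), Ping)) as app:
    host, port = app.server_address[:2]
    conn = HTTPConnection(host, port)
    conn.request("GET", "/")
    reply = conn.getresponse().read()
    conn.close()
```

Shutting the server down in the finally clause means the service also stops if an assertion inside the with-block fails.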

andeaseme commented 1 year ago

https://github.com/i2mint/meshed/issues/65