pdxjohnny commented 5 years ago

The Data Flow Facilitator portion of DFFML would be great to port to other languages. As it's just a helper to use a data flow architecture. I hear rust has some async like support these days.

APIs should be kept the same. If needed we'll change the python APIs to match so that it's all one API.

pdxjohnny commented 5 years ago

Data Flow Programming

Explainer on what data flow programming is and how it works. Alice thinks in parallel.

We need to come up with serveral metrics to track and plot throughout. We also need to plot in relation to other metrics for tradeoff analysis.

We could also make this like a choose your own adventure style tutorial, if you want to do it with threads, here's your output metrics. We can later show that we're getting these metrics by putting all the steps into a dataflow and getting the metrics out by running them. We could then show how we can ask the orchestrator to optimize for speed, memory, etc. Then add in how you can have the orchestrator take those optimization constriants from dynamic conditions such as how much memory is on the machine you are running on, or do you have access to a k8s cluster. Also talked about power consumption vs. speed trade off for server vs. desktop. Could add in edge constraints like network latency.

Will need to add in metrics API and use in various places in orchestrators and expose to operations to report out. This will be the same APIs we'll use for stub operations to estimate time to completion, etc.

This could be done as an IPython notebook.

License: Public Domain
import os
import asyncio
import pathlib
import contextlib
import dataclasses
import urllib.parse
import concurrent.futures

import bs4
import dffml
import aiohttp

DOWNLOAD_PATH = pathlib.Path(__file__).parent.joinpath("downloads")
if not DOWNLOAD_PATH.is_dir():

def parse_bs4(html_doc):
    return bs4.BeautifulSoup(html_doc, "html.parser")

def mkurl(endpoint, **kwargs):
    url = urllib.parse.urlparse(endpoint)
    url = url._replace(**kwargs)
    return urllib.parse.urlunparse(url)

class SiteContext:
    username: str
    password: str
    endpoint: str = ""
    headers: dict = dataclasses.field(
        default_factory=lambda: {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0",
            "Connection": "keep-alive",

    def mkurl(self, **kwargs):
        return mkurl(self.endpoint, **kwargs)

async def main_with_stacks(ctx: SiteContext, loop, stack, astack):
    # Create thread pool for CPU bound tasks
    pool = stack.enter_context(concurrent.futures.ThreadPoolExecutor())
    # Create aiohttp client session which pools TCP connections for reuse
    session = await astack.enter_async_context(
    # Get initial web page
    async with session.get(ctx.endpoint) as response:
        # Read home page
        initial_page_body_text = await
        # Parse home page
        initial_page_body_soup = await loop.run_in_executor(
            pool, parse_bs4, initial_page_body_text
        # Find authenticity_token (This is the CSRF token)
        authenticity_token = initial_page_body_soup.find(
            {"name": "authenticity_token"},
        authenticity_token = authenticity_token.get("value")
    # Login. Server sends cookie which is used to authenticate us in subsequent
    # requests. The cookie is stored in the ClientSession.
    # Cookie might be only sent if something like "rememeber" is sent.
    # Use chrome devtools Network tab to see login request data, make sure to
    # check "Preserve Log" before you trigger the login page. Incognito window
    # can be helpful for getting a logged-out session.
            "authenticity_token": authenticity_token,
            "username": ctx.username,
            "password": ctx.password,
            "remember": "on",

async def main():
    # Grab loop
    loop = asyncio.get_event_loop()
    # Create a context using our credentials
    ctx = SiteContext(
    with contextlib.ExitStack() as stack:
        async with contextlib.AsyncExitStack() as astack:
            await main_with_stacks(ctx, loop, stack, astack)

if __name__ == "__main__":

DFFML's Current Working Data Flow Execution Model

graph TD
  subgraph dataflow_execution[Data Flow Execution]

    inputs[New Inputs]
    opimps[Operation Implementations]

    ictx[Input Network]
    opctx[Operation Network]
    opimpctx[Operation Implementation Network]
    rctx[Redundency Checker]
    lctx[Lock Network]

    opctx_operations[Determine which Operations may have new parameter sets]
    ictx_gather_inputs[Generate Operation parameter set pairs]
    opimpctx_dispatch[Dispatch operation for running]
    opimpctx_run_operation[Run an operation using given parameter set as inputs]

    inputs --> ictx

    operations -->|Register With| opctx
    opimps -->|Register With| opimpctx

    ictx --> opctx_operations
    opctx --> opctx_operations

    opctx_operations --> ictx_gather_inputs
    ictx_gather_inputs --> rctx
    rctx --> |If operation has not been run with given parameter set before| opimpctx_dispatch

    opimpctx_dispatch --> opimpctx

    opimpctx --> lctx

    lctx --> |Lock any inputs that can't be used at the same time| opimpctx_run_operation

    opimpctx_run_operation --> |Outputs of Operation become inputs to other operations| inputs