PaulSchweizer / flowpipe

Very simple flow-based programming framework.
MIT License

Multiprocessing optimizations #81

Open PaulSchweizer opened 5 years ago

PaulSchweizer commented 5 years ago

Currently, every node is evaluated in its own process. There are, however, situations where chains of nodes could be grouped together and evaluated in a single process, reducing the number of processes and saving the startup time of the unnecessary ones.

Consider this graph:

from flowpipe.graph import Graph
from flowpipe.node import Node

@Node(outputs=['out'])
def MyNode(in_):
    return {'out': 1}

graph = Graph()

a = MyNode(name="Grp1-a", graph=graph)
b = MyNode(name="Grp1-b", graph=graph)
c = MyNode(name="Grp2-c", graph=graph)
d = MyNode(name="Grp2-d", graph=graph)
e = MyNode(name="Grp3-e", graph=graph)

a.outputs["out"] >> b.inputs["in_"]
b.outputs["out"] >> e.inputs["in_"]["0"]

c.outputs["out"] >> d.inputs["in_"]
d.outputs["out"] >> e.inputs["in_"]["1"]
+-------------+          +-------------+          +--------------------+
|   Grp1-a    |          |   Grp1-b    |          |       Grp3-e       |
|-------------|          |-------------|          |--------------------|
o in_<>       |     +--->o in_<>       |          % in_                |
|         out o-----+    |         out o--------->o  in_.0<>           |
+-------------+          +-------------+     +--->o  in_.1<>           |
+-------------+          +-------------+     |    |                out o
|   Grp2-c    |          |   Grp2-d    |     |    +--------------------+
|-------------|          |-------------|     |
o in_<>       |     +--->o in_<>       |     |
|         out o-----+    |         out o-----+
+-------------+          +-------------+

We have 5 nodes, resulting in 5 processes. If we grouped the nodes, we could lower the number of processes to 3 while keeping the same benefits of multiprocessing.

Consider this (naive) pseudo-code on how this grouping could work:

groups_counter = 0
groups = {}
visited_nodes = set()

# Assumption: graph.nodes is iterated upstream-first, so the head of a
# chain is always reached before the nodes that follow it.
for node in graph.nodes:

    # Already absorbed into an earlier group.
    if node.name in visited_nodes:
        continue

    if len(node.upstream_nodes) < 2:
        # Start a new group at this node ...
        grp = str(groups_counter)
        groups[grp] = {
            "nodes": [node.name]
        }
        groups_counter += 1
        visited_nodes.add(node.name)

        # ... and extend it downstream while the link is one-to-one.
        parent = node
        while True:
            if len(parent.downstream_nodes) != 1:
                break
            down = parent.downstream_nodes[0]
            if len(down.upstream_nodes) != 1:
                break
            groups[grp]["nodes"].append(down.name)
            visited_nodes.add(down.name)
            parent = down
    else:
        # A node with several upstream nodes gets a group of its own.
        groups[str(groups_counter)] = {
            "nodes": [node.name]
        }
        groups_counter += 1
        visited_nodes.add(node.name)

For the above example, the result would look like this:

{
  "0": {
    "nodes": [
      "Grp1-a",
      "Grp1-b"
    ]
  },
  "1": {
    "nodes": [
      "Grp2-c",
      "Grp2-d"
    ]
  },
  "2": {
    "nodes": [
      "Grp3-e"
    ]
  }
}
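
The grouping can also be tried without flowpipe. What follows is a minimal sketch, assuming the graph is given as a plain dict that maps every node name to the list of its downstream node names. `find_chains` and the `edges` layout are hypothetical names for illustration, not flowpipe API. Unlike the pseudo-code, it starts a group only at a chain head, so the result does not depend on the order in which nodes are visited.

```python
from collections import defaultdict


def find_chains(edges):
    """Group nodes into linear chains.

    `edges` maps each node name to the list of its downstream node names.
    Every node must appear as a key (hypothetical layout, not flowpipe API).
    """
    downstream = {name: list(targets) for name, targets in edges.items()}
    upstream = defaultdict(list)
    for name, targets in downstream.items():
        for target in targets:
            upstream[target].append(name)

    def is_chain_head(name):
        # A node continues a chain only if it has exactly one upstream node
        # and that upstream node has exactly one downstream node.
        parents = upstream[name]
        return len(parents) != 1 or len(downstream[parents[0]]) != 1

    groups = {}
    for name in downstream:
        if not is_chain_head(name):
            continue
        chain = [name]
        current = name
        # Walk downstream while the link stays strictly one-to-one.
        while len(downstream[current]) == 1:
            following = downstream[current][0]
            if len(upstream[following]) != 1:
                break
            chain.append(following)
            current = following
        groups[str(len(groups))] = {"nodes": chain}
    return groups


# The example graph from this issue, written as an adjacency dict.
edges = {
    "Grp1-a": ["Grp1-b"],
    "Grp1-b": ["Grp3-e"],
    "Grp2-c": ["Grp2-d"],
    "Grp2-d": ["Grp3-e"],
    "Grp3-e": [],
}
groups = find_chains(edges)
```

For the example graph this yields the same three groups as listed above.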

To make this possible we'd need to solve these issues:

The grouping could then be an option when multiprocessing:

graph.evaluate_multiprocessed(optimized_evaluation=True)

I have not tested whether this would actually be beneficial, but it made sense to me.

neuneck commented 5 years ago

I like this idea a lot.

A simple and probably very powerful way to move forward is to restrict the focus to the simple case that you present in your example: linear chains of nodes. They

  1. are easy to find,
  2. have no dependencies between different chains by definition (because a node with more than one input can only occur as the first node of such a chain) and
  3. are guaranteed not to waste any optimization potential from the multiprocessing, as they form a dependency chain by definition.

As for your second concern, would it be possible to do the following?

  1. Identify all linear chains of nodes (there might be chains of length one, or the entire graph might be one chain, which is all fine).
  2. For each chain, a new Graph is instantiated and the nodes that are assigned to this chain get added into that graph.
  3. Instead of having a graph.evaluate_node_in_process method, introduce a graph.evaluate_subgraph_in_process method and spawn a process for each chain (which might contain only a single individual node).

So, basically I suggest collapsing the graph into subgraphs in such a way that each remaining component has at least two inputs and at most one output (with the exceptions of the entry and exit nodes of the original graph).
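
To illustrate the collapse, here is a rough sketch that works on an existing grouping. It rests on assumptions: `chains` and `edges` are plain dicts (a hypothetical layout, not flowpipe API) and `schedule_chains` is a made-up helper. It derives which chains feed which and sorts the chains into waves. Every chain in a wave depends only on chains from earlier waves, so each chain of a wave could be handed to its own process.

```python
def schedule_chains(chains, edges):
    """Return a list of waves, each a sorted list of chain ids.

    `chains` maps a chain id to its ordered node names, `edges` maps each
    node name to its downstream node names (both hypothetical layouts).
    """
    chain_of = {node: cid for cid, nodes in chains.items() for node in nodes}

    # A chain depends on every other chain that feeds one of its nodes.
    depends_on = {cid: set() for cid in chains}
    for node, targets in edges.items():
        for target in targets:
            if chain_of[node] != chain_of[target]:
                depends_on[chain_of[target]].add(chain_of[node])

    waves = []
    done = set()
    while len(done) < len(chains):
        # A chain is ready once all chains it depends on have finished.
        ready = sorted(
            cid for cid in chains
            if cid not in done and depends_on[cid] <= done
        )
        if not ready:
            raise ValueError("cycle between chains")
        waves.append(ready)
        done.update(ready)
    return waves


chains = {
    "0": ["Grp1-a", "Grp1-b"],
    "1": ["Grp2-c", "Grp2-d"],
    "2": ["Grp3-e"],
}
edges = {
    "Grp1-a": ["Grp1-b"],
    "Grp1-b": ["Grp3-e"],
    "Grp2-c": ["Grp2-d"],
    "Grp2-d": ["Grp3-e"],
    "Grp3-e": [],
}
waves = schedule_chains(chains, edges)
```

For the example graph, chains "0" and "1" land in the first wave and chain "2" in the second. Waves are a simplification: a real scheduler could start a chain as soon as its own dependencies are done rather than waiting for the whole previous wave.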

I hope I managed to express myself clearly - If I am being confusing, don't hesitate to ask what the hell I mean :-)

PaulSchweizer commented 5 years ago

  1. Limiting to chains only for now: Agreed!

  2. Subgraphs: I like the subgraph idea, but I think we should treat that in a separate ticket and decide how to deal with subgraphs properly: #84

We might actually be able to achieve the subgraph solution without subgraphs though, by using a dedicated node instead. The node would have one input and one output, just for handling the relationships. The node would then evaluate the node chain assigned to it. Hope that makes sense, I can add an example later.
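
One possible reading of the dedicated-node idea, as a minimal sketch only: `ChainNode` below is a hypothetical plain-Python class, not a flowpipe node. It assumes every node in the chain is a callable that takes the previous result and returns a dict with an `out` key, like `MyNode` in the first comment.

```python
class ChainNode:
    """Hypothetical stand-in for a node that evaluates a whole chain."""

    def __init__(self, name, steps):
        self.name = name
        self.steps = list(steps)  # callables, in evaluation order

    def evaluate(self, in_):
        # Feed each step's 'out' value into the next step, all in the
        # calling process; only the final output leaves the node.
        value = in_
        for step in self.steps:
            value = step(value)["out"]
        return {"out": value}


def add_one(in_):
    return {"out": in_ + 1}


def double(in_):
    return {"out": in_ * 2}


chain = ChainNode("Grp1", [add_one, double])
result = chain.evaluate(3)
```

From the outside, such a node has a single input and a single output, so the rest of the graph would not need to know that several nodes run inside it.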