graphistry / pygraphistry

PyGraphistry is a Python library to quickly load, shape, embed, and explore big graphs with the GPU-accelerated Graphistry visual graph analyzer
BSD 3-Clause "New" or "Revised" License

[FEA] Node reductions #193

Open lmeyerov opened 3 years ago

lmeyerov commented 3 years ago

Node reductions are great!

What

This is an important case of graph reductions. There are requests in both the UI and APIs to do reductions at the level of a node, a multiedge, and multiple nodes / subgraphs. Instead of simply filtering out the selected entities, the idea is they'd be partitioned into subgraphs, and different kinds of operations would replace them with new topologies and attributes representing them. They seem like some sort of continuous derivative operators, and may even be invertible.

Example

One of the most common cases is dropping a node and propagating its attributes and its edges (with their attributes) through its neighborhood.

In hypergraph:

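import pandas as pd
import graphistry

g = graphistry.hypergraph(
    pd.DataFrame([
       {'x': 'a', 'y': 'm'},
       {'x': 'b', 'y': 'm'}
    ]),
    direct=True)['graph']  # hypergraph() returns a dict; the plottable graph is under 'graph'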

We'd get a 3-node graph (a)-[e1]->(m)<-[e2]-(b)

However, we really just want (a)-[m]-(b) ... so we need a way to drop node (m) and synthesize edge -[m]->. This gets weirder in the cases of multiedges, cliques, and different kinds of nodes:

g2 = g.replace_node_with_edges(
  select_nodes=g._nodes.query('type == "m"'),
  edge_reducer=g.reducers.pair_unique,
  edge_attr_reducer={'weight': 'max'})

g2 = g.replace_node_with_edges(select_nodes='m')
g2 = g.replace_node_with_edges(select_nodes=[1])
g2 = g.replace_node_with_edges(select_nodes=[False, True, False])
g2 = g.replace_node_with_edges(select_nodes=g._nodes.query('type == "m"'))

g2 = g.replace_node_with_edges(select_nodes='m', edge_reducer=g.reducers.pair_unique_undirected, edge_attr_reducer={'weight': 'max', ...})

Sample reducers

This gets at patterns of graph reducers, where we want to take nodes/edges, remove them, and replace them with other nodes/edges. For example, we can replace all type='m' nodes with (a: x)-[e]->(b: x) via a node reduction driven by a selector predicate:

1. Node reducers: Drop a node by converting to edges

g2 = g.replace_node_with_edges(selected=g._nodes['type'] == 'm', reducer=lambda edges: strong_component(edges_to_nodes(edges)))

The idea is to pull out the nodes matching some predicate and replace each one with the edges its reducer produces:

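def replace_node_with_edges(g, selected, reducer):
  # selected: boolean mask over g._nodes; reducer: maps a node's edges to a DataFrame of new edges
  removed_ids = g._nodes.loc[selected, g._node]
  new_edges = []
  for node in removed_ids:
    in_edges = g._edges[ g._edges[g._destination] == node ]
    out_edges = g._edges[ g._edges[g._source] == node ]
    new_edges.append( reducer(pd.concat([in_edges, out_edges])) )
  # keep old + new edges, minus any edge that still touches a removed node
  all_edges = pd.concat([g._edges, *new_edges], ignore_index=True)
  dangling = all_edges[g._source].isin(removed_ids) | all_edges[g._destination].isin(removed_ids)
  new_nodes = g._nodes[ ~selected ]
  g2 = g.nodes(new_nodes).edges(all_edges[ ~dangling ])
  return g2

where

def strong_component(edges, s='src', d='dst'):
    # fully connect every pair of nodes touched by `edges`
    node_ids = pd.concat([edges[s], edges[d]]).unique()
    pairs = []
    for x in range(len(node_ids)):
        for y in range(x+1, len(node_ids)):
            pairs.append({s: node_ids[x], d: node_ids[y]})
    return pd.DataFrame(pairs, columns=[s, d])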
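
For a version that can be run without PyGraphistry, here is a minimal plain-pandas sketch of the same idea on the two-edge example. The helper name `replace_nodes_with_cliques` and the `src`/`dst` column names are assumptions made for illustration, not an existing API. It connects every pair of neighbors of each removed node and keeps the edges that never touched a removed node.

```python
from itertools import combinations

import pandas as pd


def replace_nodes_with_cliques(edges, removed, s="src", d="dst"):
    """Hypothetical helper: drop `removed` nodes and fully connect their neighbors."""
    removed = set(removed)
    new_rows = []
    for node in sorted(removed):
        touching = edges[(edges[s] == node) | (edges[d] == node)]
        # Neighbors are the other endpoints, ignoring any removed nodes
        neighbors = sorted(set(touching[s]).union(touching[d]) - removed)
        for x, y in combinations(neighbors, 2):
            new_rows.append({s: x, d: y})
    # Keep only the edges that do not touch a removed node
    kept = edges[~edges[s].isin(removed) & ~edges[d].isin(removed)]
    return pd.concat([kept, pd.DataFrame(new_rows, columns=[s, d])], ignore_index=True)


edges = pd.DataFrame({"src": ["a", "b"], "dst": ["m", "m"]})
reduced = replace_nodes_with_cliques(edges, ["m"])
print(reduced)
```

The Python loop is only for readability; a vectorized form would be needed for large graphs.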

2. Sugar for common cases

g2 = g.replace_node_with_edges(select_key='type', select_value='m', reducer=reducers.pair_unique)

3. Build into the hypergraph transform, such as during or post-process

g2 = graphistry.hypergraph(
    pd.DataFrame([
       {'x': 'a', 'y': 'm'},
       {'x': 'b', 'y': 'm'}      
   ]),
  edges={ }, # no direct edges
  reducer_subgraphs=[ {'reducer_type': 'node_reducer', 'node_selector': ['y'], 'output_nodes': ['x'],  'reducer': 'pair_unique'} ]
)

Or as a post-process:

g2 = graphistry.hypergraph(
    pd.DataFrame([
       {'x': 'a', 'y': 'm'},
       {'x': 'b', 'y': 'm'}      
   ]),
   direct=True,
   reduce=[ {'reducer_type': 'node_reducer', 'node_selector': ['y'], 'reducer': 'pair_unique'} ]
)

Plan

I think it's worth doing some principled experiments of different shaping use cases first (replace_node_with_edges()), as the API is not at all obvious.

Ideally we come to something simple + extensible in PyGraphistry, and use that to drive subsequent incorporation in hypergraphs, UI, and new preprocessing APIs.

Common scenarios

pradkrish commented 3 years ago

I am interested in working on this issue. I am currently reading through this and collecting some thoughts around it. Do you mind assigning it to me?

lmeyerov commented 3 years ago

Awesome, this has been a long time coming!

I'm guessing the idea here will turn out to be not super well-formed once you work through some examples. The idea of collapsing nodes/edges and then replacing them with simpler nodes/edges with summary stats is a common request, but I'm sure there are a lot of edge cases. So even the basic API definition seems up for grabs too.

My intuition is this will be used similarly to df.groupby(..).agg(...) in pandas, except it needs to generalize to grouping nodes: removing them + their edges; reconnecting the new summary nodes to their old neighborhoods (accounting for replaced parts); and enriching with aggregate stats both on the new resulting nodes and edges. So probably a lot of APIs we can learn from + reuse!
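
To illustrate the groupby/agg analogy, here is a rough pandas sketch under made-up data. The column names (`id`, `type`, `score`, `src`, `dst`, `weight`) and the `m_summary` label are assumptions for illustration only. It groups the 'm' nodes into one summary node, reconnects their edges to it, and computes aggregate stats on both the resulting nodes and edges.

```python
import pandas as pd

nodes = pd.DataFrame({
    "id": ["a", "b", "m1", "m2"],
    "type": ["x", "x", "m", "m"],
    "score": [1, 2, 10, 20],
})
edges = pd.DataFrame({
    "src": ["a", "m1", "m2"],
    "dst": ["m1", "m2", "b"],
    "weight": [3, 5, 7],
})

# Group key: nodes of type 'm' share one summary node; others keep their own id
nodes["group"] = nodes["id"].where(nodes["type"] != "m", "m_summary")

# Aggregate node attributes per group, as in df.groupby(...).agg(...)
summary_nodes = (
    nodes.groupby("group", as_index=False)
    .agg(n_members=("id", "count"), score_sum=("score", "sum"))
    .rename(columns={"group": "id"})
)

# Reconnect: rewrite endpoints to group ids, then drop edges internal to a group
to_group = nodes.set_index("id")["group"]
rewired = edges.assign(src=edges["src"].map(to_group), dst=edges["dst"].map(to_group))
rewired = rewired[rewired["src"] != rewired["dst"]]

# Aggregate parallel edges between the same pair of groups
summary_edges = rewired.groupby(["src", "dst"], as_index=False).agg(
    n_edges=("weight", "count"), weight_max=("weight", "max")
)
```

Here the summary node replaces the removed nodes rather than turning them into edges; which of the two a final API should do is exactly the open question in this thread.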

lmeyerov commented 2 years ago

Oh, I forgot to mention: we're starting to do both CPU (pandas) + GPU (cudf/dask_cudf) flavors of implementations.

Totally fine to just start with CPU and I can help on GPU. To make the process easier, it helps a lot to write it as vectorized pandas. E.g., s = df['some_col'].sum() vs for x in df['some_col']: s += x. A lot of code ends up fully reusable or just needs swapping a few pd calls for cudf calls.
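
As a small graph-flavored instance of the loop vs. vectorized contrast, the sketch below counts incoming edges per node both ways. The `src`/`dst` column names are assumptions; the expectation that the groupby form carries over to cudf with few changes is not verified here.

```python
import pandas as pd

edges = pd.DataFrame({"src": ["a", "b", "d", "c"], "dst": ["m", "m", "m", "n"]})

# Loop version: one Python-level iteration per edge (slow on big graphs)
in_degree_loop = {}
for dst in edges["dst"]:
    in_degree_loop[dst] = in_degree_loop.get(dst, 0) + 1

# Vectorized version: a single groupby, no Python-level loop
in_degree = edges.groupby("dst").size()
```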

pradkrish commented 2 years ago

My ideas are still incomplete. I would like to pick your brains on what you think the output should look like for simple use cases. Discussing specific use cases can help build a bigger picture. Consider the following dataframe:

g = graphistry.edges(pd.DataFrame([{'x': 'a', 'y': 'm'},
                                   {'x': 'b', 'y': 'm'},
                                   {'x': 'c', 'y': 'n'},
                                   {'x': 'd', 'y': 'm'}]), 'x', 'y')

Replacing all the nodes (m) with an edge [m] results in a clique (I guess) consisting of nodes (a), (b) and (d). Let's say we have a naive implementation of replace_node_with_edges('m'), which takes a node that needs to be replaced with edges, like you described above. Let's treat the function as a black box for now and ignore the implementation details. Would you be able to illustrate what you imagine the output to look like for this function call?

g.replace_node_with_edges('m')

lmeyerov commented 2 years ago

a->m<-b, c->n, d->m

Yes, I think you've got it: (a), (b), and (d) would now form a clique, so it'd be equiv to writing:

g2 = graphistry.edges(pd.DataFrame([
  # the clique
  {'x': 'a', 'y': 'b'},
  {'x': 'a', 'y': 'd'},
  {'x': 'b', 'y': 'd'},  # we will treat all edges as undirected, even though the UI rendering will be directed
  # the rest
  {'x': 'c', 'y': 'n'}]),
  'x', 'y')

We may want to further include provenance information:

g2 = graphistry.edges(pd.DataFrame([
  # the clique
  {'x': 'a', 'y': 'b', 'reduced_from': 'm', 'type': m['type']}, # assumes original graphistry.nodes(df2) had col `type`
  {'x': 'a', 'y': 'd', 'reduced_from': 'm', 'type': m['type']},
  {'x': 'b', 'y': 'd', 'reduced_from': 'm', 'type': m['type']},
  # the rest
  {'x': 'c', 'y': 'n'}]),
  'x', 'y')
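
A vectorized way to derive that clique, with the `reduced_from` provenance column, could look like the sketch below. It is plain pandas on the four-edge example, not a PyGraphistry API, and it assumes every edge touching a removed node points into it (as in the example), so it does not handle edges leaving a removed node.

```python
import pandas as pd

edges = pd.DataFrame({"x": ["a", "b", "c", "d"], "y": ["m", "m", "n", "m"]})
removed = ["m"]

# Edges pointing into a removed node; every pair of them becomes one new edge
into_removed = edges[edges["y"].isin(removed)]
pairs = into_removed.merge(into_removed, on="y", suffixes=("", "_other"))

# Keep each unordered pair once and skip self-pairs
pairs = pairs[pairs["x"] < pairs["x_other"]]
clique = pairs.rename(columns={"y": "reduced_from"}).rename(columns={"x_other": "y"})
clique = clique[["x", "y", "reduced_from"]]

# Untouched edges pass through unchanged (reduced_from stays missing for them)
rest = edges[~edges["x"].isin(removed) & ~edges["y"].isin(removed)]
reduced = pd.concat([clique, rest], ignore_index=True)
```

The self-merge is quadratic in the degree of each removed node, which is inherent to materializing a clique.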

Likewise, we can imagine updating node/edge based on aggregates like:

# node props
(a { f }) = f(a, [ (a->m), m for (a, m) in edges]) 

# edge props
(a)-[e { f }]->(b) = f(a, a->m, m, m->b, b)

My intuition is a mix of pandas/R's group/apply agg pattern for table reductions, and how tools like pregel/graphx do iterative maps on labeled <src_node, edge, dst_node> triples.

pradkrish commented 2 years ago

I would like to go through the suggested function signature. Please correct me if you sense any misunderstanding on my part.

g2 = g.replace_node_with_edges(
  select_nodes=g._nodes.query('type == "m"'),
  edge_reducer=g.reducers.pair_unique,
  edge_attr_reducer={'weight': 'max'})

select_nodes are the nodes which will become edges. select_nodes can be of type str, list(str), Series or a single-columned DataFrame.

edge_reducer probably determines the combination of the newly created edges. In the following example

df = pd.DataFrame({'x': ['a', 'b'], 'y': ['m', 'm'], 'w': [30, 15]})
g = graphistry.edges(df, 'x', 'y').nodes(df, 'y').bind(edge_weight='w')

for select_nodes=['m'], g.reducers.pair_unique results in edges (a,b) and (b,a) but g.reducers.pair_unique_undirected should only give edge (a,b)
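
To pin down that reading, the two reducers could be sketched as below. Both functions are hypothetical stand-ins that take the neighbors of a removed node; neither exists in PyGraphistry, and their signatures are an assumption.

```python
from itertools import combinations, permutations


def pair_unique(neighbors):
    """Hypothetical reducer: every ordered pair, so (a, b) and (b, a) both appear."""
    return list(permutations(sorted(set(neighbors)), 2))


def pair_unique_undirected(neighbors):
    """Hypothetical reducer: every unordered pair once, so only (a, b) appears."""
    return list(combinations(sorted(set(neighbors)), 2))


print(pair_unique(["a", "b"]))             # [('a', 'b'), ('b', 'a')]
print(pair_unique_undirected(["a", "b"]))  # [('a', 'b')]
```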

edge_attr_reducer sets the attributes of the newly created edges. In the following example

df = pd.DataFrame({'x': ['a', 'b', 'c','d','e'], 'y': ['m', 'm', 'n', 'm','o'], 'w':[30,15,40,20,50]})
g = graphistry.edges(df, 'x', 'y').nodes(df, 'y').bind(edge_weight='w')

for select_nodes=['m','n'], edge_attr_reducer={'weight': 'max'} will set the edge_weight of the newly created edges alone to 40. weight can probably take other reducers as well, like min, sum, mean etc.

Clarifying it here will help me with the PR, thanks :)

lmeyerov commented 2 years ago

select_nodes seems close! For select_nodes=g._nodes.query(...) to "just work", we'd also take a df where we slice on g._node

--

For reducers, I'm not sure of the right signatures, so feel free to propose :) If we can keep these minimal and push additional reductions to complementary operators, that'd make this simpler.

Working backwards from some sample use cases:

--

So after writing that out, maybe for the case of removing one node at a time, it's just about computing new edge props (edge_attr_reducer), and we don't need edge_reducer because it's fine (so far) as a separate outside postprocessing call.

A super general contract would be taking functions like:

def my_edge_attr_reducer(removed_node: dict, new_src: dict, new_dst: dict, old_src_node_edge: dict, old_node_dst_edge: dict) -> dict:
    new_edge_props = dict(...)
    return new_edge_props

g.replace_node_with_edges(edge_attr_reducer=my_edge_attr_reducer)

It is close to apply_rows, which is vectorizable

A similar vectorizable general contract can be pandas' .agg() on each group g1=[(a)-(m1), (m1)-(b)], g2=... where the resulting cols go to (a)-[agg(g1)]-(b), .... <-- I'm leaning more to this...
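
A rough sketch of that .agg() contract in plain pandas, using the five-edge example from earlier in the thread: each pair of edges sharing a removed node forms one group, and the group's aggregate becomes the new edge's attribute. The per-pair grouping and the `weight` output column are assumptions about what the contract could mean, not a settled design.

```python
import pandas as pd

edges = pd.DataFrame({
    "x": ["a", "b", "c", "d", "e"],
    "y": ["m", "m", "n", "m", "o"],
    "w": [30, 15, 40, 20, 50],
})
removed = ["m", "n"]

# Pair up edges that share a removed node: each pair is one group [(a)-(m), (m)-(b)]
hits = edges[edges["y"].isin(removed)]
pairs = hits.merge(hits, on="y", suffixes=("_1", "_2"))
pairs = pairs[pairs["x_1"] < pairs["x_2"]]

# Long form: one row per (group, member edge), so .agg() runs once per group
long = pd.concat([
    pairs[["x_1", "x_2", "w_1"]].rename(columns={"w_1": "w"}),
    pairs[["x_1", "x_2", "w_2"]].rename(columns={"w_2": "w"}),
])
new_edges = (
    long.groupby(["x_1", "x_2"], as_index=False)
    .agg(weight=("w", "max"))
    .rename(columns={"x_1": "x", "x_2": "y"})
)
```

Under this per-pair reading, (n) contributes no new edge because it has a single neighbor, and each new edge only sees the two edges it replaces.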

--

Fun edge case: If there's a chain like (a)-(m1)-(m2)-(b) with multiple reduced nodes, unclear what my_edge_attr_reducer should do! Especially if we want this vectorizable!

lmeyerov commented 2 years ago

I've been playing with a variant for a project.

I'm thinking next step is:

import pandas as pd

def collapse_nodes_by_id(g, nodes_df):
    """
        - drop any nodes in g listed in nodes_df[g._node]
        - drop corresponding edges
        - ... and reroute as new edges:
          - n1->a->n2 => n1->n2
          - n1->a<-n2 => n1<-n2 (but skip self-edges)
          - n1<-a->n2 => n1<-n2 (but skip self-edges)
    """

    #remove g._nodes in nodes_df
    nodes2 = g._nodes.set_index(g._node).drop(index=nodes_df[g._node]).reset_index()

    ####

    #remove g._edges[g._source] in nodes_df
    edges_indexed = g._edges[[g._source]].reset_index()
    edges_labeled_src = edges_indexed.merge(nodes_df[[g._node]].rename(columns={g._node: g._source}).assign(drop=1), on=g._source, how='left').fillna({'drop': 0})
    edges_bad_src_index = edges_labeled_src[ edges_labeled_src['drop'] == 1.0 ]['index']
    edges_good_src = g._edges.drop(index=edges_bad_src_index)

    #remove g._edges[g._destination] in nodes_df
    edges_indexed = edges_good_src.reset_index()
    edges_labeled_dst = edges_indexed.merge(nodes_df[[g._node]].rename(columns={g._node: g._destination}).assign(drop=1), on=g._destination, how='left').fillna({'drop': 0})
    edges_bad_dst_index = edges_labeled_dst[ edges_labeled_dst['drop'] == 1.0 ]['index']
    edges_good_dst = edges_good_src.drop(index=edges_bad_dst_index)

    ####

    #for each removed node:
    #  n1->a->n2 => n1->n2
    #  n1->a<-n2 => n1<-n2 (but skip self-edges)
    #  n1<-a->n2 => n1<-n2 (but skip self-edges)

    #focus just on rebuilding removed edges
    edges_with_matching_src_tagged = g._edges.merge(nodes_df[[g._node]].rename(columns={g._node: g._source}).assign(drop=1), on=g._source, how='left').fillna({'drop': 0})
    edges_with_matching_src = edges_with_matching_src_tagged[ edges_with_matching_src_tagged['drop'] == 1.0 ].drop(columns=['drop'])
    edges_with_matching_dst_tagged = g._edges.merge(nodes_df[[g._node]].rename(columns={g._node: g._destination}).assign(drop=1), on=g._destination, how='left').fillna({'drop': 0})
    edges_with_matching_dst = edges_with_matching_dst_tagged[ edges_with_matching_dst_tagged['drop'] == 1.0 ].drop(columns=['drop'])
    edges_with_matching_src_dst = g._edges[ (edges_with_matching_src_tagged['drop'] == 1.0) | (edges_with_matching_dst_tagged['drop'] == 1.0) ]

    #  n1->a->n2 => n1->n2
    directed_edges_raw = edges_with_matching_dst.merge(
        edges_with_matching_src,
        how='inner',  # skip incoming edges whose removed node has no outgoing edge
        left_on=g._destination,
        right_on=g._source,
        suffixes=('', '_y'))
    directed_edges = (directed_edges_raw
      .drop(columns=[g._destination, f'{g._source}_y'])
      .rename(columns={f'{g._destination}_y': g._destination}))
    #print('directed_edges', directed_edges)

    #  n1->a<-n2 => n1<-n2
    # FIXME skip self-edges
    inwards_edges_raw = edges_with_matching_dst.merge(
        edges_with_matching_dst,
        how='left',
        on=g._destination,
        suffixes=('', '_y'))
    inwards_edges = (inwards_edges_raw
        .drop(columns=[g._destination])
        .rename(columns={f'{g._source}_y': g._destination}))
    #print('inwards_edges', inwards_edges)

    #  n1<-a->n2 => n1<-n2
    # FIXME skip self-edges
    outwards_edges_raw = edges_with_matching_src.merge(
        edges_with_matching_src,
        how='left',
        on=g._source,
        suffixes=('', '_y'))
    outwards_edges = (outwards_edges_raw
        .drop(columns=[g._source])
        .rename(columns={f'{g._destination}_y': g._source}))
    #print('outwards_edges', outwards_edges)

    new_edges = pd.concat([
        edges_good_dst,
        directed_edges,
        inwards_edges,
        outwards_edges], ignore_index=True)

    return g.nodes(nodes2).edges(new_edges)

Ex:

collapse_nodes_by_id(
    graphistry.nodes(pb_with_pb[:5], 'event')
        .edges(pd.DataFrame({'s': [0, 0, 0, 1, 1, 2, 2, 3, 3, 4], 'd': [2, 1, 1, 1, 2, 2, 2, 3, 0, 5], 'v': ['aa', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']}), 's', 'd'),
    pd.DataFrame({'event': [1, 3]}))._edges

Ex:

g2 = collapse_nodes_by_id(g, g._nodes[ g._nodes['category'] == 'event' ])
g2.plot()

lmeyerov commented 2 years ago

Related, label prop WIP:

def propagate_edge_labels(g, incoming=None, outgoing=None, both=None, combine=None, fillna=None, groupby_deduplicate_key=None):
    """
        incoming: optional .agg(**kwargs) for a node over its incoming edges
        outgoing: optional .agg(**kwargs) for a node over its outgoing edges
        both: reuse the same calc for both incoming + outgoing
        combine: if setting both incoming+outgoing (or via both shorthand), how to then combine the results. Otherwise, will propagate the single aggregate provided.
        fillna: nodes.fillna(...) if provided
        groupby_deduplicate_key: incoming (outgoing) edges are likely to have a key to deduplicate on before reducing, such as 'event' in hypergraphs, which would otherwise be counted multiple times
    """

    g = g.materialize_nodes()

    if both is not None:
        incoming = both
        outgoing = both

    dst_labels = None
    if incoming is not None:
        if groupby_deduplicate_key is not None:
            dst_labels = g._edges.drop_duplicates([g._destination, groupby_deduplicate_key]).groupby(g._destination).agg(**incoming)
        else:
            dst_labels = g._edges.groupby(g._destination).agg(**incoming)

    src_labels = None
    if outgoing is not None:
        if groupby_deduplicate_key is not None:
            src_labels = g._edges.drop_duplicates([g._source, groupby_deduplicate_key]).groupby(g._source).agg(**outgoing)
        else:
            src_labels = g._edges.groupby(g._source).agg(**outgoing)

    nodes2 = None
    if combine is not None:
        nodes2 = g._nodes.set_index(g._node).assign(**{c: combine[c](dst_labels[c], src_labels[c]) for c in combine}).reset_index()
    else:
        nodes_d = g._nodes
        if incoming is not None:
            nodes_d = g._nodes.merge(dst_labels.reset_index().rename(columns={g._destination: g._node}), how='left', on=g._node)
        nodes2 = nodes_d
        if outgoing is not None:
            nodes2 = nodes_d.merge(src_labels.reset_index().rename(columns={g._source: g._node}), how='left', on=g._node)

    if fillna is not None:
        nodes2 = nodes2.fillna(fillna)

    return g.nodes(nodes2)

propagate_edge_labels(
    graphistry.edges(
        pd.DataFrame({
            's': ['a', 'b', 'b', 'c', 'c', 'c'],
            'd': ['a', 'b', 'c', 'a', 'b', 'c'],
            'b': [True, True, False, False, False, False],
            'v': [1, 2, 4, 6, 8, 10]
        }),
        's', 'd'),
    both={
        'sum_v': ('v', 'sum'),
        'any_b': ('b', 'any'),
        'count_b': ('b', 'sum'),
    },
    combine={
        'sum_v': (lambda c1, c2: c1 + c2),
        'any_b': (lambda c1, c2: c1 | c2),
        'count_b': (lambda c1, c2: c1 + c2)
    })._nodes

lmeyerov commented 2 years ago

Node reductions came up again in the topology-aware .collapse: https://github.com/graphistry/pygraphistry/issues/336 + https://github.com/graphistry/pygraphistry/issues/337

The optimization thinking shows it may help to separate out primitives. A marking phase can identify collapse equiv classes in diff ways, while reduction phases can compute aggregates in structured ways.
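
One way to picture that split is sketched below. The marking rule (selected nodes joined by an edge share a class), the function names, and the data are all assumptions for illustration; other marking rules could feed the same reduction step.

```python
import pandas as pd

nodes = pd.DataFrame({"id": ["a", "m1", "m2", "b", "m3"], "v": [1, 10, 20, 2, 5]})
edges = pd.DataFrame({"src": ["a", "m1", "m2", "b"], "dst": ["m1", "m2", "b", "m3"]})
selected = {"m1", "m2", "m3"}


def mark_classes(edges, selected):
    """Marking phase (one hypothetical rule): selected nodes joined by an edge share a class."""
    parent = {n: n for n in selected}

    def find(n):
        while parent[n] != n:
            parent[n] = parent[parent[n]]  # path halving
            n = parent[n]
        return n

    for s, d in zip(edges["src"], edges["dst"]):
        if s in selected and d in selected:
            root_s, root_d = find(s), find(d)
            # Union by name so the class label is deterministic
            parent[max(root_s, root_d)] = min(root_s, root_d)
    return {n: find(n) for n in selected}


def reduce_classes(nodes, classes):
    """Reduction phase: aggregate node attributes per class with groupby/agg."""
    labeled = nodes.assign(cls=nodes["id"].map(classes)).dropna(subset=["cls"])
    return labeled.groupby("cls", as_index=False).agg(n=("id", "count"), v_sum=("v", "sum"))


classes = mark_classes(edges, selected)
summary = reduce_classes(nodes, classes)
```

With this rule, a chain like (a)-(m1)-(m2)-(b) puts m1 and m2 in one class, so the reduction sees them as a single unit. The marking loop runs in Python; a GPU-friendly version would need a vectorized labeling step instead.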