cwida / duckpgq-extension

DuckDB extension that adds support for SQL/PGQ
https://duckpgq.notion.site/b8ac652667964f958bfada1c3e53f1bb?v=3b47a8d44bdf4e0c8b503bf23f1b76f2
MIT License
86 stars 7 forks source link

UDF for local clustering coefficient #123

Closed Dtenwolde closed 5 months ago

Dtenwolde commented 5 months ago

The clustering coefficient measures the degree to which nodes tend to cluster together. Particularly social networks have a higher clustering coefficient. There is the local clustering coefficient, which calculates how close its neighbours are to being part of a clique, a complete graph. This UDF will only work on undirected graphs. Since we don't know whether the edge table is directed or undirected, we convert it in the CSR to an undirected graph. This way of creating the CSR can also be used in #87.

The global clustering coefficient is defined as the `number of closed triplets/number of all triplets (open or closed). For now, this will be left out of the scope for this issue.

https://en.wikipedia.org/wiki/Clustering_coefficient

Other systems with the same features: Neo4j: Only allows on undirected graphs Networkx Memgraph: Same as NetworkX

Dtenwolde commented 5 months ago
Some timings below after a couple of optimizations: Tested using an Apple M1 Pro, 16GB RAM. Local clustering is calculated over all nodes in the Person table, following the Person_knows_Person table. I am only testing the execution time of the query, not loading the data or any preprocessing. Using the internal timer of DuckDB .timer on using the real time and time.time() in Python: Dataset DuckPGQ UDF Native SQL DuckPGQ vs Native SQL Neo4j DuckPGQ vs Neo4j NetworkX DuckPGQ vs NetworkX
SNB1 0.106 0.337 3.18x 0.960 9.06x 2.875 27.12x
SNB3 0.283 1.234 4.36x 1.643 5.81x 10.334 36.52x
SNB10 1.478 6.816 4.61x 3.064 2.07x 48.520 32.83x

image

DuckPGQ query:

WITH edges_cte AS (
    SELECT a.rowid as src, c.rowid as dst, k.rowid as edges
    FROM Person_knows_person k 
    JOIN Person a on a.id = k.person1id
    JOIN Person c on c.id = k.person2id
), 
cte1 AS (
    SELECT  CREATE_CSR_EDGE(
            0,
            (SELECT count(a.id) FROM Person a),
            CAST (
                (SELECT sum(CREATE_CSR_VERTEX(
                            0,
                            (SELECT count(a.id) FROM Person a),
                            sub.dense_id,
                            sub.cnt)
                            ) * 2
                FROM (
                    SELECT dense_id, count(*) as cnt FROM (
                        SELECT dense_id, outgoing_edge, incoming_edge
                            FROM (
                                SELECT a.rowid AS dense_id, k.person1id AS outgoing_edge, k.person2id AS incoming_edge
                                FROM Person a
                                JOIN Person_knows_person k ON k.person1id = a.id
                                UNION ALL
                                SELECT a.rowid AS dense_id, k.person2id AS outgoing_edge, k.person1id AS incoming_edge
                                FROM Person a
                                JOIN Person_knows_person k on k.person2id = a.id)
                        GROUP BY dense_id, outgoing_edge, incoming_edge)
                   GROUP BY dense_id) sub
                )
            AS BIGINT),
            src,
            dst,
            edge) as temp FROM (
    select src, dst, any_value(edges) as edge FROM (
        select src, dst, edges from edges_cte UNION all select dst, src, edges from edges_cte)
    GROUP BY src, dst)
) SELECT id, __x.temp + local_clustering_coefficient(0, a.rowid) as lcc
        FROM (select count(cte1.temp) * 0 as temp from cte1) __x, Person a
        ORDER BY id ASC;

Native SQL query:

-- Create undirected edges by considering each directed edge in both directions
WITH undirected_edges AS (
    SELECT person1id AS person1id, person2id AS person2id FROM Person_knows_person
    UNION
    SELECT person2id AS person1id, person1id AS person2id FROM Person_knows_person
),

-- Find neighbors of each person
neighbors AS (
    SELECT person1id AS person, LISTAGG(person2id, ',') AS neighbor_list
    FROM undirected_edges
    GROUP BY person1id
),

-- Calculate the number of connections between neighbors
connections_between_neighbors AS (
    SELECT 
        e1.person1id AS person,
        COUNT(*) AS connections
    FROM undirected_edges e1
    JOIN undirected_edges e2 ON e1.person2id = e2.person1id
    JOIN undirected_edges e3 ON e2.person2id = e3.person1id AND e3.person2id = e1.person1id
    GROUP BY e1.person1id
),

-- Calculate the local clustering coefficient
local_clustering_coefficient AS (
    SELECT
        n.person AS person,
        COALESCE(c.connections, 0) AS connections_between_neighbors,
        (array_length(split(n.neighbor_list, ','), 1) * (array_length(split(n.neighbor_list, ','), 1) - 1)) / 2 AS possible_connections
    FROM neighbors n
    LEFT JOIN connections_between_neighbors c ON n.person = c.person
)

-- Final result with local clustering coefficient calculation
SELECT 
    person,
    connections_between_neighbors,
    possible_connections,
    CASE
        WHEN possible_connections > 0 THEN CAST(connections_between_neighbors AS FLOAT) / possible_connections
        ELSE 0
    END AS local_clustering_coefficient
FROM local_clustering_coefficient
ORDER BY person;

Neo4j query:

CALL gds.graph.project(
  'snb',
  'Person',
  {
    KNOWS: {
      orientation: 'UNDIRECTED'
    }
  }
)

CALL gds.localClusteringCoefficient.stream('snb')
YIELD nodeId, localClusteringCoefficient
RETURN gds.util.asNode(nodeId).id AS id, localClusteringCoefficient
ORDER BY localClusteringCoefficient DESC

NetworkX:

import os
import time
import pandas as pd
import networkx as nx

SF = 1

def timing_decorator(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"Total time for {func.__name__}: {end_time - start_time} seconds")
        return result
    return wrapper

@timing_decorator
def calculate_lcc(G: nx.Graph) -> pd.DataFrame:
    """
    Calculate the local clustering coefficient for each node in the graph.

    Args:
    G (nx.Graph): A NetworkX graph.

    Returns:
    pd.DataFrame: A DataFrame containing nodes and their local clustering coefficients.
    """
    clustering_coefficients = nx.clustering(G)
    clustering_df = pd.DataFrame(clustering_coefficients.items(), columns=['id', 'local_clustering_coefficient'])
    return clustering_df

@timing_decorator
def snb_graph():
    """
    Load SNB graph data, create the graph, and calculate local clustering coefficients.
    """
    try:
        person_df = pd.read_csv(os.path.join('..', 'data', f'SNB{SF}-projected|', 'person.csv'), sep='|')
        person_knows_person_df = pd.read_csv(os.path.join('..', 'data', f'SNB{SF}-projected|', 'person_knows_person.csv'), sep='|')
    except FileNotFoundError as e:
        print(f"Error loading files: {e}")
        return

    G = nx.Graph()

    for _, row in person_df.iterrows():
        G.add_node(row['id'], **row.to_dict())

    for _, row in person_knows_person_df.iterrows():
        G.add_edge(row['Person1Id'], row['Person2Id'])

    clustering_df = calculate_lcc(G)
    clustering_df.to_csv(f'local_clustering_coefficients-sf{SF}.csv', index=False)

def main():
    snb_graph()

if __name__ == "__main__":
    main()
Dtenwolde commented 5 months ago

Now that the performance is relatively good, the next step is integrating the function into a nicer syntax and there are multiple options:

  1. We could wrap it in SQL/PGQ syntax as follows:
    FROM GRAPH_TABLE (snb MATCH (a:Person)-[k:Knows]->(b:Person) COLUMNS (local_clustering_coefficient(a))

    Caveat:

    • What if the local_clustering_coefficient function is used inside more complex patterns?
      FROM GRAPH_TABLE (snb MATCH (c:Person)-[k:Knows]->(a:Person)-[k:Knows]->(b:Person) COLUMNS (local_clustering_coefficient(a))

      What do we calculate the local clustering coefficient over in this case? All nodes between a and b OR a and c?

  2. Keep it like this, requiring the user to write the CSR CTE and ensure it uses the correct columns.
  3. Overload a table function (after having created the property graph), which we then rewrite using BindReplace into the query used above.
    from local_clustering_coefficient(node_table, edge_table)

At the moment I believe the third option strikes a nice balance and seems like the most natural option to choose

Dtenwolde commented 5 months ago

After implementing #124 the following query is now possible and has similar timings as the original DuckPGQ query:

select * from local_clustering_coefficient(snb, person, knows);

SNB1: 0.100s SNB3: 0.353s SNB10: 2.193s