memgraph / mage

MAGE - Memgraph Advanced Graph Extensions :crystal_ball:
Apache License 2.0

Rust query modules with parallelisation #328

risufaj opened this issue 1 year ago (status: open)

risufaj commented 1 year ago

Hello,

I've been trying to develop a query module in Rust that parallelizes some loops using the rayon library. The parallelized loops access nodes and edges. However, I get the error "G cannot be shared between threads safely", where G is a MemgraphGraph object. I've seen that some of the C++ query modules use parallelization, but I haven't been able to find anything similar for Rust.
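A likely cause of this error: rayon's `filter_map` and `flat_map` require their closures to be `Send + Sync`, and a closure that borrows `graph: G` only satisfies that when `G: Sync`. The generic `example<G: Graph>` function in the reproduction has no such bound, and even with one, `MemgraphGraph` would still not qualify, assuming (as the error suggests) that `Memgraph` wraps raw C pointers, which are neither `Send` nor `Sync`. The std-only sketch below illustrates the bound with a graph that owns its data. It uses `std::thread::scope` instead of rayon to stay dependency-free, and `OwnedGraph` and `count_neighbors` are hypothetical names, not part of rsmgp-sys.

```rust
use std::thread;

/// Minimal subset of the `Graph` trait from the report (hypothetical).
trait Graph {
    fn neighbors(&self, vertex: usize) -> Vec<usize>;
}

/// A graph that owns plain data is `Sync` automatically, unlike a type
/// that wraps raw C pointers.
struct OwnedGraph {
    adjacency: Vec<Vec<usize>>,
}

impl Graph for OwnedGraph {
    fn neighbors(&self, vertex: usize) -> Vec<usize> {
        self.adjacency[vertex].clone()
    }
}

/// Counts the neighbors of `nodes`, one scoped thread per node.
/// Each spawned closure captures `&G`, which is `Send` only if `G: Sync`,
/// so the `Sync` bound is required for this to compile.
fn count_neighbors<G: Graph + Sync>(graph: &G, nodes: &[usize]) -> usize {
    thread::scope(|s| {
        let handles: Vec<_> = nodes
            .iter()
            .map(|&v| s.spawn(move || graph.neighbors(v).len()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum::<usize>()
    })
}
```

Removing `+ Sync` from `count_neighbors` should produce the same kind of compiler error as in the report.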

Here is some code to reproduce this.

Cargo.toml

[package]
name = "parallel-example"
version = "0.1.0"
edition = "2018"

[dependencies]
c_str_macro = "1.0.2"
rayon = "1.5"

rsmgp-sys = { git = "https://github.com/memgraph/mage.git", tag="v1.7.0"}

[lib]
name = "parallel_example"
crate-type = ["cdylib"]

The src folder contains these files:

lib.rs

mod example;

use crate::example::MemgraphGraph;
use crate::example::example as example_algorithm;
use c_str_macro::c_str;
use rsmgp_sys::memgraph::*;
use rsmgp_sys::mgp::*;
use rsmgp_sys::result::*;
use rsmgp_sys::rsmgp::*;
use rsmgp_sys::value::*;
use rsmgp_sys::{close_module, define_optional_type, define_procedure, define_type, init_module};
use std::collections::{HashMap};
use std::ffi::{CString};
use std::os::raw::c_int;
use std::panic;

init_module!(|memgraph: &Memgraph| -> Result<()> {
    memgraph.add_read_procedure(
        example,
        c_str!("example"),
        &[define_type!("node_list", Type::List, Type::Int),],
        &[],
        &[
            define_type!("node_id", Type::Int),
        ],
    )?;
    Ok(())
});

fn write_nodes_to_records(memgraph: &Memgraph, nodes: Vec<i64>) -> Result<()> {
    for node_id in nodes {
        let record = memgraph.result_record()?;
        record.insert_int(c_str!("node_id"), node_id)?;
    }
    Ok(())
}

define_procedure!(example, |memgraph: &Memgraph| -> Result<()> {
    let args = memgraph.args()?;
    let Value::List(node_list) = args.value_at(0)? else {
        panic!("Failed to read node_list")
    };

    let node_list: Vec<i64> = node_list
        .iter()?
        .map(|value| match value {
            Value::Int(i) => i as i64,
            _ => panic!("Failed converting node_list to vector"),
        })
        .collect();

    let graph = MemgraphGraph::from_graph(memgraph);

    let result = example_algorithm(
        graph,
        &node_list
    );
    write_nodes_to_records(memgraph, result)?;
    Ok(())
});

close_module!(|| -> Result<()> { Ok(()) });

example.rs

use rsmgp_sys::memgraph::*;
use rsmgp_sys::result::Error as MgpError;
use rsmgp_sys::value::*;
use std::io;
use c_str_macro::c_str;
use rayon::prelude::*;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Vertex {
    pub id: i64,
}

#[derive(Debug)]
pub enum GraphError {
    IoError(io::Error),
    MgpError(MgpError),
}

impl From<io::Error> for GraphError {
    fn from(error: io::Error) -> Self {
        Self::IoError(error)
    }
}

impl From<MgpError> for GraphError {
    fn from(error: MgpError) -> Self {
        Self::MgpError(error)
    }
}

pub trait Graph {
    fn vertices_iter(&self) -> Result<Vec<Vertex>, GraphError>;
    fn neighbors(&self, vertex: Vertex) -> Result<Vec<Vertex>, GraphError>;
    fn weighted_neighbors(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;
    fn add_vertex(&mut self, vertex: Vertex) -> Result<(), GraphError>;
    fn add_edge(&mut self, source: Vertex, target: Vertex, weight: f32) -> Result<(), GraphError>;
    fn num_vertices(&self) -> usize;
    fn get_vertex_by_id(&self, id: i64) -> Option<Vertex>;
    fn outgoing_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;
    fn incoming_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;
}

pub struct MemgraphGraph<'a> {
    graph: &'a Memgraph,
}

impl<'a> MemgraphGraph<'a> {
    pub fn from_graph(graph: &'a Memgraph) -> Self {
        Self { graph }
    }
}

impl<'a> Graph for MemgraphGraph<'a> {
    fn vertices_iter(&self) -> Result<Vec<Vertex>, GraphError> {
        let vertices_iter = self.graph.vertices_iter()?;
        let vertices: Vec<_> = vertices_iter.map(|v| Vertex { id: v.id() }).collect();
        Ok(vertices)
    }

    fn incoming_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
        let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
        let iter = vertex_mgp.in_edges()?.map(|e| {
            let target_vertex = e.from_vertex().unwrap();
            // if the edge doesn't have a float "weight" property, we assume it's 1.0
            let weight = e
                .property(&c_str!("weight"))
                .ok()
                .and_then(|p| {
                    if let Value::Float(f) = p.value {
                        Some(f)
                    } else {
                        None
                    }
                })
                .unwrap_or(1.0);

            (
                Vertex {
                    id: target_vertex.id(),
                },
                weight,
            )
        });
        let incoming_edges: Vec<_> = iter.collect();
        Ok(incoming_edges)

    }

    fn outgoing_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
        let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
        let outgoing_edges_iter = vertex_mgp.out_edges()?.map(|e| {
            let target_vertex = e.to_vertex().unwrap();
            // if the edge doesn't have a float "weight" property, we assume it's 1.0
            let weight = e
                .property(&c_str!("weight"))
                .ok()
                .and_then(|p| {
                    if let Value::Float(f) = p.value {
                        Some(f)
                    } else {
                        None
                    }
                })
                .unwrap_or(1.0);

            (
                Vertex {
                    id: target_vertex.id(),
                },
                weight,
            )
        });
        let outgoing_edges: Vec<_> = outgoing_edges_iter.collect();
        Ok(outgoing_edges)
    }

    fn weighted_neighbors(&self, vertex: Vertex) -> Result<Vec<(Vertex,f64)>, GraphError> {
        let mut outgoing_edges = self.outgoing_edges(vertex)?;
        let incoming_edges = self.incoming_edges(vertex)?;

        outgoing_edges.extend(incoming_edges);

        Ok(outgoing_edges)
    }

    fn neighbors(&self, vertex: Vertex) -> Result<Vec<Vertex>, GraphError> {
        let mut neighbors = vec![];
        let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
        let neighbors_iter = vertex_mgp.out_edges()?.map(|e| e.to_vertex());
        for neighbor_mgp in neighbors_iter {
            neighbors.push(Vertex {
                id: neighbor_mgp?.id(),
            });
        }
        let neighbors_in = vertex_mgp.in_edges()?.map(|e| e.from_vertex());
        for neighbor_mgp in neighbors_in {
            neighbors.push(Vertex {
                id: neighbor_mgp?.id(),
            });
        }
        Ok(neighbors)
    }

    fn add_vertex(&mut self, _vertex: Vertex) -> Result<(), GraphError> {
        unimplemented!()
    }

    fn add_edge(&mut self, _source: Vertex, _target: Vertex, _weight: f32) -> Result<(), GraphError> {
        // let source_mgp = self.graph.vertex_by_id(source.id)?;
        // let target_mgp = self.graph.vertex_by_id(target.id)?;
        // self.graph.create_edge(source_mgp, target_mgp, weight)?;
        // Ok(())
        unimplemented!()
    }

    fn num_vertices(&self) -> usize {
        self.graph.vertices_iter().unwrap().count()
    }

    fn get_vertex_by_id(&self, id: i64) -> Option<Vertex> {
        match self.graph.vertex_by_id(id) {
            Ok(_) => Some(Vertex { id }),
            Err(_) => None,
        }
    }
}

pub fn example<G: Graph>(
    graph: G,
    node_list: &[i64]
) -> Vec<i64> {

    node_list.par_iter()
        .filter_map(|&node_id| {
            graph.get_vertex_by_id(node_id)
        })
        .flat_map(|node| {
            graph.neighbors(node).unwrap_or_else(|_| Vec::new())
        })
        .map(|vertex| vertex.id)
        .collect()
}

Any help on how to move this forward would be great.

gitbuda commented 1 year ago

@risufaj very nice module / issue 😃 Thanks for pinging!

Yes, the Rust API is not as polished as the C/C++ one, but there is a plan to improve it, and this will help 💪

I'll try to deep-dive in the following days. In the meantime, if you have any suggestions on the API side, feel free to dump them here or open a new PR with the improvements 👀

risufaj commented 1 year ago

There are definitely workarounds. For example, in my use case I'm essentially running random walks as an approximation of personalized PageRank. In that case, it is enough to compute the neighbors of each node once, store them in a HashMap (for instance), and use rayon to parallelize the loops over that map. This is very likely not optimized, but I tried it today to show that it works. Also, from what I understand, something similar is done in the C++ implementation of PageRank.
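The workaround described above could look roughly like the sketch below, under these assumptions: the adjacency has already been copied out of Memgraph into a plain `HashMap<i64, Vec<i64>>` (the step that touches the graph handle stays single-threaded), and personalized PageRank is estimated by Monte Carlo walks that stop with probability `alpha` before each step, so the end-point frequencies approximate the PPR scores of the source. Because the map owns its data it is `Sync`, so rayon's `par_iter` could share it directly; this sketch uses `std::thread::scope` and a tiny xorshift generator instead of rayon and `rand` only to stay dependency-free. `random_walk_ppr` and `XorShift64` are hypothetical names.

```rust
use std::collections::HashMap;
use std::thread;

/// Tiny xorshift64 PRNG (hypothetical helper) so the sketch needs no `rand` crate.
struct XorShift64(u64);

impl XorShift64 {
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }

    /// Uniform float in [0, 1), built from the top 53 bits.
    fn next_f64(&mut self) -> f64 {
        (self.next_u64() >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Runs `num_walks` walks from `source`, split across `num_threads` scoped
/// threads that share the read-only adjacency map. Each walk stops with
/// probability `alpha` before every step; the result counts how many walks
/// ended at each node. `alpha` must be > 0 for walks to terminate on graphs
/// without dead ends.
fn random_walk_ppr(
    adjacency: &HashMap<i64, Vec<i64>>,
    source: i64,
    alpha: f64,
    num_walks: usize,
    num_threads: usize,
) -> HashMap<i64, usize> {
    assert!(num_threads > 0, "need at least one thread");
    let base = num_walks / num_threads;
    let rem = num_walks % num_threads;
    thread::scope(|s| {
        let handles: Vec<_> = (0..num_threads)
            .map(|t| {
                s.spawn(move || {
                    // Spread the remainder so the walks add up to `num_walks`.
                    let my_walks = base + if t < rem { 1 } else { 0 };
                    // Distinct, non-zero seed per thread.
                    let mut rng = XorShift64(0x9E37_79B9_7F4A_7C15_u64 ^ (t as u64 + 1));
                    let mut counts: HashMap<i64, usize> = HashMap::new();
                    for _ in 0..my_walks {
                        let mut current = source;
                        while rng.next_f64() >= alpha {
                            match adjacency.get(&current) {
                                Some(nbrs) if !nbrs.is_empty() => {
                                    let i = (rng.next_u64() % nbrs.len() as u64) as usize;
                                    current = nbrs[i];
                                }
                                // Dead end: end the walk here (a simplification).
                                _ => break,
                            }
                        }
                        *counts.entry(current).or_insert(0) += 1;
                    }
                    counts
                })
            })
            .collect();
        // Merge the per-thread counts into one map.
        let mut total: HashMap<i64, usize> = HashMap::new();
        for handle in handles {
            for (node, count) in handle.join().unwrap() {
                *total.entry(node).or_insert(0) += count;
            }
        }
        total
    })
}
```

Dividing each count by `num_walks` gives the estimated scores. Ending walks at dead ends is a simplification; a fuller implementation might restart them at the source instead.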

However, it would make sense to have a better, more general solution that doesn't require first traversing the whole graph, and only then doing the actual work, every time the query module is called. I know I'm not proposing anything concrete yet, but we'll get there.

gitbuda commented 1 year ago

@risufaj I've put your code into PR #336 and made it compile by adding Send & Sync trait implementations (just "hints"). However, the underlying object still has to be implemented so that it is actually thread-safe.
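Some background on these "hints": `Send` and `Sync` are marker traits, and a type holding raw pointers implements neither automatically. An `unsafe impl` tells the compiler to trust the programmer, so it only makes the code compile; it is sound only if the wrapped object really tolerates concurrent use, which is the remaining work mentioned above. The std-only sketch below shows the pattern on a hypothetical `ReadOnlyHandle` over immutable `'static` data, where concurrent reads are genuinely safe; it illustrates the general technique and does not reproduce the contents of PR #336.

```rust
use std::thread;

/// Hypothetical stand-in for a handle into a C API. The raw pointer makes
/// the type neither `Send` nor `Sync` by default.
struct ReadOnlyHandle {
    data: *const u64,
    len: usize,
}

// SAFETY: in this sketch the pointer always refers to immutable `'static`
// data, so moving the handle between threads and reading through it
// concurrently is sound. For a real C API this promise has to come from the
// library's own thread-safety guarantees; the compiler cannot check it.
unsafe impl Send for ReadOnlyHandle {}
unsafe impl Sync for ReadOnlyHandle {}

impl ReadOnlyHandle {
    fn from_static(values: &'static [u64]) -> Self {
        ReadOnlyHandle { data: values.as_ptr(), len: values.len() }
    }

    fn get(&self, i: usize) -> Option<u64> {
        if i < self.len {
            // SAFETY: `i` is in bounds and the pointee is immutable and `'static`.
            Some(unsafe { *self.data.add(i) })
        } else {
            None
        }
    }
}

/// Sums all values, reading through the shared handle from several threads.
/// Sharing `&ReadOnlyHandle` compiles only because of `unsafe impl Sync` above.
fn parallel_sum(handle: &ReadOnlyHandle, num_threads: usize) -> u64 {
    assert!(num_threads > 0, "need at least one thread");
    let chunk = (handle.len + num_threads - 1) / num_threads;
    thread::scope(|s| {
        let parts: Vec<_> = (0..num_threads)
            .map(|t| {
                s.spawn(move || {
                    let start = t * chunk;
                    let end = ((t + 1) * chunk).min(handle.len);
                    (start..end).filter_map(|i| handle.get(i)).sum::<u64>()
                })
            })
            .collect();
        parts.into_iter().map(|p| p.join().unwrap()).sum::<u64>()
    })
}
```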

It's not a huge priority for me now, but I'll try to figure out / implement thread safety in the following days/weeks.

If you have any suggestions, just put them here or contribute directly to the PR 😃

risufaj commented 1 year ago

Thank you @gitbuda! This now allows parallel access to the graph. However, there is no speedup compared to the workaround I mentioned before, which is probably due to the larger use case.