nodestream-proj / nodestream

A Declarative framework for Building, Maintaining, and Analyzing Graph Data
https://nodestream-proj.github.io/docs/
Apache License 2.0

Support Analytics Pipelines #252

Open zprobst opened 10 months ago

zprobst commented 10 months ago

Background

One of the primary use cases for graph databases is running analytics and ML workloads.

Requirements / Principles

If nodestream were to support analytics jobs, it should ideally uphold the same core principles as the rest of nodestream.

Implementation Details

Implementation could essentially follow a design approach similar to the one migrations is taking: the core framework handles as much as is prudent and defers to the database connector (which can optionally support the feature) to perform the actual analysis work. Steps like the copy and export steps mentioned below can be implemented using nodestream's existing copy and pipeline features to retrieve and map data.

Example Project File

# nodestream.yaml
scopes:
  # ... for data pipelines

analyses:
  - analyses/example.yaml

targets:
  analytics-graph:
    # ...
  persistent-graph:
    # ...

Example Analysis File

This example pipeline copies data from persistent-graph to analytics-graph, runs some topological analysis algorithms, and persists the results back to persistent-graph.

# analyses/example.yaml
phases:
  # Before we can run the analysis, we need to copy the data into the graph. 
  # This step will copy the data from the target specified in nodestream.yaml into the graph.
  # If you are using a persistent graph, you may not need to run this step. 
  - name: Copy Data
    step: copy
    source: persistent-graph
    nodes:
      - Person
    relationships:
      - KNOWS

  # Project tells the connector which nodes and relationships to include in the analysis. 
  # For instance, in the case of GDS, this will run a projection. 
  - name: Project Graph
    step: project
    projection:
      nodes:
        - Person
      relationships:
        - KNOWS

  # Next are some example algorithms that we run.
  - name: Run Weakly Connected Components
    step: algorithm
    algorithm: weaklyConnectedComponents
    parameters:
      writeProperty: community

  - name: Run Degree Centrality
    step: algorithm
    algorithm: degreeCentrality
    parameters:
      node_types:
        - Person
      relationship_types:
        - KNOWS
      # weightProperty: weight; optional
      writeProperty: degreeCentrality

  # The export step will export the results of the analysis to the specified target.
  # The target must be specified in nodestream.yaml.
  # Internally, this will build a nodestream pipeline to extract the data from the graph and write it to the target.
  - name: Export Results
    step: export
    target: persistent-graph
    nodes:
      - type: Person
        properties:
          - degreeCentrality
          - community

This can be run with: nodestream analytics run example --target analytics-graph

ccloes commented 9 months ago

This functionality seems good.

Primary use case: Leverage your existing graph for analytic workloads, but abstract it away from specific db implementations.

Thoughts/Discussions:

zprobst commented 9 months ago

Based on feedback, a redesign using existing pipeline concepts:

- implementation: nodestream.analytics:CopyData
  arguments:
    source: persistent-graph
    nodes:
      - Person
    relationships:
      - KNOWS

- implementation: nodestream.analytics:ProjectGraph
  arguments:
    nodes:
      - Person
    relationships:
      - KNOWS

- implementation: nodestream.analytics:RunAlgorithm
  arguments:
    algorithm: weakly_connected_components
    parameters:
      writeProperty: "weakly_connected_components"

- implementation: nodestream.analytics:RunAlgorithm
  arguments:
    algorithm: degree_centrality
    parameters:
      writeProperty: "degree_centrality"
      node_types:
        - Person
      relationship_types:
        - KNOWS

- implementation: nodestream.analytics:ExportResultsToDatabase
  arguments:
    target: persistent-graph
    nodes:
      - type: Person
        properties:
          - weakly_connected_components
          - degree_centrality

Also adding a run-query step as an escape hatch for incremental modifications:

- implementation: nodestream.analytics:RunCypher
  arguments:
    query: "MATCH (n:Person) where (n)-[:KNOWS]->(:Person{name: "Zach"}) set n:ZachFriend"
grantleehoffman commented 9 months ago

Yeah, I like this. I think it will be easier on the end user to use the same pipeline construct formats for the analytics pipelines.

ccloes commented 9 months ago

I like this approach... it's incremental and reduces the cognitive burden on end users of learning new concepts.

👍

bechbd commented 9 months ago

Overall, I think this makes sense. I have a few other thoughts/comments below.

1/ One thing to highlight is that some of the concepts here seem very specific to Neo4j versus the way other DBs handle things. One specific area is the "persistent" versus "projected" graph constructs. This is how Neo approaches analytics; however, most other databases such as Neptune Analytics, Memgraph, TigerGraph, etc. allow running these algorithms directly on the data instead of on a projection.

I am just curious whether the proposal here needs any changes, or whether it is simply that in these scenarios the pipeline would be much simpler, with fewer steps, for other DBs than for Neo?

e.g. In Neptune Analytics, you can run WCC and Degree (similar to https://github.com/nodestream-proj/nodestream/issues/252#issuecomment-1936434656) and save the data back to the graph using just this:

- implementation: nodestream.analytics:RunAlgorithm
  arguments:
    algorithm: weakly_connected_components_mutate
    parameters:
      writeProperty: "weakly_connected_components"

- implementation: nodestream.analytics:RunAlgorithm
  arguments:
    algorithm: degree_centrality_mutate
    parameters:
      writeProperty: "degree_centrality"
      vertexLabel:
        - Person
      edgeLabels:
        - KNOWS

2/ One of the other differences in Neptune Analytics is that the implementation integrates the algorithms into OC syntax in a much richer way than supported in Neo, which allows for chaining multiple algorithms together or combining OC's composability constructs with algorithms.

Examples:

MATCH (hq:HolderQuarter)<--(holder:Holder)
WHERE id(hq) = $hq_id
MATCH (co_hq:HolderQuarter)<--(coholder:Holder)
WHERE NOT id(co_hq) = $hq_id
CALL neptune.algo.jaccardSimilarity(hq, co_hq)
YIELD score
RETURN holder.name, coholder.name, score
ORDER BY score DESC LIMIT 10

or

MATCH (h:Holder)-->(hq)
WHERE id(hq) = $hq_id
WITH h
CALL neptune.algo.vectors.topKByNode(h)
YIELD node, score
WHERE score > 0
WITH node as coholder LIMIT 10
MATCH (holding)<-[o:owns]-(hq)<--(coholder:Holder)
WHERE hq.name = '2023Q4' AND o.value > 10000000000
WITH coholder.name as name, collect(DISTINCT holding.name) as coholdings
RETURN name, size(coholdings) as number_shared, coholdings
ORDER BY number_shared DESC

For these sorts of use cases, would we just recommend using the nodestream.analytics:RunCypher implementation?
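
For example, I assume the first query above would end up wrapped in that step roughly like this (just a sketch; how a parameter like $hq_id would be supplied isn't covered by the proposal yet):

- implementation: nodestream.analytics:RunCypher
  arguments:
    # $hq_id would need to be bound somehow; parameter passing is not yet specified
    query: |
      MATCH (hq:HolderQuarter)<--(holder:Holder)
      WHERE id(hq) = $hq_id
      MATCH (co_hq:HolderQuarter)<--(coholder:Holder)
      WHERE NOT id(co_hq) = $hq_id
      CALL neptune.algo.jaccardSimilarity(hq, co_hq)
      YIELD score
      RETURN holder.name, coholder.name, score
      ORDER BY score DESC LIMIT 10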

3/ Any thoughts on allowing for additional constructs like generating embeddings for adding to the loaded data?

4/ What about allowing for saving of query results to a datalake/S3 bucket or something similar instead of back into a DB?

I highlight this because one of the common use cases I am seeing is customers wanting to use an analytics graph to generate features for downstream model training. So the general pattern is: load data, run a few queries, save the results to S3 (or similar). This may be a bit more than what nodestream is intended for, but I thought it was at least worth discussing.
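
To make the idea concrete, a hypothetical step for that pattern could look something like this (the ExportResultsToS3 name and its arguments are purely illustrative, not something in the current proposal):

- implementation: nodestream.analytics:ExportResultsToS3   # hypothetical step name
  arguments:
    bucket: my-feature-bucket        # illustrative values only
    prefix: person-features/
    format: parquet
    nodes:
      - type: Person
        properties:
          - degree_centrality
          - weakly_connected_components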

zprobst commented 9 months ago

@bechbd Thanks for the feedback. Here are some of my questions/thoughts/reactions to your comments.

I am just curious if the proposal here needs any changes or if it is just that in these scenarios where the pipeline might just be much simpler and fewer steps for other DBs versus Neo?

That's a good question. I think this is really just a question of naming. Internally, nodestream is going to have to retain some state at this point in the pipeline regardless of the underlying connector. I was borrowing the ProjectGraph term as a way to explicitly declare what data the analytics pipeline is going to work on so that it can do that.

Maybe something more like PrepareAnalysis?
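
In the redesigned format that would just be a rename of the earlier step, something like (sketch only, reusing the arguments from the ProjectGraph example above):

- implementation: nodestream.analytics:PrepareAnalysis   # candidate rename of ProjectGraph
  arguments:
    nodes:
      - Person
    relationships:
      - KNOWS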

One of the other differences in Neptune Analytics is that the implementation integrates the algorithms into OC syntax in a much richer way than supported in Neo, which allows for chaining multiple algorithms together or combining OC's composability constructs with algorithms.

Yeah, that's a good question... I hadn't thought of that. I do think this model, as it stands today, limits the possibility of doing any kind of chaining. For now, this definitely falls into the kind of problem set that the RunCypher escape hatch is going to be used for. How does that sit with you? To me, it's not obvious how we can both abstract that and get the more "native" benefits.

Any thoughts on allowing for additional constructs like generating embeddings for adding to the loaded data?

Absolutely! I think this is totally on the cards. The algorithms shown were more for demonstration purposes than meant as a comprehensive list. We're getting to the phase where the concept is validated and we can move into scoping an initial version.
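
For example, an embedding step could presumably reuse the same RunAlgorithm shape (the algorithm name and parameters below are purely illustrative):

- implementation: nodestream.analytics:RunAlgorithm
  arguments:
    algorithm: node_embeddings        # illustrative; actual algorithm names TBD
    parameters:
      writeProperty: "embedding"
      node_types:
        - Person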

What about allowing for saving of query results to a datalake/S3 bucket or something similar instead of back into a DB?

This is a good idea. It would definitely take some designing - maybe as a phase 2 for this - but it absolutely makes sense. Just yesterday someone asked me in person whether the intention was to support feeding this into a SageMaker model and the like, for those extracting features for an ML model.

zprobst commented 4 months ago

I'm removing this from the 0.13 milestone. Likely this will be moved out of the nodestream codebase into a sister package.