apache / age

Graph database optimized for fast analysis and real-time data processing. It is provided as an extension to PostgreSQL.
https://age.apache.org
Apache License 2.0

Parallel usage using the age wrapper #1990

Open ishaan812 opened 2 months ago

ishaan812 commented 2 months ago

I was trying to parallelise a sync script from Postgres to an AGE graph using goroutines, to make it more efficient. The prepared statement started causing problems when I did.

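    parallelVerticeCreation := func(flow models.Flow) {
        defer wg.Done()
        tx, err := age.Begin()
        // TODO: check whether the flow already exists before inserting it
        if err != nil {
            fmt.Println("Error in creating tx", err)
            return // tx is unusable when Begin fails
        }
        if err := s.repo.CreateVertice(context.TODO(), tx, &flow); err != nil {
            fmt.Println("Error in creating vertice", err)
            tx.Rollback() // do not commit a transaction that already failed
            return
        }
        if err := tx.Commit(); err != nil {
            fmt.Println("Error in committing tx", err)
        }
    }
    // Start one goroutine per flow and wait for all of them to finish.
    for _, flow := range flows {
        wg.Add(1)
        go parallelVerticeCreation(flow)
    }
    wg.Wait()
    fmt.Println("All flows created")
    return nil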

CreateVertice just runs a Cypher CREATE query. This did not work. The error was:

SELECT * FROM age_prepare_cypher($1, $2); weber CREATE (n:Flow {id: '2', created_at: '2024-07-26 13:22:24.289636 +0530 IST', updated_at: '2024-07-26 13:22:24.289636 +0530 IST'})
Error in creating vertice pq: function age_prepare_cypher(unknown, unknown) does not exist
SELECT * FROM age_prepare_cypher($1, $2); weber CREATE (n:Flow {id: '1', created_at: '2024-07-26 13:18:19.30531 +0530 IST', updated_at: '2024-07-26 13:18:27.125151 +0530 IST'})
Error in creating vertice pq: function age_prepare_cypher(unknown, unknown) does not exist

When I tried the same thing with a plain SQL statement, it worked to an extent (until the DB got overloaded and gave up at around 500 or more requests), which is fine.

I wanted to understand what is different with AGE, since it must be using the same SQL connection that the relational DB was using. The only difference I could see was the prepared statement. If that is not the cause, I would like to know how I can do this better.

jrgemignani commented 2 months ago

@ishaan812 age_prepare_cypher is meant to be called by a driver or something else that passes cstrings.

CREATE FUNCTION ag_catalog.age_prepare_cypher(cstring, cstring)
    RETURNS boolean
    LANGUAGE c
    STABLE
    PARALLEL SAFE
    AS 'MODULE_PATHNAME';

commit f49c322eee85e47e4cfe4a2b1cacdbe74c9f4496
Author: John Gemignani <jrgemignani@gmail.com>
Date:   Mon Nov 21 09:33:58 2022 -0800

    Add the ability to pass parameters to the cypher function

    Added the ability to pass parameters to the cypher() function via
    a function called age_prepare_cypher().

    This extra function is necessary because the cypher() function itself
    isn't actually executed - it is instead transformed and replaced. This
    means that it needs to have its input parameters resolved prior to
    that transform. However, parameters aren't resolved until the
    execution phase. So, another command to resolve them needs to run
    prior to the cypher() function call.

    This mainly impacts the drivers, which will need to be updated.

    Additionally, modified the golang driver as an example of this new
    usage.

    Added regression tests.

MironAtHome commented 2 months ago

@jrgemignani How hard would it be to make this function

CREATE FUNCTION ag_catalog.age_prepare_cypher(cstring, cstring)
    RETURNS boolean
    LANGUAGE c
    STABLE
    PARALLEL SAFE
    AS 'MODULE_PATHNAME';

return a handle to a prepared statement, so that it can be called using something like

select * from ag_catalog.age_execute_prepared_cypher(handle, parameter_map) as t(a agtype);

many times in a loop? The advantage seems fairly obvious to me: a statement that is compiled once and then called in a loop should, in theory, bring a significant performance improvement. The question about multiple threads still remains; my suggestion is not related to multi-threaded invocation, only to retaining the execution plan through a handle variable.

@ishaan812 I apologise for barging into your question with my own, somewhat unrelated sideline. I hope it is accepted in the spirit of caring to make the AGE graph database better :)

jrgemignani commented 2 months ago

@MironAtHome This command was added to fix a very specific security issue in the drivers. It allows the cypher() command to be parameterized and the wrapping to be separated from the parameters. It really isn't meant for anything else. Look at the Python and Golang drivers to see how it is used.

MironAtHome commented 2 months ago

Thanks John. I didn't realise it is actually working the way I asked. Took a bit more learning :)

jrgemignani commented 2 months ago

@MironAtHome Just understand that it wasn't intended nor expected to be used outside of this [security issue] context. So it may not work as expected. And, come to think of it, it may not work in parallel. I need to think about the latter.

The general idea behind it was to allow parameters to be passed to the cypher() function. The issue being that cypher($1, $2) doesn't work due to when parameters ($1, $2) are resolved (in the execution phase) and when they are needed (in the cypher parse phase which comes before the execution phase).

By using age_prepare_cypher($1, $2) the parameters are resolved at execution of this command and a global structure for that specific process is set up to contain, for one time only, the graph name and cypher command. The very next command needs to be cypher(NULL, NULL). Any other cypher command will error out and reset it. That next cypher(NULL, NULL) will then use those resolved parameters. The structure is then cleared so that it can be used again.

That is basically how the two commands work together: age_prepare_cypher($1, $2), then cypher(NULL, NULL).

MironAtHome commented 2 months ago

@jrgemignani I appreciate the clarification, and thank you, John, for the additional notes. Very nice and very helpful. @ishaan812 once again, thank you for kindly letting me ask a few clarifying questions about this feature in your issue.

ishaan812 commented 1 month ago

Oooo, makes sense now. So if I want to run it in parallel, I will have to run a cypher(NULL, NULL) after every age_prepare_cypher call.

I appreciate your help in answering my question, @MironAtHome.

Edit: I think this practice is also causing other issues, @jrgemignani. I have various endpoints, and if one of the queries fails with an error, the prepared function then stops working for the other endpoints on the same connection. I guess we would have to call cypher(NULL, NULL) after the failed query.

ishaan812 commented 1 month ago

Hey @jrgemignani, sorry to tag you again on this, but could you help me out with this question? It is not predictable when the function breaks. How can I recover from this? Will running cypher(NULL, NULL) after it breaks be enough?

jrgemignani commented 1 month ago

@ishaan812 The function resets if it isn't used immediately by the process that created it. However, I do see some issues with the logic it uses and will need to work on it once I finish this other higher priority issue.