emehrkay / Pypher

Python Cypher Querybuilder
MIT License

More elaborate function calls, e.g., batching openai embeddings #57

Open lvijnck opened 2 months ago

lvijnck commented 2 months ago

Hello,

I'm trying to codify the following query in Pypher for readability, but I don't seem to get quite far:

CALL apoc.periodic.iterate(
    "MATCH (p) RETURN p",
    "
     CALL apoc.ml.openai.embedding([item in $_batch | labels(item.p)[1]], $apiKey, $configuration) 
    YIELD index, text, embedding
    CALL apoc.create.setProperty($_batch[index].p, 'rrrr', embedding) YIELD node
    RETURN count(*)
    ",
    {batchMode: "BATCH_SINGLE", batchSize: 2000, params: $ai_config}
)
YIELD batch, operations

I was trying to create custom classes to represent apoc.ml.openai.embedding and apoc.periodic.iterate, but when I do that the "CALL" keyword does not seem to show up in the query. Any recommendations?

from pypher import __, Pypher
from pypher.builder import Func

class ApocIterate(Func):
  _CAPITALIZE = False
  _ALIASES = ['periodic_iterate', 'apoc_periodic_iterate']
  name = 'apoc.periodic.iterate'

class OpenAIEmbedding(Func):
  _CAPITALIZE = False
  _ALIASES = ['openai_embedding', 'apoc_ml_openai_embedding']
  name = 'apoc.ml.openai.embedding'

q = Pypher()
q.apoc_iterate(
  __.MATCH.node("n", labels="Entity").RETURN.n, 
  __.openai_embedding(__.n.property('category'))
)

print(str(q))

Returns

apoc_iterate MATCH (n:`Entity`) RETURN n, apoc.ml.openai.embedding(n.`category`)

Observations:

emehrkay commented 2 months ago

You don't include CALL anywhere in your Pypher query. Either .call.apoc_iterate should work, or you can create a new statement for CALL:

create_statement('call', {'name': 'CALL'})

I can test it out later and get back to you

emehrkay commented 2 months ago

Yeah, you can just append a .call. to your Pypher (this is a screenshot from the tester.py).

[Screenshot, 2024-07-10 at 9:21:56 AM]

emehrkay commented 2 months ago

It can even be p.CALL.apoc_iterate, and CALL (all caps) will be in the resulting Cypher string.

lvijnck commented 2 months ago

@emehrkay Maybe I'm missing something here, but the apoc.periodic.iterate requires a stringified version of the query to execute iteratively as the first argument, i.e.,

CALL apoc.periodic.iterate(
    "stringified_cypher_query_1",
    "stringified_cypher_query_2",
    {batchMode: "BATCH_SINGLE", batchSize: 2000, params: $ai_config}
)
YIELD batch, operations

How would you set that up?

P.S. Whenever I add the "call", the parentheses are also missing from the resulting Cypher query.

P.S2. apoc.periodic.iterate exposes a special $_batch variable. How can this be accessed in Pypher?

P.S3. How does yield work with multiple variables? (You can use YIELD(""), so this one is solved.)

emehrkay commented 2 months ago

I think you can achieve that by extending FuncRaw, which doesn't bind its arguments:

from pypher.builder import FuncRaw

class ApocIterate(FuncRaw):
  _CAPITALIZE = False
  _ALIASES = ['periodic_iterate', 'apoc_periodic_iterate']
  name = 'apoc.periodic.iterate'

class OpenAIEmbedding(FuncRaw):
  _CAPITALIZE = False
  _ALIASES = ['openai_embedding', 'apoc_ml_openai_embedding']
  name = 'apoc.ml.openai.embedding'

p.ApocIterate(
  __.MATCH.node("n", labels="Entity").RETURN.n,
  __.openai_embedding(__.n.property('category'))
)

emehrkay commented 2 months ago

P.S3. How does yield work with multiple variables?

Maybe a map would work here:

https://github.com/emehrkay/Pypher?tab=readme-ov-file#maps

lvijnck commented 2 months ago

from pypher import __, Pypher
from pypher.builder import FuncRaw

class ApocIterate(FuncRaw):
  _CAPITALIZE = False
  _ALIASES = ['periodic_iterate', 'apoc_periodic_iterate']
  name = 'apoc.periodic.iterate'

class OpenAIEmbedding(FuncRaw):
  _CAPITALIZE = False
  _ALIASES = ['openai_embedding', 'apoc_ml_openai_embedding']
  name = 'apoc.ml.openai.embedding'

class ApocSetProperty(FuncRaw):
  _CAPITALIZE = False
  _ALIASES = ['set_property', 'apoc_create_set_property']
  name = 'apoc.create.setProperty'

p = Pypher()

p.ApocIterate(
  f"'{__.MATCH.node('n', labels='Entity').RETURN.n}'",
  f"'{__.openai_embedding(__.n.property('category'), '$apiKey', '$configuration').YIELD('index', 'text', 'embedding').append(__.CALL.set_property('$attr', 'embedding').YIELD.node)}'",
  '{batchMode: "BATCH_SINGLE", batchSize: $batchSize, params: {apiKey: $apiKey, configuration: $configuration}}'
).YIELD('batch', 'operations')

print(str(p))
print(p.bound_params)

I'm getting slightly closer. I'm currently using f-strings to format the subqueries for apoc.periodic.iterate, but I'm still trying to figure out the following gaps:

  1. The list comprehension in the first argument of the embedding call: how can I represent this more natively, in a class-based manner? For example:
[item in $_batch | {"+".join(["item.p." + feat for feat in features])}]
  2. apoc.periodic.iterate takes a third argument that specifies the variable mappings to use. I currently have this hard-coded; any idea how to do this a bit better?
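Both gaps above come down to producing Cypher fragments as plain strings, which a FuncRaw-based function should pass through unbound. Below is a minimal sketch in plain Python, without any Pypher calls. The helper names batch_comprehension, cypher_map and passthrough_params are hypothetical and not part of Pypher, and the sketch assumes property and parameter names are simple identifiers that need no escaping.

```python
def batch_comprehension(features, item_var="item", node_key="p"):
    """Build a Cypher list comprehension over $_batch that concatenates node properties."""
    if not features:
        raise ValueError("at least one feature is required")
    # e.g. ["name", "category"] -> "item.p.name + item.p.category"
    expression = " + ".join(f"{item_var}.{node_key}.{feat}" for feat in features)
    return f"[{item_var} in $_batch | {expression}]"


def cypher_map(**entries):
    """Render keyword arguments as a Cypher map literal; values are emitted verbatim."""
    body = ", ".join(f"{key}: {value}" for key, value in entries.items())
    return "{" + body + "}"


def passthrough_params(*names):
    """Map each name to the query parameter of the same name, e.g. apiKey: $apiKey."""
    return cypher_map(**{name: f"${name}" for name in names})


# Hypothetical usage: rebuild the hard-coded third argument from the snippet above.
config = cypher_map(
    batchMode='"BATCH_SINGLE"',  # string literals must carry their own quotes
    batchSize="$batchSize",
    params=passthrough_params("apiKey", "configuration"),
)
print(batch_comprehension(["name", "category"]))
print(config)
```

Because values are emitted verbatim, the caller decides whether an entry is a quoted literal, a $parameter, or a nested map. That keeps the helpers small, at the cost of no validation.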

lvijnck commented 1 month ago

@emehrkay Hi Mark, brief update, and thanks for the input earlier. I've ended up with my own custom implementation of __.stringify, which stringifies the arguments passed into apoc.periodic.iterate. This is the definition I landed on (stringify also accepts a list of statements, so you don't have to use the .append() syntax):

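from pypher import Pypher, create_function
from pypher import __ as cypher  # assumption: `cypher` below is an alias for Pypher's anonymous `__` factory
from pypher.builder import FuncRaw, Partial

class Stringify(FuncRaw):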
    """Pypher Stringify function.

    Custom Pypher function to represent stringification of a Cypher query. This is relevant
    for operations such as `apoc.periodic.iterate`, which expects stringified cypher queries
    as arguments.
    """

    def get_args(self):
        """Function to retrieve args."""
        args = []

        for arg in self.args:
            # NOTE: Allows specifying multiple statements as an array
            if isinstance(arg, list):
                arg = " ".join([str(el) for el in arg])

            if isinstance(arg, (Pypher, Partial)):
                arg.parent = self.parent

            args.append(f"'{arg}'")

        return ", ".join(args)

    def __unicode__(self):
        """Unicode function."""
        return self.get_args()

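def batch_openai_embeddings(api_key, endpoint, attribute, model, features, batch_size):
    # features: node properties to concatenate and embed; batch_size: nodes per batch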
    # Register functions
    create_function("iterate", {"name": "apoc.periodic.iterate"}, func_raw=True)
    create_function("openai_embedding", {"name": "apoc.ml.openai.embedding"}, func_raw=True)
    create_function("set_property", {"name": "apoc.create.setProperty"}, func_raw=True)

    # Build query
    p = Pypher()

    # https://neo4j.com/labs/apoc/4.1/overview/apoc.periodic/apoc.periodic.iterate/
    p.CALL.iterate(
        # Match query
        cypher.stringify(cypher.MATCH.node("p", labels="Entity").RETURN.p),
        # Query to execute per batch
        cypher.stringify(
            [
                cypher.CALL.openai_embedding(f"[item in $_batch | {'+'.join(f'item.p.{attr}' for attr in features)}]", "$apiKey", "{endpoint: $endpoint, model: $model}").YIELD("index", "text", "embedding"),
                cypher.CALL.set_property("$_batch[index].p", "$attribute", "embedding").YIELD("node").RETURN("node"),
            ]
        ),
        cypher.map(
            batchMode="BATCH_SINGLE",
            parallel="true",
            batchSize=batch_size,
            concurrency=50,
            params=cypher.map(apiKey=api_key, endpoint=endpoint, attribute=attribute, model=model),
        ),
    ).YIELD("batch", "operations")

    return p

It still feels like there's some hard-coding going on, but I didn't feel like pushing it any further. Dropping the result here in case someone benefits from it.
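One thing to watch when wrapping a subquery in single quotes, as get_args does with f"'{arg}'": if the inner query itself contains a single-quoted literal, the outer string ends early. Below is a minimal sketch of an escaping step, assuming Cypher's backslash escapes for string literals. The names quote_cypher and periodic_iterate are hypothetical plain-Python helpers, not part of Pypher.

```python
def quote_cypher(query: str) -> str:
    """Wrap a Cypher query in single quotes so it can be passed as a string literal."""
    # Escape backslashes first so the backslashes added for quotes are not doubled.
    escaped = query.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def periodic_iterate(outer: str, inner: str, config: str) -> str:
    """Assemble a CALL apoc.periodic.iterate(...) statement from two subqueries."""
    return (
        f"CALL apoc.periodic.iterate({quote_cypher(outer)}, {quote_cypher(inner)}, {config}) "
        "YIELD batch, operations"
    )


# Hypothetical usage: the inner query contains a quoted property name.
print(
    periodic_iterate(
        "MATCH (p) RETURN p",
        "CALL apoc.create.setProperty(p, 'rrrr', 1) YIELD node RETURN count(*)",
        '{batchMode: "BATCH_SINGLE", batchSize: 2000}',
    )
)
```

The same escaping could be applied inside a custom get_args before the quotes are added. Whether that is needed depends on whether the subqueries ever carry single-quoted literals.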