neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0

Create node dynamically from kafka topic #527

Closed: aissaelouafi closed this issue 2 years ago

aissaelouafi commented 2 years ago

Hello,

I use the Kafka streams plugin to ingest data into a Neo4j database from Kafka topics. It works fine when we map a Kafka topic to a given node, for example:

streams.sink.topic.pattern.node.person-topic.to.testdb2=Person{!id}

The topic data looks like this:

[{"id": 1, "name": "Leo"}, {"id": 2, "name": "Kevin"}]

But I want to set the node label dynamically from another Kafka topic, extracting the label from a data attribute. For example, I want to create two different nodes, Developer and Player, from the same Kafka topic based on an attribute called type. The data looks like this:

[{"id": 1, "name": "Leo", "type": "football_player"}, {"id": 2, "name": "Kevin", "type": "software_engineer"}]

I want to create 2 nodes: a Player node for Leo and a Developer node for Kevin.

I tried using the following Cypher template:

streams.sink.topic.cypher.person-topic.to.testdb2=MERGE (p:{event.type}) ON CREATE set p.id = event.id, p.name = event.name

I got the following exception:

exception=org.neo4j.graphdb.QueryExecutionException: Invalid input '{': expected whitespace or a label name (line 1, column 34 (offset: 33))

Is there a way to create nodes dynamically from the same Kafka topic using Kafka streams?

Regards, Aissa

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

you should use the APOC procedure apoc.create.node in your query. See the APOC documentation: https://neo4j.com/labs/apoc/4.2/overview/apoc.create/apoc.create.node/.

In your case you have to replace the query with the following:

streams.sink.topic.cypher.person-topic.to.testdb2=call apoc.create.node(event.type, {id: event.id, name: event.name}) YIELD node RETURN node

You should also slightly change the format of your JSON events so that event.type is an array:

{"id": 1, "name": "Leo", "type": ["football_player"]}

Hope this helps!

Mauro

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

Thanks a lot for your answer and your help as usual. Unfortunately, we don't have control over the input data, so we can't modify the type.

The main idea is to be able to define a mapping (based on if conditions, for example) from the type attribute to the node label.

For example, the idea is to create the node dynamically based on a condition on an attribute, in this case the type attribute:

if type in ['football_player', 'tennis_player', 'basketball_player']:
   node = 'Player'
elif type in ['software_engineer', 'devops', 'web_developer']:
   node = 'Developer'

I don't know if it's possible to do that using a Cypher template.

Regards, and thanks a lot again for your help @mroiter-larus.

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

for conditional execution you can use the APOC procedure apoc.do.case: https://neo4j.com/labs/apoc/4.3/overview/apoc.do/apoc.do.case/

For example:

streams.sink.topic.cypher.person-topic.to.testdb2=call apoc.do.case([event.type IN ['football_player', 'tennis_player', 'basketball_player'], 'CREATE (n:Player) SET n.id=event.id, n.name=event.name RETURN n as node', event.type IN ['software_engineer', 'devops', 'web_developer'], 'CREATE (n:Developer) SET n.id=event.id, n.name=event.name RETURN n as node'], 'CREATE (n:General) RETURN n as node', {event:event}) YIELD value RETURN value.node

Regards,

Mauro
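apoc.do.case takes a list of condition/query pairs, runs the query paired with the first condition that evaluates to true, and falls back to the else query when none do. The Python sketch below models only that branch selection for the type-to-label mapping discussed here; choose_label, CASES and the two type sets are illustrative names, not part of APOC or the Streams plugin.

```python
from typing import Callable, List, Tuple

# A case is a (condition, label) pair; the condition inspects one Kafka event.
Case = Tuple[Callable[[dict], bool], str]

PLAYER_TYPES = {"football_player", "tennis_player", "basketball_player"}
DEVELOPER_TYPES = {"software_engineer", "devops", "web_developer"}

# Same order as the condition/query pairs in the apoc.do.case call above.
CASES: List[Case] = [
    (lambda event: event["type"] in PLAYER_TYPES, "Player"),
    (lambda event: event["type"] in DEVELOPER_TYPES, "Developer"),
]


def choose_label(event: dict, cases: List[Case] = CASES, default: str = "General") -> str:
    """Return the label of the first case whose condition holds, else the default."""
    for condition, label in cases:
        if condition(event):
            return label
    return default


if __name__ == "__main__":
    events = [
        {"id": 1, "name": "Leo", "type": "football_player"},
        {"id": 2, "name": "Kevin", "type": "software_engineer"},
    ]
    for e in events:
        print(e["name"], "->", choose_label(e))
```

Because the first matching condition wins, the order of the pairs matters only if a type could appear in more than one list; the else branch (General here) catches every unmapped type.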

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

Thanks a lot for the provided example.

I have the following exception :

exception=org.neo4j.graphdb.QueryExecutionException: There is no procedure with the name `apoc.do.case` registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.

In the config file I already added apoc.* to the procedure allowlist: dbms.security.procedures.allowlist=bloom.*,gds.*,apoc.*,streams.*

Do I need to do anything else to enable this procedure?

Thanks a lot again.

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

try defining the following two properties:

streams.check.apoc.timeout=<ms to await for APOC being loaded, default -1 skip the wait>
streams.check.apoc.interval=<ms interval awaiting for APOC being loaded, default 1000>

Please see here for further details. The problem is most likely that the plugin starts before the APOC procedures have been fully loaded.

Regards,

Mauro
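The two properties describe a bounded polling loop: check whether APOC is available, wait interval ms, and give up once timeout ms have elapsed, with -1 skipping the wait. The Python model below is inferred from the property descriptions above rather than from the plugin's code; wait_for_apoc and its injectable clock/sleep parameters are hypothetical.

```python
import time
from typing import Callable


def wait_for_apoc(
    is_apoc_loaded: Callable[[], bool],
    timeout_ms: int,
    interval_ms: int = 1000,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll until APOC is available or timeout_ms elapses; True if it became available."""
    if timeout_ms < 0:
        # -1 means "skip the wait": report the current state without retrying.
        return is_apoc_loaded()
    deadline = clock() + timeout_ms / 1000.0
    while True:
        if is_apoc_loaded():
            return True
        if clock() >= deadline:
            return False
        # Not loaded yet and time remains: wait one interval, then check again.
        sleep(interval_ms / 1000.0)
```

With the values used later in this thread (streams.check.apoc.timeout=20000, streams.check.apoc.interval=10000), this model checks for APOC three times, at roughly 0 s, 10 s and 20 s after startup.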

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

any news?

Mauro

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

Sorry, I was busy with something else. I just tried it: I added the parameters and it's working well now.

Thanks a lot for your help as usual.

Regards, Aissa

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

great! Happy to help!

Regards, Mauro

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

Sorry to disturb you again, but I still have a question.

I want to dynamically create relationships between the nodes created earlier, using another Kafka topic that contains the relationships between the nodes.

The input data looks like :

[{'parent_id':1, 'child_id':2, 'type':'brother'},
{'parent_id':4, 'child_id':5, 'type':'friend'}]

The node label is not available in the input JSON, so I tried to use a MATCH query to find the nodes dynamically and create the relationships, but it's not working. This is the Cypher query:

streams.sink.topic.cypher.relationships-topic.to.test-db2=MATCH (child),(parent) WHERE child.id=event.child_id AND parent.id=event.parent_id CREATE (parent)-[r:event.type]->(child) RETURN r

I don't know whether we can run this kind of Cypher query, or whether it's efficient in terms of performance, because we can have many thousands of nodes.

Thanks a lot again for your valuable help.

Regards, Aissa

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

to improve query performance, you should add the label of the nodes you want to match (though if the label is not available in the input JSON, this obviously can't be done) and also create an index on the id property of those nodes:

CREATE INDEX index_name_parent FOR (n:LabelParent) ON (n.id)
CREATE INDEX index_name_child FOR (n:LabelChild) ON (n.id)

Please read here for further details about indexes.

Furthermore, since you can't add labels, to safely run the query you've shared you must be sure that the ids are unique. I mean, you must be sure that a parent node with id=1 can't exist together with a child node with id=1; otherwise your query will match all the nodes with id=1 and you'll never know which is the parent and which is the child.

About the query, be careful with MATCH (child), (parent)..., because this will generate a Cartesian product of the two node sets. You can also move your WHERE condition into the MATCH clause, like this:

MATCH (child: {id: event.child_id}) MATCH (parent: {id: event.parent_id}) CREATE...

I also noticed that your Streams configuration points to two different databases, testdb2 and test-db2:

streams.sink.topic.cypher.person-topic.to.testdb2=call apoc.do.case([event.type...
streams.sink.topic.cypher.relationships-topic.to.test-db2=MATCH (child),(parent)...

This might be a problem, because the query responsible for creating the relationships will never work if the nodes do not exist in that database.

Mauro
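The target database is the part of each sink key after ".to.", so testdb2 versus test-db2 sends the two topics to different databases. A Python sketch for checking this, assuming the streams.sink.topic.cypher.<topic>.to.<database> key shape used in this thread (parse_sink_key and target_databases are hypothetical helpers, not plugin APIs):

```python
from typing import Iterable, Optional, Set, Tuple

CYPHER_SINK_PREFIX = "streams.sink.topic.cypher."


def parse_sink_key(key: str) -> Tuple[str, Optional[str]]:
    """Split a Cypher sink key into (topic, database); database is None without a .to. suffix."""
    if not key.startswith(CYPHER_SINK_PREFIX):
        raise ValueError(f"not a Cypher sink key: {key!r}")
    rest = key[len(CYPHER_SINK_PREFIX):]
    # Split on the last ".to." so topic names containing dots are kept intact.
    topic, sep, database = rest.rpartition(".to.")
    if not sep:
        return rest, None
    return topic, database


def target_databases(keys: Iterable[str]) -> Set[Optional[str]]:
    """Distinct databases targeted by a collection of sink keys."""
    return {parse_sink_key(k)[1] for k in keys}
```

Feeding it the two keys quoted above returns {'testdb2', 'test-db2'}: two databases where one was intended, so the relationship query would look for nodes in a database that never received them.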

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

Thanks a lot for your valuable comments and help. Yes, I created the index on the id property to avoid/reduce performance issues. As for the database, I use the same one; it was just a mistake on my side.

I tried the following cypher query :

streams.sink.topic.cypher.relationships-topic.to.test-db2=MATCH (child:{id: event.child_id}) MATCH (parent: {event.parent_id}) CREATE (parent)-[r:{event.type}]->(child) RETURN r

But I got the following exception :

exception=org.neo4j.graphdb.QueryExecutionException: Invalid input '{': expected whitespace or a label name (line 1, column 38 (offset: 37))
"UNWIND $events AS event MATCH (child:{id: event.child_id}) MATCH (parent: {event.parent_id}) CREATE (parent)-[r:{event.type}]->(child) RETURN r"

I don't know whether we need to add the label (I can't get this information from the relationships Kafka topic) or whether it's a syntax problem in the Cypher query.

Thanks again @mroiter-larus.

Regards, Aissa
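Judging from the query echoed in the exception above, the sink prepends UNWIND $events AS event to each topic's Cypher template, so the column and offset in these errors refer to the wrapped query, not to the template in streams.conf. A small Python sketch reproduces the reported positions; the prefix constant is read off the logged query, and wrap and error_position are hypothetical helpers.

```python
from typing import Tuple

# Prefix as it appears in the queries echoed by the sink's error and log messages.
UNWIND_PREFIX = "UNWIND $events AS event "


def wrap(template: str) -> str:
    """Statement the sink appears to run for a batch of events bound to $events."""
    return UNWIND_PREFIX + template


def error_position(query: str, bad_char: str) -> Tuple[int, int]:
    """(column, offset) of the first bad_char in a single-line query, as in Cypher errors."""
    offset = query.index(bad_char)
    return offset + 1, offset


if __name__ == "__main__":
    template = "MERGE (p:{event.type}) ON CREATE set p.id = event.id, p.name = event.name"
    print(error_position(wrap(template), "{"))  # (34, 33), as in the first exception
```

Both exceptions in this thread (column 34 for MERGE (p:{event.type}) and column 38 for MATCH (child:{id: ...})) point at a { directly after a colon, where Cypher expects a label name.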

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

the exception is caused by a syntax error because the label was missing. My bad. You just need to add the label in the MATCH clauses:

MATCH (child:LabelChild {id: event.child_id}) MATCH (parent:LabelParent {id: event.parent_id}) CREATE (parent)-[r:{event.type}]->(child) RETURN r

The problem here is that you don't know which label to specify; that is, you don't know the labels of the nodes that the events in relationships-topic refer to. Is that correct?

If so, you can't create the relationship like that.

The only thing you can do here is create two different topics, such as relationships-player-topic and relationships-developer-topic, so that you can configure two different Sink Cypher queries, one for each type of relationship. For example:

streams.sink.topic.cypher.relationships-player-topic.to.test-db2=MATCH (child:Player {id: event.child_id}) MATCH (parent:Player {id: event.parent_id}) CREATE (parent)-[r:{event.type}]->(child) RETURN r
streams.sink.topic.cypher.relationships-developer-topic.to.test-db2=MATCH (child:Developer {id: event.child_id}) MATCH (parent:Developer {id: event.parent_id}) CREATE (parent)-[r:{event.type}]->(child) RETURN r

Mauro

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

Yes, exactly: we don't know the node labels in relationships-topic. Unfortunately, we can't create a specific topic for each relationship type, because we have thousands of relationships and we don't control the Kafka topic; we are just a consumer of it. As mentioned before, we use apoc.do.case to create nodes dynamically because we have only one Kafka topic that contains all nodes, and now we need to create the relationships between those dynamically created nodes. We have the same issue here: we can't have a Kafka topic for each relationship type. Is there a way to create the relationships based on a MATCH on the id property without specifying the node label?

Many thanks again for your help.

Regards, Aissa

aissaelouafi commented 2 years ago

Hi again @mroiter-larus,

As I told you before, I tried this query without specifying the node label. It works in the Browser, but I don't know if it's supported by the Kafka sink plugin.

MATCH (child),(parent) WHERE child.id=id1 AND parent.id=id2 CREATE (parent)-[r:TEST]->(child) RETURN r

But the issue, as you said, is that the query will generate a Cartesian product between the two nodes. I don't know if there is a way to include the filter on id without specifying the node label.

Regards, Aissa

mroiter-larus commented 2 years ago

@aissaelouafi

the query is supported by the plugin; that is not the problem. The problem is that without specifying the label, Neo4j will scan all the nodes of the graph every time (which could be very expensive if you have millions), and you'll generate a lot of useless relationships.

Maybe there is a way to avoid the Cartesian product. Please check out the following: https://community.neo4j.com/t/avoid-cartesian-product-when-create-relationships/13624. But I don't know if this could help you, because you'll avoid the Cartesian product but not the creation of useless relationships. As I told you before, to safely run the query you've shared, you must be sure that a parent node with id=1 can't exist together with a child node with id=1; otherwise your query will match all the nodes with id=1 and you'll never know which is the parent and which is the child.

Regards,

Mauro
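To make the uniqueness point concrete: a label-less MATCH (child),(parent) WHERE child.id = ... AND parent.id = ... produces one row per combination of matching child and parent nodes, and the CREATE (or the apoc.create.relationship call) runs once per row. A Python model of that counting, treating node ids as plain strings (rows_for_event and shared_ids are hypothetical helpers, not plugin or Neo4j APIs):

```python
from collections import Counter
from typing import Iterable, List


def rows_for_event(node_ids: Iterable[str], child_id: str, parent_id: str) -> int:
    """Rows (and hence relationships created) for one event of a label-less
    MATCH (child),(parent) WHERE child.id = child_id AND parent.id = parent_id."""
    counts = Counter(node_ids)
    # Every node carrying child_id pairs with every node carrying parent_id.
    return counts[child_id] * counts[parent_id]


def shared_ids(node_ids: Iterable[str]) -> List[str]:
    """Ids carried by more than one node, i.e. ambiguous for a label-less MATCH."""
    return [node_id for node_id, n in Counter(node_ids).items() if n > 1]
```

With globally unique ids (such as UUIDs) every event yields exactly one row. If two nodes share id 1, an event linking 2 to 1 creates two relationships, and if either node is missing the event yields zero rows, so nothing is created.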

aissaelouafi commented 2 years ago

@mroiter-larus,

I hope that you had a great weekend.

Thanks for the link; I will check how we can avoid the Cartesian product.

Concerning the useless relationships, I don't think that's going to happen, because we use a UUID as the identifier for both child and parent nodes, so a parent node with id=1 can't exist together with a child node with id=1.

Concerning the Cypher query to match nodes without specifying the node label, can I use this one?

streams.sink.topic.cypher.relationships-topic.to.test-db2=MATCH (child),(parent) WHERE child.id={event.child_id} AND parent.id={event.parent_id} CREATE (parent)-[r:{event.type}]->(child) RETURN r

Thanks a lot again.

Regards, Aissa

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

hope you had a great weekend too.

I'm sorry, I told you that you could use that query, but it is not entirely correct. You don't need curly braces to access the parameter values event.parent_id, event.child_id and event.type. Furthermore, you can't create relationships "dynamically" like this:

CREATE (parent)-[r:{event.type}]->(child)

you need to use apoc.create.relationship as follows:

streams.sink.topic.cypher.relationships-topic.to.test-db2=MATCH (child),(parent) WHERE child.id=event.child_id AND parent.id=event.parent_id CALL apoc.create.relationship(parent, event.type, {}, child) YIELD rel RETURN rel

Please see the documentation about the APOC: https://neo4j.com/labs/apoc/4.1/overview/apoc.create/apoc.create.relationship/

Regards, Mauro

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

It's working well. Many thanks for your valuable comments and help. I wish you all the best!

Regards, Aissa

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

glad that it's working fine!

Regards, Mauro

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

I hope you are doing well. I have another issue with relationship creation through APOC. I used this Cypher query to create the relationship:

streams.sink.topic.cypher.relationships-topic.to.test-db2=MATCH (child),(parent) WHERE child.id=event.child AND parent.id=event.parent CALL apoc.create.relationship(parent, event.type_display, {}, child) YIELD rel RETURN rel

I have this data:

{
  "parent": "626a4f1b1be46810704d8055464bcbf1",
  "type_display": "Hosted by ::",
  "child": "4021d02f1b24ac10123c3035464bcb1e"
}

The idea is to match the nodes based on id, but the relationships are not created, even though I checked that nodes with both the parent id and the child id exist.

I have the impression that the query is not being executed by the plugin. I can't see any message in the logs about the query, but sometimes when I restart the Neo4j server I can see the created relationships.

The total number of nodes in the database is 736670.
The number of nodes with the parent id is 1.
The number of nodes with the child id is 1.

I added these parameters to the streams.conf file:

streams.sink.enabled=true
streams.sink.enabled.to.test-db2=true
streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.security.protocol=SSL

kafka.ssl.truststore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.truststore.password=test123
kafka.ssl.keystore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.keystore.password=test123

kafka.ssl.endpoint.identification.algorithm=HTTPS
streams.check.apoc.timeout=20000
streams.check.apoc.interval=10000
streams.sink.poll.interval=5000
kafka.streams.async.commit=true

kafka.group.id=neo4j_group
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake

I don't understand why the query is not executed: when I run the query manually, I can see that the relationships are created correctly.

If you have any idea, it would be very helpful.

Thanks again for your help as usual @mroiter-larus

Regards,

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

I think it's related to query plan caching. Sometimes the relationship does get created, together with this message in the log:

Discarded stale query from the query cache after 62549 seconds. Reason: NodesAllCardinality changed from 29300.0 to 308214.0, which is a divergence of 0.9049361807056137 which is greater than threshold 0.043763100387390126. Query: UNWIND $events AS event MATCH (child),(parent) WHERE child.id=event.child AND parent.id=event.parent CALL apoc.create.relationship(parent, event.type_display, {}, child) YIELD rel RETURN rel

Do you have any idea about this behavior?

Regards, Aissa
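The figures in that log line fit a simple relative-change formula: the cached plan was built when NodesAllCardinality was 29300, the statistic has since grown to 308214, and (308214 - 29300) / 308214 ≈ 0.905, far above the logged threshold of about 0.044, so the cached plan was discarded. A Python check of that arithmetic, assuming the formula abs(new - old) / max(old, new) (inferred from the logged numbers, not taken from Neo4j's source; divergence and plan_is_stale are hypothetical names):

```python
def divergence(old: float, new: float) -> float:
    """Relative change between two statistics values: |new - old| / max(|old|, |new|)."""
    largest = max(abs(old), abs(new))
    if largest == 0:
        return 0.0
    return abs(new - old) / largest


def plan_is_stale(old: float, new: float, threshold: float) -> bool:
    """Treat a cached plan as stale when the divergence exceeds the threshold."""
    return divergence(old, new) > threshold


if __name__ == "__main__":
    old_nodes, new_nodes = 29300.0, 308214.0  # values from the log line above
    print(divergence(old_nodes, new_nodes))
    print(plan_is_stale(old_nodes, new_nodes, 0.043763100387390126))
```

A plan compiled while the graph was roughly ten times smaller is exactly the situation this check flags: estimates for the label-less MATCH made against 29300 nodes no longer describe a graph of several hundred thousand.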