neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0

Relationship creation not working #532

Open aissaelouafi opened 2 years ago

aissaelouafi commented 2 years ago

Hello,

I have an issue with relationship creation via apoc.create.relationship. I used this Cypher query as the sink topic configuration to create the relationship:

streams.sink.topic.cypher.relationships-topic.to.test-db2=MATCH (child),(parent) WHERE child.id=event.child AND parent.id=event.parent CALL apoc.create.relationship(parent, event.type_display, {}, child) YIELD rel RETURN rel
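
(For context, the plugin runs this template by unwinding the batch of consumed events, so the effective query has roughly this shape, as the cache message quoted further down also shows:)

UNWIND $events AS event
MATCH (child),(parent) WHERE child.id=event.child AND parent.id=event.parent
CALL apoc.create.relationship(parent, event.type_display, {}, child) YIELD rel
RETURN rel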

I have this data :

{
  "parent": "626a4f1b1be46810704d8055464bcbf1",
  "type_display": "Hosted by ::",
  "child": "4021d02f1b24ac10123c3035464bcb1e"
}

The idea is to match the nodes by id, but the relationships are not created, even though I checked that both nodes (with the parent and child ids) exist.

I think the query is not picked up by the plugin. I can't see any message in the logs regarding the query, but sometimes, when I restart the Neo4j server, I can see the created relationships.

The total number of nodes in the database: 736670
Nodes with the parent id: 1
Nodes with the child id: 1

I added these parameters to the streams.conf file:

streams.sink.enabled=true
streams.sink.enabled.to.test-db2=true
streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.security.protocol=SSL

kafka.ssl.truststore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.truststore.password=test123
kafka.ssl.keystore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.keystore.password=test123

kafka.ssl.endpoint.identification.algorithm=HTTPS
streams.check.apoc.timeout=20000
streams.check.apoc.interval=10000
streams.sink.poll.interval=5000
kafka.streams.async.commit=true

kafka.group.id=neo4j_group
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake

I don't understand why the query is not executed; when I run it manually, the relationships are created correctly.

If you have any idea, it would be very helpful.

I think it's related to query plan caching. Sometimes I can see that the relationship is created, along with this message in the log file:

Discarded stale query from the query cache after 62549 seconds. Reason: NodesAllCardinality changed from 29300.0 to 308214.0, which is a divergence of 0.9049361807056137 which is greater than threshold 0.043763100387390126. Query: UNWIND $events AS event MATCH (child),(parent) WHERE child.id=event.child AND parent.id=event.parent CALL apoc.create.relationship(parent, event.type_display, {}, child) YIELD rel RETURN rel

Do you have any idea about this behavior?

Regards, Aissa

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

I think this is not a problem related to the Neo4j Streams plugin. That message means that the data involved in the query you're trying to execute has changed significantly since the previous execution of the same query, so the query plan in the cache may no longer be valid and a new one will be generated.

You probably need to change some Neo4j params regarding the heap size and the page cache size.

Furthermore, you can find more details about the query replanning process here.
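
For instance, the heap and page cache are set in neo4j.conf; a rough sketch only, the actual values depend on your dataset and available RAM:

# example sizes only, tune to your data set and available RAM
dbms.memory.heap.initial_size=8g
dbms.memory.heap.max_size=8g
dbms.memory.pagecache.size=16g
# as far as I know, the divergence threshold in your log message is governed by cypher.statistics_divergence_threshold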

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

Thanks a lot again for your comment. I don't think it's related to the Neo4j cache; I think it's related to the Neo4j Kafka params, because the nodes and relationships are created correctly when I send, for example, many thousands of events to the Kafka topic, but when I send only one event it doesn't work and I don't see any message in the log file.

I tried changing the following param: kafka.max.poll.records=1 to force Kafka to poll each event. I know that this value can cause memory/performance issues, but I still can't receive the event on the Neo4j side. I don't know if I should modify the params neo4j.batch.size and neo4j.batch.timeout.msecs as well. I also deleted the param streams.sink.poll.interval.

This problem has persisted for many weeks and I really don't get any message in the log file.

Many thanks for your precious comments @mroiter-larus.

Regards, Aissa

mroiter-larus commented 2 years ago

Hi @aissaelouafi,

I'll investigate this case. In the meantime, I've noticed something in your streams configuration regarding the message commit. You have enabled the async commit via the kafka.streams.async.commit=true parameter, but this only works when you disable the Kafka auto commit via the kafka.enable.auto.commit=false parameter.
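
Put differently, for the async commit to take effect, the two parameters would need to be set together, for example:

kafka.enable.auto.commit=false
kafka.streams.async.commit=true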

Another suggestion regarding error management: you have set the following params:

streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

but if you don't also set streams.sink.errors.tolerance=all, they have no effect.

Regards,

Mauro

aissaelouafi commented 2 years ago

Hello @mroiter-larus,

Thanks for your recommendations.

I just modified the parameters; the conf file now looks like this:

streams.sink.enabled=true
streams.sink.enabled.to.test-db2=true
streams.sink.errors.tolerance=all
streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

# Kafka conf parameters
kafka.enable.auto.commit=false
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.security.protocol=SSL

# Kafka ssl certificates
kafka.ssl.truststore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.truststore.password=test123
kafka.ssl.keystore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.keystore.password=test123

# Kafka poll
kafka.max.poll.records=1

kafka.ssl.endpoint.identification.algorithm=HTTPS
streams.check.apoc.timeout=2000
streams.check.apoc.interval=1000

kafka.group.id=neo4j_group
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake

With this configuration more and more relationships are created, but I still don't receive all the events I send to the Kafka topics. I'm subscribed to 4 Kafka topics as follows:

streams.sink.topic.pattern.node.topic-1.to.testdb-2=Incident{!sys_id}
streams.sink.topic.pattern.node.topic-2.to.testdb-2=Change{!sys_id}
streams.sink.topic.cypher.topic-3.to.testdb-2=call apoc.do.case(...)
streams.sink.topic.cypher.topic-4.to.testdb-2=MATCH (child),(parent) WHERE child.id=event.child AND parent.id=event.parent CALL apoc.create.relationship(parent, event.type_display, {}, child) YIELD rel RETURN rel

I suspect a delay related to the Kafka consumer; that's why I set the param kafka.max.poll.records=1 and removed the param streams.sink.poll.interval.

I have another question: is it possible to consume the same Kafka topic twice? I have the following data:

{
  "id": 1,
  "name": "Luka",
  "friend_of": 2
}

I want to create a Person node, for example, and also the friend_of relationship to the Person with id=2.

Regards, Aissa

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

Did you have time to investigate this case, please? About the second part of the issue, I don't know if there is a solution to execute 2 Cypher queries based on the same event consumed from Kafka, e.g. create a node and a relationship from the same Kafka topic.

I want to do something like this:

streams.sink.topic.cypher.topic-1.to.testdb-2=MERGE (n:Incident {id: event.id}) ON CREATE SET n +=event.properties MATCH (c {id: event.cmdb_ci}) CREATE (n)-[r:IMPACTS]->(c)
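
Would something along these lines be the right shape? As far as I understand, Cypher needs a WITH between the updating clause and the following MATCH, and event has to be carried through it (assuming event.properties is a map in the payload):

MERGE (n:Incident {id: event.id}) ON CREATE SET n += event.properties
WITH n, event
MATCH (c {id: event.cmdb_ci})
CREATE (n)-[:IMPACTS]->(c)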

Thanks a lot for your precious help.

Regards, Aissa

mroiter-larus commented 2 years ago

@aissaelouafi

I was able to replicate the issue and I'm still investigating. About the second part of the issue, you can't consume events from a topic twice with the same consumer. You would have to force the creation of a new consumer, but this is not possible with the plugin.

I'll keep you posted.

Mauro

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

I just got this error message in the log file:

2022-04-07 14:15:14.235+0000 ERROR [s.StreamsSinkConfigurationListener] [db-2/79d0ad63] Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1151) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1081) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431) ~[neo4j-streams-4.1.0.jar:?]
        at streams.kafka.KafkaManualCommitEventConsumer.commitData(KafkaManualCommitEventConsumer.kt:84) ~[neo4j-streams-4.1.0.jar:?]
        at streams.kafka.KafkaManualCommitEventConsumer.read(KafkaManualCommitEventConsumer.kt:91) ~[neo4j-streams-4.1.0.jar:?]
        at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:159) [neo4j-streams-4.1.0.jar:?]
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665) [neo4j-streams-4.1.0.jar:?]

I think the issue is related to these parameters (max.poll.interval.ms / max.poll.records).
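
If I understand the message correctly, both knobs can be passed through the plugin with the kafka. prefix; for example (values just illustrative, not recommendations):

kafka.max.poll.interval.ms=600000
kafka.max.poll.records=500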

Regards, Aissa

aissaelouafi commented 2 years ago

Hi @mroiter-larus,

I hope you are doing well. I have a question concerning the issue, since we can't have two consumers on the same Kafka topic to create both the node and the relationship. I tried to write a single Cypher query that creates the node and the relationship at the same time, but it's not working.

I have this data :

{
  "id": 1,
  "name": "Test",
  "friend_of": 2
}

So I want to create a Person node with attributes {id: 1, name: 'Test'} and the friend_of relationship to the Person node with id=2. This is the Cypher query I tried:

MERGE (n:Person {id: event.id}) ON CREATE SET n +=event.properties WITH n MATCH (p) WHERE p.id=event.friend_of CALL apoc.create.relationship (n, 'friend_of', {}, p) YIELD rel RETURN rel

I don't know if this kind of Cypher query is supported by the Kafka plugin, or maybe I have a syntax error in the query, because neither the node nor the relationship is created with it.
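
Would a single query along these lines be a better shape? I noticed that after WITH n the event variable goes out of scope unless it is carried along (WITH n, event); this sketch also uses MERGE for the friend node so the row is not dropped when that node doesn't exist yet (assuming the event only has the id, name and friend_of fields shown above):

MERGE (n:Person {id: event.id}) ON CREATE SET n.name = event.name
MERGE (p:Person {id: event.friend_of})
MERGE (n)-[:friend_of]->(p)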

Do you have any news concerning the Kafka delay in consuming events?

Thanks a lot for your help.

Regards, Aissa