neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0

Missing nodes in Neo4j with "Neo4j Kafka Connector Sink" #570

Closed. pradeepharbour closed this issue 1 year ago

pradeepharbour commented 1 year ago

I have set up my Kafka connector by following the Neo4j documentation. I am testing it by publishing messages to Kafka and expecting each message to be transformed into a Neo4j node. Here is my sink configuration:

{
  "name": "SomeName",
  "config": {
    "topics": "SomeTopicName",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.database": "someDb",
    "neo4j.server.uri": "neo4j://X.X.X.X:XXXX",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.batch.parallelize": false,
    "neo4j.connection.max.pool.size": 500,
    "neo4j.topic.cypher.SomeTopicName": "query"
  }
}

Basic ingestion works fine: broadcasting 10K messages produces 10K nodes in Neo4j. But when I broadcast 500K messages, I see only 350K nodes in Neo4j. There are no errors in the connector logs or the Neo4j logs. If I run the broadcast again with the same set of 500K messages, some of the missing nodes are added, so I have to repeat the run until all 500K nodes are present in Neo4j.

Expected Behavior (Mandatory)

I expect to see all nodes in Neo4j, or an error for any missing nodes.

Actual Behavior (Mandatory)

Some of the streamed Kafka messages are missing from the Neo4j database, and no error is reported.

How to Reproduce the Problem

Simple Dataset (where it's possible)

I used a basic Cypher query:


MERGE (person:Person {uri: 'http://company#Person/' + event.personid})
    ON MATCH SET person.hasdescription = event.hasdescription, person.hascount = event.hascount
    ON CREATE SET person.hasdescription = event.hasdescription, person.hascount = event.hascount
RETURN count(*) AS count

Steps (Mandatory)

  1. Create a Connector
  2. Broadcast Message to Topic
  3. Check In Neo4j
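
For step 3, comparing total counts only shows that something is missing, not which messages. A minimal sketch of a reconciliation check follows, assuming the published events are still available as dictionaries with a `personid` field and that the stored `uri` values have been read back from Neo4j by some other means (for example, a query returning `person.uri` for every `Person` node). The helper names `expected_uri` and `find_missing` are hypothetical and are not part of the connector.

```python
from typing import Iterable, List

# Same prefix as the MERGE query in this report.
URI_PREFIX = "http://company#Person/"


def expected_uri(event: dict) -> str:
    """Build the uri the MERGE query would assign to this event."""
    return URI_PREFIX + str(event["personid"])


def find_missing(events: Iterable[dict], stored_uris: Iterable[str]) -> List[str]:
    """Return the uris that were published but are absent from Neo4j.

    `stored_uris` is assumed to be the list of `uri` values already
    fetched from the database; fetching them is outside this sketch.
    """
    stored = set(stored_uris)
    expected = {expected_uri(e) for e in events}
    return sorted(expected - stored)
```

Running `find_missing` after each broadcast gives the exact set of identifiers to look for in the producer output and in the topic, which narrows down whether the loss happens before or after Kafka.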

Screenshots (where it's possible)

[Two screenshots attached: 2023-07-16 at 5:32:42 PM and 2023-07-16 at 5:33:58 PM]

Specifications (Mandatory)

Currently used versions

Versions

pradeepharbour commented 1 year ago

The issue was on the broadcasting (producer) side; it is not a problem with the connector.
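
The thread does not say what went wrong on the broadcasting side. For anyone seeing the same symptom, one way to rule the producer in or out is to count delivery acknowledgements rather than send calls. The sketch below is an assumption-laden example, not the reporter's code: it assumes a producer object that behaves like `confluent_kafka.Producer` (`produce` with an `on_delivery` callback, `poll`, `flush`, and `BufferError` when the local queue is full). `DeliveryTracker` and `publish_all` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable, List, Optional


@dataclass
class DeliveryTracker:
    """Counts broker acknowledgements; usable as an on_delivery callback."""
    delivered: int = 0
    failed: List[str] = field(default_factory=list)

    def __call__(self, err: Optional[Any], msg: Any) -> None:
        if err is None:
            self.delivered += 1
        else:
            self.failed.append(str(err))


def publish_all(producer: Any, topic: str, payloads: Iterable[bytes],
                tracker: DeliveryTracker) -> int:
    """Publish every payload and block until all callbacks have fired.

    `producer` is assumed to follow the confluent_kafka.Producer interface.
    Returns the number of payloads handed to the producer.
    """
    sent = 0
    for payload in payloads:
        while True:
            try:
                producer.produce(topic, value=payload, on_delivery=tracker)
                break
            except BufferError:
                # Local queue is full: serve callbacks to free space, then retry.
                producer.poll(1.0)
        producer.poll(0)  # serve any delivery callbacks that are ready
        sent += 1
    producer.flush()  # wait for all outstanding messages
    return sent
```

After `publish_all` returns, `tracker.delivered` should equal the return value and `tracker.failed` should be empty. If either condition fails, the messages were lost before they reached the topic, and the sink connector never saw them.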