neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0

Pattern strategy with kafka connect plugin doesn't produce result #441

Closed antonio-urbano closed 3 years ago

antonio-urbano commented 3 years ago

I'm trying to use the Kafka Connect plugin with the pattern strategy to write data from Kafka topics back to Neo4j. I have two Kafka topics. CREATED_PERSON holds nodes, with messages in this JSON format:

    {"ID":"0","NAME":"AAA","FLAG":true}
    {"ID":"1","NAME":"BBB","FLAG":false}

CREATED_KNOWS holds relationships, with messages in this JSON format:

    {"STARTID":"0","ENDID":"1","FLAG":true}
    {"STARTID":"2","ENDID":"3","FLAG":false}

Finally, I configured my connector as follows:

    {
      "name": "Neo4jSinkConnector",
      "config": {
        "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "topics": "CREATED_PERSON, CREATED_KNOWS",
        "neo4j.topic.pattern.node.CREATED_PERSON": "(:Person{!ID,*})",
        "neo4j.topic.pattern.relationship.CREATED_KNOWS": "(:Person{!STARTID})-[:KNOWS]->(:Person{!ENDID})",
        "errors.retry.timeout": "-1",
        "errors.retry.delay.max.ms": "1000",
        "errors.tolerance": "all",
        "errors.log.enable": true,
        "errors.log.include.messages": true,
        "neo4j.server.uri": "bolt://:7687",
        "neo4j.authentication.basic.username": "",
        "neo4j.authentication.basic.password": "",
        "neo4j.encryption.enabled": false
      }
    }
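To make the intent of the node pattern concrete, here is a rough Python sketch of how a pattern such as (:Person{!ID,*}) is commonly read: properties prefixed with ! act as the merge key, and * means all remaining message fields become node properties. This is an illustration of that reading only, not the plugin's implementation; the function name and the regular expression are made up for the example and cover just the simple shape used in this thread.

```python
import re

# Handles only the simple node-pattern shape from this thread, e.g. "(:Person{!ID,*})".
# The leading "(" and trailing ")" are treated as optional literal characters.
NODE_PATTERN = re.compile(r"^\(?:(?P<label>\w+)\{(?P<props>[^}]*)\}\)?$")


def split_by_node_pattern(pattern, message):
    """Return (label, key_properties, other_properties) for one message.

    Illustrative helper (hypothetical name): "!" marks key properties,
    "*" means "include every other field of the message".
    """
    match = NODE_PATTERN.match(pattern.strip())
    if match is None:
        raise ValueError(f"unsupported pattern for this sketch: {pattern!r}")

    tokens = [t.strip() for t in match.group("props").split(",") if t.strip()]
    key_names = [t[1:] for t in tokens if t.startswith("!")]
    keys = {name: message[name] for name in key_names}

    if "*" in tokens:
        # Wildcard: everything that is not a key becomes a plain property.
        others = {k: v for k, v in message.items() if k not in keys}
    else:
        # Otherwise keep only the explicitly named, non-key properties.
        named = [t for t in tokens if not t.startswith("!")]
        others = {k: message[k] for k in named if k in message}

    return match.group("label"), keys, others


label, keys, others = split_by_node_pattern(
    "(:Person{!ID,*})", {"ID": "0", "NAME": "AAA", "FLAG": True}
)
print(label, keys, others)
```

With the first CREATED_PERSON message above, the sketch separates ID as the key and keeps NAME and FLAG as ordinary properties.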

Both the connector and its task report the state "RUNNING":

    curl -X GET http://localhost:8083/connectors/Neo4jSinkConnector/status

    {"name":"Neo4jSinkConnector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"sink"}

But nothing shows up in the Neo4j instance.
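For repeated checks, the status payload can be summarized with a few lines of Python. This is a minimal sketch that assumes the JSON shape shown in the status output above; the function names are illustrative. As this report shows, a RUNNING state on its own does not confirm that records actually reached Neo4j, so it is only a first check.

```python
import json


def summarize_status(status_json):
    """Map the connector and each task to its reported state."""
    status = json.loads(status_json)
    states = {"connector": status["connector"]["state"]}
    for task in status.get("tasks", []):
        states[f"task-{task['id']}"] = task["state"]
    return states


def all_running(states):
    """True only if every entry reports RUNNING."""
    return all(state == "RUNNING" for state in states.values())


# Status payload copied from the curl output in this report.
sample = (
    '{"name":"Neo4jSinkConnector",'
    '"connector":{"state":"RUNNING","worker_id":"connect:8083"},'
    '"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],'
    '"type":"sink"}'
)
states = summarize_status(sample)
print(states, all_running(states))
```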

mroiter-larus commented 3 years ago

Hi @antonio-urbano,

It would be helpful if you could share the versions you're using for:

Thanks in advance

antonio-urbano commented 3 years ago

Hi @mroiter-larus,

I'm using:

Everything in a docker-compose

mroiter-larus commented 3 years ago

Hi @antonio-urbano,

I tried to replicate the issue following the steps you provided, but in my test environment the Kafka Connect Neo4j Sink plugin works as expected. I think the problem is in your JSON configuration. The only thing I noticed is that the host is missing from your neo4j.server.uri. I don't know whether you removed it before sharing the JSON config file.
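A quick way to catch a URI with no host before submitting the configuration is to parse it with Python's standard library. The sketch below is only a sanity check under that assumption; the helper name is made up, and "neo4j" in the second call is a placeholder hostname, not a value from this thread.

```python
from urllib.parse import urlparse


def bolt_uri_problems(uri):
    """Return a list of obvious problems with a Bolt URI (empty if none found)."""
    problems = []
    parsed = urlparse(uri)
    if not parsed.scheme:
        problems.append("missing scheme")
    if parsed.hostname is None:
        problems.append("missing host")
    return problems


# The URI exactly as posted in the configuration above.
print(bolt_uri_problems("bolt://:7687"))
# A URI with a placeholder hostname for comparison.
print(bolt_uri_problems("bolt://neo4j:7687"))
```

A check like this only validates the shape of the URI. It cannot tell whether the host is reachable from the Kafka Connect container.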

Did you by chance check whether there are any errors in the connector logs?

I mean, if you run docker-compose logs -f connect, where connect is the name of your Kafka connect container, do you find any errors or exceptions?
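When the log output is long, filtering it for error lines can help. The following is a small Python sketch, assuming the logs have been captured as text (for example by redirecting the docker-compose logs output to a file); the function name is illustrative and the sample lines are invented placeholders, not real connector output.

```python
import re

# Flags lines containing the words ERROR or WARN, or any Java-style "...Exception".
SUSPICIOUS = re.compile(r"\b(ERROR|WARN)\b|Exception")


def suspicious_lines(lines):
    """Return the lines that look like errors, warnings, or exceptions."""
    return [line.rstrip("\n") for line in lines if SUSPICIOUS.search(line)]


# Invented placeholder lines, used only to show the filter at work.
sample_lines = [
    "INFO worker started\n",
    "ERROR something failed\n",
    "java.lang.RuntimeException: placeholder message\n",
]
for line in suspicious_lines(sample_lines):
    print(line)
```

To run it against a saved log file, open the file and pass the file object to suspicious_lines, since it accepts any iterable of lines.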

antonio-urbano commented 3 years ago

Hi @mroiter-larus,

Thanks for the reply. The neo4j.server.uri in my configuration file is actually set as follows (the missing host was a copy-paste error):

The Kafka Connect logs show some exceptions:

I printed the full log output in the attached txt: logs.txt

Thanks in advance

mroiter-larus commented 3 years ago

@antonio-urbano please try replacing your neo4j.server.uri with the following:

"neo4j.server.uri": "bolt://<NEO4J_CONTAINER_NAME>:7687"

where <NEO4J_CONTAINER_NAME> is your Neo4j Docker container name. If that doesn't work, could you please share your docker-compose file?
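The suggested change can be applied to the connector definition in code before resubmitting it. This is a minimal Python sketch; the helper name is made up, and "neo4j" stands in for whatever the Neo4j container or service is called in your docker-compose file. The reasoning behind the suggestion is that containers on the same Compose network reach each other by service or container name, whereas a name like localhost inside the Kafka Connect container refers to that container itself.

```python
import copy
import json


def with_neo4j_host(connector, host, port=7687):
    """Return a copy of the connector definition pointing at the given Neo4j host."""
    updated = copy.deepcopy(connector)
    updated["config"]["neo4j.server.uri"] = f"bolt://{host}:{port}"
    return updated


# Trimmed-down connector definition; only the field relevant here is shown.
connector = {
    "name": "Neo4jSinkConnector",
    "config": {"neo4j.server.uri": "bolt://:7687"},
}

# "neo4j" is a placeholder for <NEO4J_CONTAINER_NAME>.
fixed = with_neo4j_host(connector, "neo4j")
print(json.dumps(fixed["config"], indent=2))
```

The resulting config map can then be sent to the Kafka Connect REST API, for instance with a PUT to /connectors/Neo4jSinkConnector/config.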

Thanks

antonio-urbano commented 3 years ago

Hi @mroiter-larus,

That was the problem. Thanks again for your assistance.