neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0
173 stars 71 forks source link

Example from documentation gives lateinit property streamsSinkConfiguration #345

Closed FrankR85 closed 4 years ago

FrankR85 commented 4 years ago

I try to execute the example from the documentation §5.3:

CALL streams.consume('my-topic', {timeout: 5000, partitions: [{partition: 0, offset: 30}]}) YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})

This gives me the following exception:

org.neo4j.driver.exceptions.ClientException: Failed to invoke procedure `streams.consume`: Caused by: kotlin.UninitializedPropertyAccessException: lateinit property streamsSinkConfiguration has not been initialized
    at org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:143)
    at org.neo4j.driver.internal.InternalResult.blockingGet(InternalResult.java:128)
    at org.neo4j.driver.internal.InternalResult.hasNext(InternalResult.java:64)
    at java.util.Iterator.forEachRemaining(Iterator.java:115)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at de.mycomponent.v1.infrastructure.neo4j.Neo4JUpsertTest.test(Neo4JUpsertTest.java:46)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
    at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
    Suppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause
        at org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:80)
        at org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:105)
        at org.neo4j.driver.internal.messaging.v1.MessageReaderV1.unpackFailureMessage(MessageReaderV1.java:83)
        at org.neo4j.driver.internal.messaging.v1.MessageReaderV1.read(MessageReaderV1.java:59)
        at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:83)
        at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:35)
        at org.neo4j.driver.internal.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:47)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:425)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at org.neo4j.driver.internal.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)

What could be the cause of this issue? I'm running Neo4J locally with this config:

FROM neo4j:latest

COPY neo4jstreams3.5.8.jar /var/lib/neo4j/plugins/neo4jstreams.jar

COPY neo4j.conf /var/lib/neo4j/conf/neo4j.conf

and this is my neo4j.conf:

kafka.bootstrap.servers=mykafka:443

streams.sink.dlq=neo4j-dlq

kafka.auto.offset.reset=earliest
kafka.group.id=neo4j

kafka.acks=all
kafka.num.partitions=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432

#kafka.security.protocol=SSL
#kafka.ssl.truststore.location=/home/ubuntu/security/kafka.client1.truststore.jks
#kafka.ssl.truststore.password=neo4jpassword
#kafka.ssl.keystore.location=/home/ubuntu/security/kafka.client1.keystore.jks
#kafka.ssl.keystore.password=neo4jpassword
#kafka.ssl.key.password=neo4jpassword
#kafka.ssl.endpoint.identification.algorithm=HTTPS

dbms.security.procedures.whitelist=streams.*,apoc.*
dbms.security.procedures.unrestricted=apoc.*
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake
dbms.connector.bolt_address 0.0.0.0:7687
dbms.ssl.policy.bolt.client_auth=NONE
dbms.ssl.policy.https.client_auth=NONE
dbms.connector.http.enabled=false
dbms.connector.https.enabled=false
dbms.security.auth_enabled=false

Any ideas?

FrankR85 commented 4 years ago

Don't know if it is relevant but here is the snippet I use to call the procedure:

  Driver driver = GraphDatabase.driver(uri, AuthTokens.none());

   Session session = driver.session();

      Result run = session.run(
          "CALL streams.consume(\'my-topic\', {}) YIELD event"
              + " CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})");
FrankR85 commented 4 years ago

I want to add the information from the running Neo4j Container (started via a generic Testcontainer) We upgraded to neo4j-streams:3.5.9 today.

What makes me wonder is that I don't see anything about Connecting to Kafka or similar in the log. Do we have to activate the streams somehow?

Directories in use:

  home:         /var/lib/neo4j

  config:       /var/lib/neo4j/conf

  logs:         /logs

  plugins:      /var/lib/neo4j/plugins

  import:       NOT SET

  data:         /var/lib/neo4j/data

  certificates: /var/lib/neo4j/certificates

  run:          /var/lib/neo4j/run

Starting Neo4j.

APOC couln't set a URLStreamHandlerFactory since some other tool already did this (e.g. tomcat). This means you cannot use s3:// or hdfs:// style URLs in APOC. This is caused by a limitation of the JVM which we cannot fix. 
2020-09-02 13:15:43.163+0000 WARN  Unrecognized setting. No declared setting with name: kafka.ssl.truststore.password

2020-09-02 13:15:43.166+0000 WARN  Unrecognized setting. No declared setting with name: kafka.batch.size

2020-09-02 13:15:43.171+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.dlq

2020-09-02 13:15:43.172+0000 WARN  Unrecognized setting. No declared setting with name: kafka.acks

2020-09-02 13:15:43.172+0000 WARN  Unrecognized setting. No declared setting with name: dbms.connector.bolt_address

2020-09-02 13:15:43.172+0000 WARN  Unrecognized setting. No declared setting with name: kafka.ssl.keystore.location

2020-09-02 13:15:43.173+0000 WARN  Unrecognized setting. No declared setting with name: kafka.auto.offset.reset

2020-09-02 13:15:43.173+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.topic.cud

2020-09-02 13:15:43.176+0000 WARN  Unrecognized setting. No declared setting with name: kafka.enable.ssl.certificate.verification

2020-09-02 13:15:43.176+0000 WARN  Unrecognized setting. No declared setting with name: kafka.ssl.truststore.location

2020-09-02 13:15:43.177+0000 WARN  Unrecognized setting. No declared setting with name: kafka.ssl.keystore.type

2020-09-02 13:15:43.178+0000 WARN  Unrecognized setting. No declared setting with name: kafka.group.id

2020-09-02 13:15:43.178+0000 WARN  Unrecognized setting. No declared setting with name: kafka.retries

2020-09-02 13:15:43.178+0000 WARN  Unrecognized setting. No declared setting with name: kafka.ssl.truststore.type

2020-09-02 13:15:43.179+0000 WARN  Unrecognized setting. No declared setting with name: kafka.security.protocol

2020-09-02 13:15:43.179+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.enabled

2020-09-02 13:15:43.180+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.topic.my-topic

2020-09-02 13:15:43.181+0000 WARN  Unrecognized setting. No declared setting with name: kafka.bootstrap.servers

2020-09-02 13:15:43.181+0000 WARN  Unrecognized setting. No declared setting with name: kafka.ssl.keystore.password

2020-09-02 13:15:43.182+0000 WARN  Unrecognized setting. No declared setting with name: kafka.num.partitions

2020-09-02 13:15:43.182+0000 WARN  Unrecognized setting. No declared setting with name: kafka.buffer.memory

2020-09-02 13:15:43.184+0000 INFO  Starting...
2020-09-02 13:16:04.212+0000 INFO  Initializing system graph model for component 'security-users' with version -1 and status UNINITIALIZED

2020-09-02 13:16:04.226+0000 INFO  Setting up initial user from defaults: neo4j

2020-09-02 13:16:04.227+0000 INFO  Creating new user 'neo4j' (passwordChangeRequired=true, suspended=false)

2020-09-02 13:16:04.253+0000 INFO  Setting version for 'security-users' to 2

2020-09-02 13:16:04.262+0000 INFO  After initialization of system graph model component 'security-users' have version 2 and status CURRENT

2020-09-02 13:16:04.279+0000 INFO  Performing postInitialization step for component 'security-users' with version 2 and status CURRENT
2020-09-02 13:16:07.102+0000 INFO  Called db.clearQueryCaches(): Query cache already empty.

2020-09-02 13:16:07.168+0000 INFO  Bolt enabled on 0.0.0.0:7687.

2020-09-02 13:16:07.170+0000 INFO  Started.
FrankR85 commented 4 years ago

Ok...stupid me... I shouldn't use Neo4j 4.1.1. with streams 3.x. Using streams-4.0.3 gives us Kafka Connect errors. Seems we are one step further.