knowbi / knowbi-pentaho-pdi-neo4j-output

Pentaho Data Integration output step for Neo4J
Apache License 2.0

Constraints not always created in Neo4J Output step #200

Open Krejtcha opened 3 years ago

Krejtcha commented 3 years ago

Using:

When using the "Neo4J Output" step with the option "Create indexes?" checked, the constraints are not always created, even when they do not yet exist in the database:

#######################################################################
WARNING:  no libwebkitgtk-1.0 detected, some features will be unavailable
    Consider installing the package with apt-get or yum.
    e.g. 'sudo apt-get install libwebkitgtk-1.0-0'
#######################################################################
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
log4j:WARN Continuable parsing error 45 and column 76
log4j:WARN Element type "rollingPolicy" must be declared.
log4j:WARN Continuable parsing error 52 and column 14
log4j:WARN The content of element type "appender" must match "(errorHandler?,param*,layout?,filter*,appender-ref*)".
log4j:WARN Please set a rolling policy for the RollingFileAppender named 'pdi-execution-appender'
05:53:26,124 INFO  [KarafBoot] Checking to see if org.pentaho.clean.karaf.cache is enabled
05:53:26,181 INFO  [KarafInstance] 
*******************************************************************************
*** Karaf Instance Number: 1 at /data-integration/./system/karaf/caches/kit ***
***   chen/data-1                                                           ***
*** FastBin Provider Port:52901                                             ***
*** Karaf Port:8802                                                         ***
*** OSGI Service Port:9051                                                  ***
*******************************************************************************
Feb 15, 2021 5:53:26 AM org.apache.karaf.main.Main$KarafLockCallback lockAquired
INFO: Lock acquired. Setting startlevel to 100
2021/02/15 05:53:26 - Kitchen - Start of run.
2021-02-15 05:53:28.798:INFO:oejs.Server:jetty-8.1.15.v20140411
2021-02-15 05:53:28.837:INFO:oejs.AbstractConnector:Started NIOSocketConnectorWrapper@0.0.0.0:9051
Feb 15, 2021 5:53:28 AM org.apache.cxf.bus.osgi.CXFExtensionBundleListener addExtensions
INFO: Adding the extensions from bundle org.apache.cxf.cxf-rt-management (182) [org.apache.cxf.management.InstrumentationManager]
Feb 15, 2021 5:53:28 AM org.apache.cxf.bus.osgi.CXFExtensionBundleListener addExtensions
INFO: Adding the extensions from bundle org.apache.cxf.cxf-rt-transports-http (183) [org.apache.cxf.transport.http.HTTPTransportFactory, org.apache.cxf.transport.http.HTTPWSDLExtensionLoader, org.apache.cxf.transport.http.policy.HTTPClientAssertionBuilder, org.apache.cxf.transport.http.policy.HTTPServerAssertionBuilder, org.apache.cxf.transport.http.policy.NoOpPolicyInterceptorProvider]
Feb 15, 2021 5:53:29 AM org.pentaho.caching.impl.PentahoCacheManagerFactory$RegistrationHandler$1 onSuccess
INFO: New Caching Service registered
2021/02/15 05:53:30 - Ext-ETL_Intelis-DANA - Start of job execution
log4j:ERROR No output stream or file set for the appender named [pdi-execution-appender].
Feb 15, 2021 5:53:30 AM org.apache.cxf.endpoint.ServerImpl initDestination
INFO: Setting the server's publish address to be /lineage
Feb 15, 2021 5:53:30 AM org.apache.cxf.endpoint.ServerImpl initDestination
INFO: Setting the server's publish address to be /i18n
2021/02/15 05:53:30 - Ext-ETL_Intelis-DANA - Starting entry [Check DANA_rts Connection]
2021/02/15 05:53:30 - Ext-ETL_Intelis-DANA - Starting entry [Check DANA_neo4j Connection]
Feb 15, 2021 5:53:31 AM org.neo4j.driver.internal.logging.JULogger info
INFO: Direct driver instance 88080379 created for server address neo4j:7687
2021/02/15 05:53:31 - Check DANA_neo4j Connection - 1 Neo4j connections tested without error
2021/02/15 05:53:31 - Ext-ETL_Intelis-DANA - Starting entry [DANA_build-graph-model]
2021/02/15 05:53:31 - DANA_build-graph-model - Starting entry [DANA_build-graph-mode]
Feb 15, 2021 5:53:31 AM org.apache.cxf.endpoint.ServerImpl initDestination
INFO: Setting the server's publish address to be /marketplace
2021/02/15 05:53:31 - DANA_build-graph-mode - Using run configuration [Pentaho local]
2021/02/15 05:53:31 - DANA_build-graph-mode - Using legacy execution engine
2021/02/15 05:53:31 - DANA_build-graph-model - Dispatching started for transformation [DANA_build-graph-model]
Feb 15, 2021 5:53:31 AM org.neo4j.driver.internal.logging.JULogger info
INFO: Direct driver instance 694865548 created for server address neo4j:7687
Feb 15, 2021 5:53:31 AM org.neo4j.driver.internal.logging.JULogger info
INFO: Direct driver instance 104950853 created for server address neo4j:7687
Feb 15, 2021 5:53:31 AM org.neo4j.driver.internal.logging.JULogger info
INFO: Direct driver instance 1182457206 created for server address neo4j:7687
2021/02/15 05:53:31 - Select Netelem, Karta.0 - Finished reading query, closing connection.
2021/02/15 05:53:31 - Select Netelem, Karta.0 - Finished processing (I=382, O=0, R=0, W=382, U=0, E=0)
2021/02/15 05:53:31 - Select IRST_KARTA.0 - Finished reading query, closing connection.
2021/02/15 05:53:31 - Select IRST_KARTA.0 - Finished processing (I=382, O=0, R=0, W=382, U=0, E=0)
2021/02/15 05:53:31 - Select IRST_RDS_UZEL_MPLS.0 - Finished reading query, closing connection.
2021/02/15 05:53:31 - Select IRST_RDS_UZEL_MPLS.0 - Finished processing (I=894, O=0, R=0, W=894, U=0, E=0)
2021/02/15 05:53:31 - Select IRST_RDS_PORT.0 - Finished reading query, closing connection.
2021/02/15 05:53:31 - Select IRST_RDS_PORT.0 - Finished processing (I=3424, O=0, R=0, W=3424, U=0, E=0)
Feb 15, 2021 5:53:32 AM org.neo4j.driver.internal.logging.JULogger warn
WARNING: Transaction failed and will be retried in 1000ms
org.neo4j.driver.exceptions.TransientException: Database constraints have changed (txId=605) after this transaction (txId=602) started, which is not yet supported. Please retry your transaction to ensure all constraints are executed.
    at org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:143)
    at org.neo4j.driver.internal.InternalSession.beginTransaction(InternalSession.java:163)
    at org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:147)
    at org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:102)
    at org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:146)
    at org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:124)
    at org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:118)
    at bi.know.kettle.neo4j.steps.output.Neo4JOutput.emptyUnwindList(Neo4JOutput.java:423)
    at bi.know.kettle.neo4j.steps.output.Neo4JOutput.wrapUpTransaction(Neo4JOutput.java:894)
    at bi.know.kettle.neo4j.steps.output.Neo4JOutput.dispose(Neo4JOutput.java:703)
    at org.pentaho.di.trans.step.RunThread.run(RunThread.java:97)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause
        at org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:83)
        at org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:105)
        at org.neo4j.driver.internal.messaging.v1.MessageReaderV1.unpackFailureMessage(MessageReaderV1.java:83)
        at org.neo4j.driver.internal.messaging.v1.MessageReaderV1.read(MessageReaderV1.java:59)
        at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:83)
        at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:35)
        at org.neo4j.driver.internal.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:47)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at org.neo4j.driver.internal.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
Feb 15, 2021 5:53:32 AM org.neo4j.driver.internal.logging.JULogger warn
WARNING: Transaction failed and will be retried in 1140ms
org.neo4j.driver.exceptions.TransientException: Database constraints have changed (txId=605) after this transaction (txId=602) started, which is not yet supported. Please retry your transaction to ensure all constraints are executed.
    at org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:143)
    at org.neo4j.driver.internal.InternalSession.beginTransaction(InternalSession.java:163)
    at org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:147)
    at org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:102)
    at org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:146)
    at org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:124)
    at org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:118)
    at bi.know.kettle.neo4j.steps.output.Neo4JOutput.emptyUnwindList(Neo4JOutput.java:423)
    at bi.know.kettle.neo4j.steps.output.Neo4JOutput.wrapUpTransaction(Neo4JOutput.java:894)
    at bi.know.kettle.neo4j.steps.output.Neo4JOutput.dispose(Neo4JOutput.java:703)
    at org.pentaho.di.trans.step.RunThread.run(RunThread.java:97)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause
        at org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:83)
        at org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:105)
        at org.neo4j.driver.internal.messaging.v1.MessageReaderV1.unpackFailureMessage(MessageReaderV1.java:83)
        at org.neo4j.driver.internal.messaging.v1.MessageReaderV1.read(MessageReaderV1.java:59)
        at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:83)
        at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:35)
        at org.neo4j.driver.internal.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:47)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at org.neo4j.driver.internal.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.neo4j.driver.internal.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at org.neo4j.driver.internal.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
2021/02/15 05:53:32 - Create Karta.0 - Finished processing (I=0, O=382, R=382, W=382, U=0, E=0)
2021/02/15 05:53:33 - Create Port.0 - Finished processing (I=0, O=3424, R=3424, W=3424, U=0, E=0)
2021/02/15 05:53:33 - Create Netelem.0 - Finished processing (I=0, O=894, R=894, W=894, U=0, E=0)
Feb 15, 2021 5:53:33 AM org.neo4j.driver.internal.logging.JULogger info
INFO: Direct driver instance 47095385 created for server address neo4j:7687
Feb 15, 2021 5:53:33 AM org.neo4j.driver.internal.logging.JULogger info
INFO: Direct driver instance 1505561222 created for server address neo4j:7687
2021/02/15 05:53:33 - Block until Netelem, Karta created.0 - Finished processing (I=0, O=0, R=382, W=382, U=0, E=0)
Feb 15, 2021 5:53:33 AM org.neo4j.driver.internal.logging.JULogger info
INFO: Direct driver instance 2135575349 created for server address neo4j:7687
2021/02/15 05:53:33 - Block until Port, Netelem created.0 - Finished processing (I=0, O=0, R=3424, W=3424, U=0, E=0)
2021/02/15 05:53:34 - Create Netelem Obsahuje Karta.0 - Finished processing (I=0, O=382, R=382, W=382, U=0, E=0)
2021/02/15 05:54:00 - Create Port Obsahuje Port.0 - Finished processing (I=0, O=3424, R=3424, W=3424, U=0, E=0)
2021/02/15 05:54:26 - Create Netelem Obsahuje Port.0 - Finished processing (I=0, O=3424, R=3424, W=3424, U=0, E=0)
2021/02/15 05:54:26 - Carte - Installing timer to purge stale objects after 1440 minutes.
2021/02/15 05:54:26 - DANA_build-graph-model - Finished job entry [DANA_build-graph-mode] (result=[true])
2021/02/15 05:54:26 - Ext-ETL_Intelis-DANA - Starting entry [Success]
2021/02/15 05:54:26 - Ext-ETL_Intelis-DANA - Finished job entry [Success] (result=[true])
2021/02/15 05:54:26 - Ext-ETL_Intelis-DANA - Finished job entry [DANA_build-graph-model] (result=[true])
2021/02/15 05:54:26 - Ext-ETL_Intelis-DANA - Finished job entry [Check DANA_neo4j Connection] (result=[true])
2021/02/15 05:54:26 - Ext-ETL_Intelis-DANA - Finished job entry [Check DANA_rts Connection] (result=[true])
2021/02/15 05:54:26 - Ext-ETL_Intelis-DANA - Job execution finished
2021/02/15 05:54:26 - Kitchen - Finished!
2021/02/15 05:54:26 - Kitchen - Start=2021/02/15 05:53:26.917, Stop=2021/02/15 05:54:26.599
2021/02/15 05:54:26 - Kitchen - Processing ended after 59 seconds.

The transformation finishes, but some constraints are not created, which makes the dependent steps run very slowly.

mattcasters commented 3 years ago

This option is a leftover from another era. I would not add it anymore. Anyway, I think the transactional model changed slightly in the 4.1.x Java drivers or the 4.2 server line. I think we need to wrap every index creation in a separate transaction.
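
A minimal sketch of the "separate transaction per index creation" idea, written in Python for brevity rather than in the plugin's Java. It is not the plugin's code. The function name `run_schema_statements` is hypothetical, and `driver` is assumed to be any object with the session API of the official Neo4j Python driver (`driver.session()` as a context manager, `session.run(...)` returning a result with `consume()`).

```python
from typing import Iterable, List


def run_schema_statements(driver, statements: Iterable[str]) -> List[str]:
    """Run each schema statement in its own session and auto-commit transaction.

    `driver` is assumed to behave like a neo4j.Driver instance.
    Returns the statements that were applied, in order.
    """
    applied = []
    for statement in statements:
        # A fresh session per statement keeps every schema change in a
        # separate transaction that is committed before the next one starts.
        with driver.session() as session:
            # consume() waits until the server has finished the statement.
            session.run(statement).consume()
        applied.append(statement)
    return applied
```

Running the schema statements one by one, and before any data-loading transaction is opened, is intended to avoid the "Database constraints have changed ... after this transaction ... started" retries seen in the log above.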

Krejtcha commented 3 years ago

Right now I am creating the constraints manually with the Neo4j Cypher step and letting all the Neo4j Output steps wait until all the constraints are created. Is this what you are suggesting, or do you plan any improvements or extensions for the plugin?

mattcasters commented 3 years ago

I would create the indexes in a parent job as part of the overall workflow using the Execute Cypher job entry. In the end I wouldn't let the scattered steps determine these things. I would let my graph model determine which fields should have indexes or constraints.
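
As an illustration of letting the graph model drive the constraints, the sketch below derives one uniqueness constraint statement per key property from a small model description. The model layout, the function name and the `id` key property are assumptions made for the example; only the labels are taken from the step names in the log. The `ON ... ASSERT` form with `IF NOT EXISTS` is assumed to match the Neo4j 4.x server line discussed here; newer servers use `FOR ... REQUIRE` instead.

```python
from typing import Dict, List


def unique_constraint_statements(model: Dict[str, List[str]]) -> List[str]:
    """Build one uniqueness-constraint statement per (label, key property).

    `model` maps a node label to its key properties (hypothetical layout).
    """
    statements = []
    for label in sorted(model):
        for prop in model[label]:
            # Backticks quote the label and property names in Cypher.
            statements.append(
                f"CREATE CONSTRAINT IF NOT EXISTS ON (n:`{label}`) "
                f"ASSERT n.`{prop}` IS UNIQUE"
            )
    return statements


if __name__ == "__main__":
    # Hypothetical model: labels from the log, "id" is an assumed key property.
    graph_model = {"Netelem": ["id"], "Karta": ["id"], "Port": ["id"]}
    for statement in unique_constraint_statements(graph_model):
        print(statement)
```

The generated statements could then be pasted into, or fed to, an Execute Cypher job entry in the parent job, so that all constraints exist before any Neo4j Output step starts writing.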

Krejtcha commented 3 years ago

Thanks, I will do that.