marklogic / nifi

Mirror of Apache NiFi to support ongoing MarkLogic integration efforts
https://marklogic.github.io/nifi/
Apache License 2.0

Exception in QueryMarkLogic processor after fetching ~1500. #21

Closed iowusu closed 5 years ago

iowusu commented 5 years ago

We have about 10000 records that the processor needs to fetch, but the QueryMarkLogic processor throws an exception after fetching ~1500 records. We see this pattern any time we test our flow with 10000+ records.

Nifi Error

018-11-26 23:20:09,462 ERROR [pool-1635-thread-2] o.a.n.marklogic.processor.QueryMarkLogic QueryMarkLogic[id=02823651-3937-1887-b3ba-77ff802d44b9] QueryMarkLogic[id=02823651-3937-1887-b3ba-77ff802d44b9] failed to process due to org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=b1b0f03b-a288-4d9a-b399-cbad0ea87b43,claim=,offset=0,name=845183032136763,size=0]; rolling back session: org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=b1b0f03b-a288-4d9a-b399-cbad0ea87b43,claim=,offset=0,name=845183032136763,size=0]
org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=b1b0f03b-a288-4d9a-b399-cbad0ea87b43,claim=,offset=0,name=845183032136763,size=0]
at org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream.write(FlowFileAccessOutputStream.java:80)
at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:68)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$null$0(QueryMarkLogic.java:184)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2625)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:184)
at com.marklogic.client.datamovement.impl.QueryBatcherImpl$QueryTask.run(QueryBatcherImpl.java:657)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is closed
at org.apache.nifi.controller.repository.FileSystemRepository$2.write(FileSystemRepository.java:947)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:44)
at org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream.write(FlowFileAccessOutputStream.java:78)
... 8 common frames omitted
2018-11-26 23:20:09,463 ERROR [pool-1635-thread-3] c.m.c.datamovement.impl.QueryBatcherImpl Exception thrown by an onUrisReady listener
org.apache.nifi.processor.exception.ProcessException: java.lang.IllegalStateException
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:195)
at com.marklogic.client.datamovement.impl.QueryBatcherImpl$QueryTask.run(QueryBatcherImpl.java:657)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: null
at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:238)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:335)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:191)
... 4 common frames omitted
2018-11-26 23:20:09,463 ERROR [pool-1635-thread-1] c.m.c.datamovement.impl.QueryBatcherImpl Exception thrown by an onUrisReady listener
org.apache.nifi.processor.exception.ProcessException: org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=618059a1-1a8e-4e88-b9b9-5c257dde32ef,claim=,offset=0,name=845183030096026,size=0]
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:195)
at com.marklogic.client.datamovement.impl.QueryBatcherImpl$QueryTask.run(QueryBatcherImpl.java:657)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=618059a1-1a8e-4e88-b9b9-5c257dde32ef,claim=,offset=0,name=845183030096026,size=0]
at org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream.write(FlowFileAccessOutputStream.java:80)
at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:68)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$null$0(QueryMarkLogic.java:184)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2625)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:184)
... 4 common frames omitted
Caused by: java.io.IOException: Stream is closed
at org.apache.nifi.controller.repository.FileSystemRepository$2.write(FileSystemRepository.java:947)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:44)
at org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream.write(FlowFileAccessOutputStream.java:78)
... 8 common frames omitted
2018-11-26 23:20:09,463 ERROR [pool-1635-thread-2] c.m.c.datamovement.impl.QueryBatcherImpl Exception thrown by an onUrisReady listener
org.apache.nifi.processor.exception.ProcessException: org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=b1b0f03b-a288-4d9a-b399-cbad0ea87b43,claim=,offset=0,name=845183032136763,size=0]
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:195)
at com.marklogic.client.datamovement.impl.QueryBatcherImpl$QueryTask.run(QueryBatcherImpl.java:657)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=b1b0f03b-a288-4d9a-b399-cbad0ea87b43,claim=,offset=0,name=845183032136763,size=0]
at org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream.write(FlowFileAccessOutputStream.java:80)
at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:68)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$null$0(QueryMarkLogic.java:184)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2625)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:184)
... 4 common frames omitted
Caused by: java.io.IOException: Stream is closed
at org.apache.nifi.controller.repository.FileSystemRepository$2.write(FileSystemRepository.java:947)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:44)
at org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream.write(FlowFileAccessOutputStream.java:78)
... 8 common frames omitted
2018-11-26 23:20:09,475 INFO [pool-1635-thread-2] c.m.c.datamovement.impl.QueryBatcherImpl Consistent snapshot timestamp=[-1]
2018-11-26 23:20:09,475 INFO [pool-1635-thread-2] c.m.client.impl.DocumentManagerImpl Reading metadata and content for multiple uris beginning with /data/member/e2194e28-3071-4bf9-ab83-1fec0da95ae0.json
2018-11-26 23:20:09,475 INFO [pool-1635-thread-1] c.m.c.datamovement.impl.QueryBatcherImpl Consistent snapshot timestamp=[-1]
2018-11-26 23:20:09,476 INFO [pool-1635-thread-1] c.m.client.impl.DocumentManagerImpl Reading metadata and content for multiple uris beginning with /data/member/b2d8c667-350c-4373-a1b1-5dbc028982d3.json
2018-11-26 23:20:09,476 INFO [pool-1635-thread-3] c.m.c.datamovement.impl.QueryBatcherImpl Consistent snapshot timestamp=[-1]
2018-11-26 23:20:09,476 INFO [pool-1635-thread-3] c.m.client.impl.DocumentManagerImpl Reading metadata and content for multiple uris beginning with /data/member/be763836-33ad-4896-ab0a-7443e5040438.json
2018-11-26 23:20:09,745 ERROR [pool-1635-thread-3] o.a.n.marklogic.processor.QueryMarkLogic QueryMarkLogic[id=02823651-3937-1887-b3ba-77ff802d44b9] QueryMarkLogic[id=02823651-3937-1887-b3ba-77ff802d44b9] failed to process due to java.lang.IllegalStateException; rolling back session: java.lang.IllegalStateException
java.lang.IllegalStateException: null
at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:238)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:335)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:191)
at com.marklogic.client.datamovement.impl.QueryBatcherImpl$QueryTask.run(QueryBatcherImpl.java:657)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2018-11-26 23:20:09,748 ERROR [pool-1635-thread-1] o.a.n.marklogic.processor.QueryMarkLogic QueryMarkLogic[id=02823651-3937-1887-b3ba-77ff802d44b9] QueryMarkLogic[id=02823651-3937-1887-b3ba-77ff802d44b9] failed to process due to org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=bee20a52-20f7-4891-a00b-10877778ca0e,claim=,offset=0,name=845183316066246,size=0]; rolling back session: org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=bee20a52-20f7-4891-a00b-10877778ca0e,claim=,offset=0,name=845183316066246,size=0]
org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=bee20a52-20f7-4891-a00b-10877778ca0e,claim=,offset=0,name=845183316066246,size=0]
at org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream.write(FlowFileAccessOutputStream.java:80)
at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:68)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$null$0(QueryMarkLogic.java:184)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2625)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:184)
at com.marklogic.client.datamovement.impl.QueryBatcherImpl$QueryTask.run(QueryBatcherImpl.java:657)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is closed
at org.apache.nifi.controller.repository.FileSystemRepository$2.write(FileSystemRepository.java:947)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:44)
at org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream.write(FlowFileAccessOutputStream.java:78)
... 8 common frames omitted

fsnow commented 5 years ago

This:

Caused by: java.lang.IllegalStateException: null
at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:238)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:335)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:191)

IllegalStateExceptions are usually related to transferring the FlowFile more than once.

ryanjdew commented 5 years ago

I'm exploring whether there is a situation in which we could be transferring FlowFiles multiple times, but at first glance this looks like it could be a performance issue with the NiFi repositories. It might be worth looking at the NiFi performance information at https://marklogic.github.io/nifi/performance-considerations#__RefHeading__786_1654017897 to see whether the NiFi system could be tuned.

pradeepkumardv commented 5 years ago

Most of the performance considerations from the documentation were already in place, except the number of threads and multiple provenance/content repositories.

ryanjdew commented 5 years ago

Looking further down the stack trace, the error is due to the stream for the NiFi FileSystemRepository being closed when writing a FlowFile to a session on line 184 of QueryMarkLogic (https://github.com/marklogic/nifi/blob/marklogic-1.8.0.1/nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/org/apache/nifi/marklogic/processor/QueryMarkLogic.java#L184). You can see that the FlowFile is created on the line just above, so it can't really be re-sending the same FlowFile.

Unfortunately, it doesn't get me closer to a solution at the moment.

2018-11-26 23:20:09,748 ERROR [pool-1635-thread-1] o.a.n.marklogic.processor.QueryMarkLogic QueryMarkLogic[id=02823651-3937-1887-b3ba-77ff802d44b9] QueryMarkLogic[id=02823651-3937-1887-b3ba-77ff802d44b9] failed to process due to org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=bee20a52-20f7-4891-a00b-10877778ca0e,claim=,offset=0,name=845183316066246,size=0]; rolling back session: org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=bee20a52-20f7-4891-a00b-10877778ca0e,claim=,offset=0,name=845183316066246,size=0]
org.apache.nifi.processor.exception.FlowFileAccessException: Could not write to StandardFlowFileRecord[uuid=bee20a52-20f7-4891-a00b-10877778ca0e,claim=,offset=0,name=845183316066246,size=0]
at org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream.write(FlowFileAccessOutputStream.java:80)
at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:68)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$null$0(QueryMarkLogic.java:184)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:2625)
at org.apache.nifi.marklogic.processor.QueryMarkLogic.lambda$onTrigger$1(QueryMarkLogic.java:184)
at com.marklogic.client.datamovement.impl.QueryBatcherImpl$QueryTask.run(QueryBatcherImpl.java:657)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is closed
at org.apache.nifi.controller.repository.FileSystemRepository$2.write(FileSystemRepository.java:947)

damonfeldman commented 5 years ago

Poking around the code, the stream is likely closed earlier by FileSystemRepository.close() in org.apache.nifi.controller.repository.FileSystemRepository (lines 972ff). This method uses a trace flag and will dump the stack showing what closed it if trace is enabled.

If this is not reproducible, we could set that flag on the original system to see what closed the stream before the error.

See https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
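
For anyone who wants to try the trace flag, here is a minimal sketch of the logging change. It assumes the stock NiFi setup, where logging is configured in conf/logback.xml and the repository class logs under its own class name. Both are assumptions, so check them against your install, and expect trace output to be verbose.

```xml
<!-- Assumed file: conf/logback.xml in the NiFi installation directory. -->
<!-- Raises only the content repository class to TRACE, so the stream-close -->
<!-- stack traces mentioned above can be logged; all other loggers keep their levels. -->
<logger name="org.apache.nifi.controller.repository.FileSystemRepository" level="TRACE"/>
```

Remove the entry once the closing stack trace has been captured.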

ryanjdew commented 5 years ago

I was able to reproduce this by creating a larger load and changing the "Run Schedule" to a small interval (~10 seconds). I have a fix pending that uses a synchronized block on the session: https://github.com/marklogic/nifi/blob/df91522b9f3521b4227ca4c0b9b8b11ea2406432/nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/org/apache/nifi/marklogic/processor/QueryMarkLogic.java#L228
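
For readers following along, here is a rough, self-contained sketch of the general idea behind a synchronized block on a shared session. It is not the code from the linked commit. `FakeSession` and `SessionSyncSketch` are hypothetical stand-ins with no NiFi or MarkLogic dependency. The sketch assumes only what the stack traces above suggest: several pool threads invoke the batch listener concurrently against one session object that is not safe for concurrent use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a session object that is not thread-safe.
class FakeSession {
    // Plain ArrayList on purpose: concurrent add() calls without locking may corrupt it.
    private final List<String> transferred = new ArrayList<>();

    void writeAndTransfer(String uri) {
        transferred.add(uri);
    }

    int transferredCount() {
        return transferred.size();
    }
}

class SessionSyncSketch {
    // Simulates listener callbacks arriving on several batcher threads that share one session.
    static int runBatches(int threads, int batchesPerThread) {
        FakeSession session = new FakeSession();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int threadId = t;
            pool.submit(() -> {
                for (int b = 0; b < batchesPerThread; b++) {
                    // Only one thread at a time may touch the shared session.
                    synchronized (session) {
                        session.writeAndTransfer("/doc/" + threadId + "/" + b + ".json");
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        synchronized (session) {
            return session.transferredCount();
        }
    }

    public static void main(String[] args) {
        // Three threads, matching pool-1635-thread-1..3 in the log above.
        System.out.println(runBatches(3, 1000));
    }
}
```

With the lock in place, every write is recorded and the count equals threads times batches. The trade-off is that work on the session is serialized, so the listener threads no longer write in parallel.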