marklogic / nifi

Mirror of Apache NiFi to support ongoing MarkLogic integration efforts
https://marklogic.github.io/nifi/
Apache License 2.0

java.lang.IllegalStateException after about 30 seconds of processing #216

Closed Brandon4v closed 2 months ago

Brandon4v commented 4 months ago

Using NiFi 1.22.0 with the PutMarkLogic 1.24.0 processor, I have observed that the processor enters an error state anywhere from 20 seconds to 5 minutes into a run, after processing between 20,000 and 500,000 sub-1KB documents in batches of 1000 (I also tried batches of 100). When the error occurs, the error notification on the processor in NiFi says:

"PutMarkLogic[ID={GUIDVALUEHERE}] Processing failed: org.apache.IllegalStateException: This instance has been stopped - Caused by: java.lang.IllegalStateException: This instance has been stopped"

I reviewed the MarkLogic error logs for this single-node cluster and did not see any errors for the App Server used by this flow, nor do the general error logs suggest that the server is unhealthy.


rjrudin commented 4 months ago

Can you look for a stacktrace in the NiFi app log? This sounds similar to #21 , wondering if there's a NiFi error about a stream being closed in your stacktrace.

Also, is your NiFi processor connecting to a load balancer or directly to a MarkLogic host?

Brandon4v commented 4 months ago

The NiFi server is single node, and so is the MarkLogic server. Additionally, there are currently no load balancers set up.

Thanks for the quick response!

--NIFI Error Log--
2024-04-30 14:36:52,286 ERROR [Timer-Driven Process Thread-6] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=c9a3385d-45ec-17c3-2b30-b6bab7cedbee] This instance has been stopped
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:211)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:241)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:248)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.addWriteEvent(PutMarkLogic.java:467)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:420)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1360)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-04-30 14:36:52,286 ERROR [Timer-Driven Process Thread-4] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=c9a3385d-45ec-17c3-2b30-b6bab7cedbee] This instance has been stopped
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:211)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:241)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:248)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.addWriteEvent(PutMarkLogic.java:467)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:420)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1360)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-04-30 14:36:52,337 ERROR [Timer-Driven Process Thread-6] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=c9a3385d-45ec-17c3-2b30-b6bab7cedbee] This instance has been stopped
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:211)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:241)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:248)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.addWriteEvent(PutMarkLogic.java:467)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:420)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1360)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-04-30 14:36:52,338 ERROR [Timer-Driven Process Thread-6] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=c9a3385d-45ec-17c3-2b30-b6bab7cedbee] This instance has been stopped
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:211)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:241)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:248)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.addWriteEvent(PutMarkLogic.java:467)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:420)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1360)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-04-30 14:36:52,378 ERROR [Timer-Driven Process Thread-5] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=c9a3385d-45ec-17c3-2b30-b6bab7cedbee] This instance has been stopped
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:211)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:241)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:248)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.addWriteEvent(PutMarkLogic.java:467)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:420)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1360)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-04-30 14:36:52,379 ERROR [Timer-Driven Process Thread-5] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=c9a3385d-45ec-17c3-2b30-b6bab7cedbee] This instance has been stopped
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:211)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:241)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:248)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.addWriteEvent(PutMarkLogic.java:467)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:420)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1360)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-04-30 14:36:52,429 ERROR [Timer-Driven Process Thread-3] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=c9a3385d-45ec-17c3-2b30-b6bab7cedbee] This instance has been stopped
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:211)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:241)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:248)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.addWriteEvent(PutMarkLogic.java:467)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:420)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1360)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

rjrudin commented 4 months ago

Can you check your NiFi app log for the following messages from PutMarkLogic:

Those are each logged at the INFO level when the processor is shut down, stopped, or unscheduled. If you don't see those, I don't know how the underlying WriteBatcher in PutMarkLogic is being stopped.
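As a rough aid for that log check, here is a minimal Java sketch that filters a NiFi app log for PutMarkLogic lifecycle lines. The class and method names are hypothetical and not part of NiFi or the connector. The marker strings `OnUnscheduled` and `OnStopped` are taken from INFO lines posted elsewhere in this thread; any other lifecycle message would have to be added to the list by hand.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of NiFi or the MarkLogic connector.
class LifecycleLogScan {
    // Marker text as it appears in PutMarkLogic INFO lines quoted in this thread.
    static final List<String> MARKERS = List.of("OnUnscheduled", "OnStopped");

    // Abbreviated logger name that NiFi prints for the PutMarkLogic processor.
    static final String LOGGER = "o.a.n.marklogic.processor.PutMarkLogic";

    // Returns the lines logged by PutMarkLogic that contain a lifecycle marker.
    static List<String> findLifecycleLines(List<String> logLines) {
        List<String> hits = new ArrayList<>();
        for (String line : logLines) {
            if (!line.contains(LOGGER)) {
                continue; // skip other loggers and stack-trace frames
            }
            for (String marker : MARKERS) {
                if (line.contains(marker)) {
                    hits.add(line);
                    break;
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: LifecycleLogScan <path to nifi-app.log>");
            return;
        }
        findLifecycleLines(Files.readAllLines(Path.of(args[0])))
                .forEach(System.out::println);
    }
}
```

If the scan prints nothing for the time window around the first `IllegalStateException`, the batcher was presumably stopped by something other than a processor lifecycle event.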

chhean commented 4 months ago

@rjrudin I have also been looking into this issue with @Brandon4v and I'll get back to you with more NiFi app log details after validating good network connectivity between the ML and NiFi hosts.

frankietsang commented 4 months ago

I have experienced a similar error with the PutMarkLogic processor. I'm using NiFi 1.24 with the PutMarkLogic 1.24.0 processor. The following is the stack trace with the last error, plus the errors logged while stopping the PutMarkLogic processor:

2024-05-13 14:48:27,523 ERROR [Timer-Driven Process Thread-9] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] Processing halted: yielding [1 sec]
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flush(WriteBatcherImpl.java:395)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flushAsync(WriteBatcherImpl.java:385)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.flushWriteBatcherAsync(PutMarkLogic.java:460)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:403)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1361)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:247)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-05-13 14:48:27,523 WARN [Timer-Driven Process Thread-9] o.a.n.controller.tasks.ConnectableTask Processing halted: uncaught exception in Component [PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1]]
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flush(WriteBatcherImpl.java:395)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flushAsync(WriteBatcherImpl.java:385)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.flushWriteBatcherAsync(PutMarkLogic.java:460)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:403)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1361)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:247)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-05-13 14:48:28,061 INFO [NiFi Web Server-2218] o.a.n.c.s.StandardProcessScheduler Stopping PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1]
2024-05-13 14:48:28,061 INFO [NiFi Web Server-2218] o.a.n.controller.StandardProcessorNode Stopping processor: PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1]
2024-05-13 14:48:28,061 INFO [Timer-Driven Process Thread-3] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] to run
2024-05-13 14:48:28,061 INFO [Timer-Driven Process Thread-3] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] OnUnscheduled
2024-05-13 14:48:28,061 INFO [Timer-Driven Process Thread-3] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] Calling flushAndWait on WriteBatcher
2024-05-13 14:48:28,061 ERROR [Timer-Driven Process Thread-3] org.apache.nifi.util.ReflectionUtils Failed while invoking annotated method 'public void org.apache.nifi.marklogic.processor.PutMarkLogic.onUnscheduled()' with arguments '[]'.
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flush(WriteBatcherImpl.java:395)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flushAndWait(WriteBatcherImpl.java:390)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.completeWriteBatcherJob(PutMarkLogic.java:616)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onUnscheduled(PutMarkLogic.java:610)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:145)
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:133)
    at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:316)
    at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:93)
    at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1917)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2024-05-13 14:48:28,061 INFO [Timer-Driven Process Thread-3] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] OnStopped
2024-05-13 14:48:28,061 INFO [Timer-Driven Process Thread-3] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] Calling flushAndWait on WriteBatcher
2024-05-13 14:48:28,061 ERROR [Timer-Driven Process Thread-3] org.apache.nifi.util.ReflectionUtils Failed while invoking annotated method 'public void org.apache.nifi.marklogic.processor.PutMarkLogic.onStopped()' with arguments '[]'.
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flush(WriteBatcherImpl.java:395)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flushAndWait(WriteBatcherImpl.java:390)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.completeWriteBatcherJob(PutMarkLogic.java:616)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onStopped(PutMarkLogic.java:604)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:145)
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:133)
    at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:316)
    at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:93)
    at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1929)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

rjrudin commented 4 months ago

Thanks @frankietsang . In your logging, the scenario looks different - we can see the "OnUnscheduled" text in your logs, indicating that a user unscheduled PutMarkLogic. It looks like multiple threads may be trying to stop the underlying batcher - can you reply back with how many tasks you have assigned to this NiFi processor? There may be a separate bug where if 2 or more tasks are assigned to it, we get a race condition with trying to stop the batcher.

However, I'm not expecting that would cause an issue, other than the logs showing the error. That is - the user has requested that the processor be stopped, and the logs are saying that the processor has effectively already been stopped. I think that's different from the original error here, where the processor is still running and items are added but the underlying batcher has been stopped.
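To illustrate the "already stopped" situation, here is a minimal Java sketch of a guard that lets only the first caller flush and stop a batcher, so that a second lifecycle callback (the logs show both onUnscheduled and onStopped calling flushAndWait) becomes a no-op instead of throwing. `FakeBatcher` and `StopOnceGuard` are hypothetical stand-ins written for this example; they are not the connector's code and this is not necessarily how the connector handles it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a batcher that rejects calls once stopped.
// It only models the exception seen in the logs, nothing else.
class FakeBatcher {
    private volatile boolean stopped = false;
    final AtomicInteger flushCount = new AtomicInteger();

    void flushAndWait() {
        if (stopped) {
            throw new IllegalStateException("This instance has been stopped");
        }
        flushCount.incrementAndGet();
    }

    void stop() {
        stopped = true;
    }
}

// Hypothetical guard: only the first caller flushes and stops the batcher.
class StopOnceGuard {
    private final FakeBatcher batcher;
    private final AtomicBoolean completed = new AtomicBoolean(false);

    StopOnceGuard(FakeBatcher batcher) {
        this.batcher = batcher;
    }

    // Returns true if this call performed the shutdown,
    // false if an earlier call (possibly on another thread) already claimed it.
    boolean completeJob() {
        if (!completed.compareAndSet(false, true)) {
            return false;
        }
        batcher.flushAndWait();
        batcher.stop();
        return true;
    }
}
```

The `compareAndSet` call is what makes the guard safe across threads: exactly one caller wins the flag. It does not help when the batcher was stopped by something other than the guard, which is the case in the original report.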

rjrudin commented 4 months ago

@chhean Another piece of logging to look for is an error message with text of "black-listing it would drop job below minHosts". That error will occur when the PutMarkLogic processor receives a connection error such that it thinks the MarkLogic host is no longer available. If it does not determine that enough hosts are still present, it will stop the underlying batcher.
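The decision described above can be modelled in a few lines of Java. This is a simplified sketch with hypothetical names (`HostAvailabilityModel`, `onHostUnavailable`), not the client library's implementation: when a host looks unavailable, it is dropped from the set only if enough hosts remain, otherwise the whole job is stopped.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of the behaviour described above; all names are hypothetical.
class HostAvailabilityModel {
    private final Set<String> hosts;
    private final int minHosts;
    private boolean jobStopped = false;

    HostAvailabilityModel(Set<String> hosts, int minHosts) {
        this.hosts = new HashSet<>(hosts);
        this.minHosts = minHosts;
    }

    // Called when a connection error suggests the host is unavailable.
    void onHostUnavailable(String host) {
        if (jobStopped || !hosts.contains(host)) {
            return;
        }
        if (hosts.size() - 1 < minHosts) {
            // Dropping this host would leave too few hosts, so the job stops.
            jobStopped = true;
        } else {
            hosts.remove(host);
        }
    }

    boolean isJobStopped() {
        return jobStopped;
    }

    int availableHosts() {
        return hosts.size();
    }
}
```

Under this model, a single-node cluster with a minimum of one host (an assumed value here) stops the job on the first connection error, since there is no other host to fall back to.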

frankietsang commented 4 months ago

@rjrudin The 'Concurrent Tasks' is set to 1 for the PutMarkLogic processor with 'Thread Count' set to 3.

rjrudin commented 4 months ago

Thanks @frankietsang - I opened #217 to track this as it seems like a separate issue.

rjrudin commented 3 months ago

@Brandon4v @chhean When you have an opportunity, please try the "Restart Failed Batcher" setting in the new 1.24.2 release - https://github.com/marklogic/nifi/releases/tag/1.24.2 . We're able to reproduce a stopped batcher in PutMarkLogic simply by shutting down MarkLogic temporarily. While MarkLogic is not available, FlowFiles are routed to the FAILURE relationship. Once we start MarkLogic back up, PutMarkLogic will verify that it can connect to MarkLogic again and will resume normal processing of FlowFiles.
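The restart behaviour described above can be pictured with a small Java model. Everything here is a hypothetical sketch (`RestartingWriter`, its nested `Batcher`, and the `canConnect` probe are invented for illustration) and is not the 1.24.2 implementation: while the batcher is stopped and the server is unreachable, writes are reported as failures; once the probe succeeds, a fresh batcher replaces the stopped one.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of a "restart failed batcher" option.
class RestartingWriter {
    // Minimal stand-in for a batcher that rejects writes once stopped.
    static class Batcher {
        private boolean stopped = false;
        int written = 0;

        void add(String doc) {
            if (stopped) {
                throw new IllegalStateException("This instance has been stopped");
            }
            written++;
        }

        void stop() {
            stopped = true;
        }

        boolean isStopped() {
            return stopped;
        }
    }

    private final BooleanSupplier canConnect; // assumed connectivity probe
    private Batcher batcher = new Batcher();

    RestartingWriter(BooleanSupplier canConnect) {
        this.canConnect = canConnect;
    }

    Batcher currentBatcher() {
        return batcher;
    }

    // Returns "success" or "failure", mirroring the relationship a FlowFile would take.
    String write(String doc) {
        if (batcher.isStopped()) {
            if (!canConnect.getAsBoolean()) {
                return "failure"; // server still unavailable
            }
            batcher = new Batcher(); // replace the stopped batcher
        }
        batcher.add(doc);
        return "success";
    }
}
```

The point of the model is that a stopped batcher is never reused: it is replaced only after connectivity has been confirmed.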

rjrudin commented 2 months ago

The expectation is that the 1.24.2 release will address this by providing an optional "restart" mechanism for PutMarkLogic.

I am closing this ticket in anticipation of that being the fix, but please open a new issue if you run into any issues with this release.