marklogic / nifi

Mirror of Apache NiFi to support ongoing MarkLogic integration efforts
https://marklogic.github.io/nifi/
Apache License 2.0
12 stars 23 forks

PutMarkLogic stopped multiple times #217

Closed rjrudin closed 2 months ago

rjrudin commented 4 months ago

Originally reported at https://github.com/marklogic/nifi/issues/216#issuecomment-2108642899 .

Logging indicates that there may be multiple attempts to stop a PutMarkLogic instance, even with only one task configured for the processor. This may be best fixed in the Java Client, where if the batcher has been stopped, it only needs to log a message if another attempt is made to stop it as opposed to throwing an exception.
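A minimal sketch of the guard being suggested here. This is purely illustrative - the class and method names below are hypothetical, not the actual Java Client WriteBatcher API - but it shows the idea: a repeated stop, or a flush after a stop, degrades to a logged warning instead of an IllegalStateException.

```java
// Hypothetical sketch only; the real WriteBatcher API differs.
import java.util.concurrent.atomic.AtomicBoolean;

class GuardedBatcher {
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    // Idempotent stop: second and later calls only log a warning.
    void stop() {
        if (!stopped.compareAndSet(false, true)) {
            System.err.println("WARN: batcher already stopped; ignoring repeated stop");
        }
    }

    // Returns true if the flush was actually submitted.
    boolean flushAsync() {
        if (stopped.get()) {
            System.err.println("WARN: not calling flushAsync as batcher has already been stopped");
            return false;
        }
        // ... submit the real async flush here ...
        return true;
    }
}
```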

frankietsang commented 4 months ago

FYI, I tried using only 1 concurrent task and a thread count of 1, and I still got the same error.

frankietsang commented 3 months ago

Another observation: once I started receiving the following exception continuously, the PutMarkLogic processor would not process any flow files until it was restarted.

2024-05-19 11:00:00,123 WARN [Timer-Driven Process Thread-2] o.a.n.controller.tasks.ConnectableTask Processing halted: uncaught exception in Component [PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1]]
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flush(WriteBatcherImpl.java:395)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flushAsync(WriteBatcherImpl.java:385)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.flushWriteBatcherAsync(PutMarkLogic.java:460)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:403)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1361)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:247)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

rjrudin commented 3 months ago

Thanks @frankietsang. I am wondering if the issue is that the processor is not handling the NiFi "unscheduled" event correctly. We see this in our logs:

2024-05-13 14:48:28,061 INFO [Timer-Driven Process Thread-3] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] OnUnscheduled
2024-05-13 14:48:28,061 INFO [Timer-Driven Process Thread-3] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] Calling flushAndWait on WriteBatcher

It may be that when the processor is unscheduled, it should not actually stop the underlying batcher, whereas when it is stopped, the expectation is that a user needs to restart the processor. We'll test that out.
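That distinction can be sketched roughly as below. All names are illustrative rather than the real PutMarkLogic code (the real processor reacts to NiFi's @OnUnscheduled and @OnStopped lifecycle annotations): unscheduling flushes any queued documents but leaves the batcher running, while only a full stop tears it down.

```java
// Illustrative only, not the actual PutMarkLogic implementation.
class ProcessorLifecycleSketch {
    boolean batcherRunning = true;
    int flushes = 0;

    void onUnscheduled() {
        flushes++; // flushAndWait: write out any queued documents
        // intentionally do NOT stop the batcher here
    }

    void onStopped() {
        flushes++;
        batcherRunning = false; // user stopped the processor: stop the batcher
    }
}
```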

frankietsang commented 3 months ago

@rjrudin Any updates on this issue? As more data is being processed, this exception has been happening regularly. An overnight job was stuck due to this exception from the processor. Thanks.

rjrudin commented 3 months ago

@frankietsang We have not been able to reproduce the issue. I am thinking though that from the perspective of the processor, it is not helpful for an IllegalStateException to be thrown when an attempt is made to flush the batcher and the batcher has already been stopped (I'm not sure that the batcher should even be throwing an exception - logging a warning seems sufficient there).

Are you using version 1.24.0 of the MarkLogic connector? I am wondering if you could test out a 1.24.1 that includes only the above patch - i.e. we would ensure that any calls to flush the batcher (which are needed in case fewer documents than the batch size are waiting to be written) do not result in an IllegalStateException being thrown.

Also - do you know if the connector is being intentionally unscheduled / stopped at some point, or is it expected to keep running? My concern is that something else is causing the batcher to stop, and then when the processor performs a flush during the trigger method, the exception is being thrown. If so, the root cause would be that the batcher has unexpectedly been stopped.

frankietsang commented 3 months ago

@rjrudin I'm using 1.24.0 and I can test out 1.24.1. I don't see 1.24.1 as one of the releases. Where can I download it?

rjrudin commented 3 months ago

We'll get 1.24.1 together today or tomorrow, just wanted to make sure that you could try it out right away. Again, the challenge for us is we're not able to reproduce this error yet - but I think the approach I outlined above is a better design than what's currently in place, so it's worth doing.

frankietsang commented 3 months ago

Please let me know when 1.24.1 becomes available. Thanks for your help.

rjrudin commented 3 months ago

@frankietsang We published 1.24.1 - https://github.com/marklogic/nifi/releases/tag/1.24.1 . Please let us know if this addresses the problem.

frankietsang commented 3 months ago

I'll test it and let you know. Thanks.

frankietsang commented 3 months ago

@rjrudin As soon as I started the PutMarkLogic 1.24.1 processor, I got the following NiFi bulletin message:

PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] Failed to properly initialize Processor. If still scheduled to run, NiFi will attempt to initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to java.lang.RuntimeException: Unable to create WriteBatcher, cause: Controller Service with identifier 0bf73d50-018e-1000-a7e0-9486362ab7a9 is of type class com.sun.proxy.$Proxy82 and cannot be cast to interface org.apache.nifi.marklogic.controller.MarkLogicDatabaseClientService: java.lang.RuntimeException: Unable to create WriteBatcher, cause: Controller Service with identifier 0bf73d50-018e-1000-a7e0-9486362ab7a9 is of type class com.sun.proxy.$Proxy82 and cannot be cast to interface org.apache.nifi.marklogic.controller.MarkLogicDatabaseClientService

  • Caused by: java.lang.IllegalArgumentException: Controller Service with identifier 0bf73d50-018e-1000-a7e0-9486362ab7a9 is of type class com.sun.proxy.$Proxy82 and cannot be cast to interface org.apache.nifi.marklogic.controller.MarkLogicDatabaseClientService

Stack trace:

2024-05-23 18:32:02,730 ERROR [Timer-Driven Process Thread-22] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] Failed to properly initialize Processor. If still scheduled to run, NiFi will attempt to initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to java.lang.RuntimeException: Unable to create WriteBatcher, cause: Controller Service with identifier 0bf73d50-018e-1000-a7e0-9486362ab7a9 is of type class com.sun.proxy.$Proxy82 and cannot be cast to interface org.apache.nifi.marklogic.controller.MarkLogicDatabaseClientService
java.lang.RuntimeException: Unable to create WriteBatcher, cause: Controller Service with identifier 0bf73d50-018e-1000-a7e0-9486362ab7a9 is of type class com.sun.proxy.$Proxy82 and cannot be cast to interface org.apache.nifi.marklogic.controller.MarkLogicDatabaseClientService
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onScheduled(PutMarkLogic.java:296)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:145)
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:133)
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:78)
    at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:55)
    at org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$8(StandardProcessorNode.java:1765)
    at org.apache.nifi.engine.FlowEngine$3.call(FlowEngine.java:123)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Controller Service with identifier 0bf73d50-018e-1000-a7e0-9486362ab7a9 is of type class com.sun.proxy.$Proxy82 and cannot be cast to interface org.apache.nifi.marklogic.controller.MarkLogicDatabaseClientService
    at org.apache.nifi.attribute.expression.language.StandardPropertyValue.asControllerService(StandardPropertyValue.java:215)
    at org.apache.nifi.marklogic.processor.AbstractMarkLogicProcessor.getDatabaseClient(AbstractMarkLogicProcessor.java:149)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onScheduled(PutMarkLogic.java:289)
    ... 15 common frames omitted
2024-05-23 18:32:02,730 INFO [Timer-Driven Process Thread-22] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] OnUnscheduled
2024-05-23 18:32:02,730 INFO [Timer-Driven Process Thread-22] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] OnStopped

rjrudin commented 3 months ago

I have not seen that error message before. Can you try disabling and enabling the controller service? There shouldn't be any difference in the controller service between the 1.24.0 and 1.24.1 releases.

Which version of Java are you running?

frankietsang commented 3 months ago

After disabling and enabling the controller service, the problem went away. However, I got the following errors after the processor had been running for a while:

NiFi bulletin:

22:00:29 EDT ERROR 1190e08f-74dc-3f04-837a-afca78a5a9e1 PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] This instance has been stopped: java.lang.IllegalStateException: This instance has been stopped
22:00:29 EDT ERROR 1190e08f-74dc-3f04-837a-afca78a5a9e1 PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] This instance has been stopped: java.lang.IllegalStateException: This instance has been stopped
22:00:29 EDT WARNING 1190e08f-74dc-3f04-837a-afca78a5a9e1 PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] Not calling flushAsync as batcher has already been stopped.
22:00:29 EDT WARNING 1190e08f-74dc-3f04-837a-afca78a5a9e1 PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] Not calling flushAsync as batcher has already been stopped.
22:00:29 EDT ERROR 1190e08f-74dc-3f04-837a-afca78a5a9e1 PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] This instance has been stopped: java.lang.IllegalStateException: This instance has been stopped

The following exception repeated many times in the nifi-app.log:

2024-05-23 22:00:29,660 ERROR [Timer-Driven Process Thread-7] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=1190e08f-74dc-3f04-837a-afca78a5a9e1] This instance has been stopped
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:211)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:241)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:248)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.addWriteEvent(PutMarkLogic.java:467)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:420)
    at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:388)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1361)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:247)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

rjrudin commented 3 months ago

Do you see any occurrences of the following in your logs before those messages show up?

All 3 of those NiFi events will cause the batcher to be stopped, at which point it would be necessary to start the processor back up, causing the batcher to be started as well.

Basically, we're trying to figure out what is causing the batcher to be stopped. The above messages seem more like symptoms of the problem but not the actual problem.

frankietsang commented 3 months ago

I saw those events, but I was not sure if they happened because I manually restarted the processor. I attached one of the log files in case I missed something. Thanks. [Uploading nifi-ml-log.zip…]()

rjrudin commented 3 months ago

@frankietsang Can you try attaching that file again?

frankietsang commented 3 months ago

Can I email you the file?

rjrudin commented 3 months ago

If you have a support contract, open up a support ticket - they'll contact my team right away. The support site is a better place for exchanging possibly sensitive information like log files.

frankietsang commented 3 months ago

@rjrudin I have created a support ticket (#36917) and attached the log file.

rjrudin commented 3 months ago

Thanks @frankietsang - I'm going to respond here, but the support ticket is still helpful for tracking this and for uploading files without worrying about any sensitive data being on a public GitHub page.

I am wondering if this is related to https://github.com/marklogic/nifi/issues/124#issuecomment-1014655592 , where the user identified a "transient networking problem". The underlying batcher in the PutMarkLogic processor will be stopped if it determines that it cannot access enough hosts in the MarkLogic cluster. When that occurs, you should see "error" logging in the NiFi logs with a message containing "...but black-listing it would drop job below minHosts... so stopping job".
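The minHosts arithmetic behind that message can be sketched as follows (a hypothetical method, not the real HostAvailabilityListener code): a failing host can only be black-listed if enough healthy hosts would remain; otherwise the whole job is stopped. With a single host and minHosts of 1, any host failure stops the job.

```java
// Illustrative decision only, not the actual DMSDK implementation.
class MinHostsSketch {
    static String onHostFailure(int healthyHosts, int minHosts) {
        if (healthyHosts - 1 >= minHosts) {
            return "blacklist-host"; // drop the failing host, keep the job running
        }
        // black-listing would drop the job below minHosts, so stop the job
        return "stop-job";
    }
}
```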

Otherwise, the underlying batcher should only be stopped via user action, which in our NiFi processor would only occur if the user stops the processor.

Let us know if you see the above logging anywhere - it won't be coming from the PutMarkLogic processor but rather from the com.marklogic.client.datamovement.HostAvailabilityListener class.

If that logging is seen, we at least know why the batcher is being stopped - but ideally, the batcher could be restarted at some point by the processor. My team will think about the right approach for that.

Regardless, we're going to work towards another patch release of our connector that improves the logging for when the batcher is stopped. The intent is that no matter how the batcher is stopped, we'll be able to easily identify why it is being stopped - that is our main challenge right now in diagnosing this issue.

frankietsang commented 3 months ago

I searched all nifi-app* log files and did not get any match on com.marklogic.client.datamovement.HostAvailabilityListener

rjrudin commented 3 months ago

Thanks for checking. Here's what we're thinking for a next step:

  1. A new 1.24.2 release that provides new logging for when the underlying batcher in PutMarkLogic is started and stopped. The main mystery here is why the batcher is stopped, so we're hoping additional logging can identify when the batcher is entering a state of being stopped.
  2. Possibly a subsequent release that provides an "Auto-restart" feature for PutMarkLogic. The idea is that if the batcher has stopped, the processor should try to restart it (technically, create a new one and start it) periodically, using the NiFi "Yield" setting to determine the periodicity. We've done a prototype to verify this is feasible, and we would make its usage configurable, with it defaulting to "off".

But since we don't know why the batcher is stopped, we're not sure if the 2nd item above is going to help yet. In a scenario where the batcher is stopping due to MarkLogic not being available, the "Auto-restart" feature seems very helpful, as MarkLogic may only be unavailable for a short stretch of time. We're able to reproduce that scenario consistently simply by shutting down MarkLogic and then bringing it back up, and the "auto-restart" feature works well.
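A simplified simulation of that auto-restart idea (all names here are hypothetical; the real implementation would recreate and start a WriteBatcher and rely on NiFi's Yield Duration between attempts):

```java
// Illustrative only. If the batcher is found stopped during a trigger and
// auto-restart is enabled, create a fresh batcher and yield this trigger;
// NiFi calls onTrigger again after the Yield Duration elapses.
import java.util.function.Supplier;

class AutoRestartSketch {
    interface Batcher { boolean isStopped(); }

    private final Supplier<Batcher> factory;
    private Batcher batcher;
    int restarts = 0;

    AutoRestartSketch(Supplier<Batcher> factory) {
        this.factory = factory;
        this.batcher = factory.get();
    }

    // Returns true when this trigger may proceed with writing documents.
    boolean onTrigger(boolean restartFailedBatcher) {
        if (batcher.isStopped()) {
            if (restartFailedBatcher) {
                batcher = factory.get(); // technically a new batcher, then started
                restarts++;
            }
            return false; // yield; nothing is written on this trigger
        }
        return true;
    }
}
```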

So we'll have a 1.24.2 out soon, with the goal of identifying why the batcher is entering into a stopped state.

rjrudin commented 3 months ago

@frankietsang We'll have a release ready soon, and we are now going to include a "Restart failed batcher" property that defaults to false, but when set to "true", it will attempt to restart the underlying batcher if it fails and stops.

One request - can you search for just "HostAvailabilityListener" in your logs? Depending on how your NiFi logging is configured, you may only have c.m.c.d.HostAvailabilityListener in your NiFi log files.

frankietsang commented 3 months ago

I found c.m.c.d.HostAvailabilityListener in NiFi log files. Please see below:

2024-05-31 00:36:55,050 INFO [Timer-Driven Process Thread-27] c.m.c.datamovement.impl.WriteBatcherImpl flushing 2 queued docs
2024-05-31 00:36:55,050 INFO [Timer-Driven Process Thread-14] c.m.c.datamovement.impl.WriteBatcherImpl flushing 0 queued docs
2024-05-31 00:36:55,050 INFO [Timer-Driven Process Thread-20] c.m.c.datamovement.impl.WriteBatcherImpl flushing 0 queued docs
2024-05-31 00:36:55,051 ERROR [pool-42-thread-5] c.m.c.d.HostAvailabilityListener Encountered [com.marklogic.client.MarkLogicIOException: Failed to serialize metadata: cause: java.net.SocketException: Broken pipe (Write failed)] on host "marklogic-tih.celanese.com" but black-listing it would drop job below minHosts (1), so stopping job "null"
com.marklogic.client.MarkLogicIOException: Failed to serialize metadata: cause: java.net.SocketException: Broken pipe (Write failed)
    at com.marklogic.client.io.DocumentMetadataHandle.sendMetadataImpl(DocumentMetadataHandle.java:840)
    at com.marklogic.client.io.DocumentMetadataHandle.write(DocumentMetadataHandle.java:618)
    at com.marklogic.client.impl.StreamingOutputImpl.writeTo(StreamingOutputImpl.java:58)
    at okhttp3.MultipartBody.writeOrCountBytes(MultipartBody.kt:157)
    at okhttp3.MultipartBody.writeTo(MultipartBody.kt:93)
    at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:62)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:34)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at com.marklogic.client.impl.okhttp.BasicAuthInterceptor.intercept(BasicAuthenticationConfigurer.java:56)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
    at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
    at com.marklogic.client.impl.OkHttpServices.sendRequestOnce(OkHttpServices.java:516)
    at com.marklogic.client.impl.OkHttpServices.sendRequestOnce(OkHttpServices.java:511)
    at com.marklogic.client.impl.OkHttpServices.doPost(OkHttpServices.java:4114)
    at com.marklogic.client.impl.OkHttpServices.postResource(OkHttpServices.java:3352)
    at com.marklogic.client.impl.OkHttpServices.postBulkDocuments(OkHttpServices.java:3455)
    at com.marklogic.client.impl.DocumentManagerImpl.write(DocumentManagerImpl.java:649)
    at com.marklogic.client.impl.GenericDocumentImpl.write(GenericDocumentImpl.java:23)
    at com.marklogic.client.impl.DocumentManagerImpl.write(DocumentManagerImpl.java:624)
    at com.marklogic.client.impl.GenericDocumentImpl.write(GenericDocumentImpl.java:23)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl$BatchWriter.run(WriteBatcherImpl.java:679)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: javax.xml.stream.XMLStreamException: java.net.SocketException: Broken pipe (Write failed)
    at java.xml/com.sun.xml.internal.stream.writers.XMLStreamWriterImpl.flush(XMLStreamWriterImpl.java:399)
    at com.marklogic.client.io.DocumentMetadataHandle.sendMetadataImpl(DocumentMetadataHandle.java:837)
    ... 33 common frames omitted

rjrudin commented 3 months ago

Thanks, that helps a lot! That confirms that the reason the batcher stops is due to a network issue, causing it to think that MarkLogic may not be accessible anymore. The "Restart failed batcher" feature we are planning for 1.24.2 should help with this. It will wait for the amount of time configured for the "Yield" setting for the processor and then try to connect to MarkLogic again and start the batcher back up.

"Broken pipe" socket exceptions can occur of course for a variety of reasons. Do you have a load balancer in front of your MarkLogic cluster? We typically recommend using one to support scenarios where connections fail. The MarkLogic Java Client in use by our NiFi connector will retry certain failures, but not all socket exceptions. But a load balancer can typically be configured to provide all kinds of retry support.

frankietsang commented 3 months ago

We don't have a load balancer since we don't have a MarkLogic cluster.

rjrudin commented 3 months ago

@frankietsang To confirm, you only have a single host in your cluster? How many threads have you configured on PutMarkLogic?

frankietsang commented 3 months ago

Yes, with a single host. 3 threads.

rjrudin commented 3 months ago

@frankietsang Sorry for the delay on this, we had some issues with Maven Central. You mentioned 3 threads above - about how many documents are being written to MarkLogic per second or per minute?

rjrudin commented 3 months ago

@frankietsang Release 1.24.2 - https://github.com/marklogic/nifi/releases/tag/1.24.2 - is now available. While this will not address the root cause of connectivity issues (which are surprising, given the use of only 3 threads), the "Restart Failed Batcher" option - when set to true - will result in PutMarkLogic resuming processing of FlowFiles once it is able to connect to MarkLogic again.

frankietsang commented 2 months ago

@rjrudin I was able to test the PutMarkLogic 1.24.2 processor with a good number of flow files overnight, and I didn't see any errors from the processor. I think this fix looks promising and I will keep monitoring it. Thanks.

rjrudin commented 2 months ago

Thanks for the feedback! I'll close this ticket but please open a new one if you run into any future issues.