marklogic / nifi

Mirror of Apache NiFi to support ongoing MarkLogic integration efforts
https://marklogic.github.io/nifi/
Apache License 2.0

org.apache.nifi.processor.exception.ProcessException: java.lang.IllegalStateException: This instance has been stopped #124

Closed ELadner closed 1 year ago

ELadner commented 2 years ago

NiFi 1.11.x. This had been working fine for at least a year, then it started throwing this message continually, to the point of filling up the logs over and over.

I tried restarting NiFi and MarkLogic, clearing the NiFi repository, etc.

Any idea what causes this error?

2022-01-10 08:41:42,338 ERROR [Timer-Driven Process Thread-2] o.a.n.marklogic.processor.PutMarkLogic PutMarkLogic[id=016b101a-d65a-1ba4-4618-c500a674cb9d] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: java.lang.IllegalStateException: This instance has been stopped: org.apache.nifi.processor.exception.ProcessException: java.lang.IllegalStateException: This instance has been stopped
org.apache.nifi.processor.exception.ProcessException: java.lang.IllegalStateException: This instance has been stopped
        at org.apache.nifi.marklogic.processor.AbstractMarkLogicProcessor.logErrorAndRollbackSession(AbstractMarkLogicProcessor.java:215)
        at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:394)
        at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:329)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: This instance has been stopped
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:283)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flush(WriteBatcherImpl.java:395)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flushAsync(WriteBatcherImpl.java:385)
        at org.apache.nifi.marklogic.processor.PutMarkLogic.flushWriteBatcherAsync(PutMarkLogic.java:401)
        at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:346)
        ... 12 common frames omitted

ELadner commented 2 years ago

I believe the actual error is coming from the flushWriteBatcherAsync(this.writeBatcher); call.

In the output, I can see the "Flushing the WriteBatcher asynchronously..." message, but not the message that follows it in the code, "Calling yield() on the ProcessContext".

This is in the "flowfile == null" section of the code, too. Not sure why it keeps looping over a null flowfile, several hundred times per second.
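
For readers following along, here is a minimal sketch of how that branch could avoid the tight loop: check whether the batcher has been stopped before flushing, and back off either way. It is plain Java with hypothetical stand-ins (`StubBatcher`, `StubContext`, `IdleTrigger`, `backOff()`), not the actual PutMarkLogic, WriteBatcher, or ProcessContext code, and it assumes the batcher can report that it has been stopped.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a batcher that refuses work once it has been stopped. */
class StubBatcher {
    private volatile boolean stopped;
    final AtomicInteger flushes = new AtomicInteger();

    void stop() { stopped = true; }

    boolean isStopped() { return stopped; }

    void flushAsync() {
        if (stopped) {
            // Same message as in the log above.
            throw new IllegalStateException("This instance has been stopped");
        }
        flushes.incrementAndGet();
    }
}

/** Hypothetical stand-in for the scheduler context; it only counts back-off requests. */
class StubContext {
    int backOffs;

    void backOff() { backOffs++; }
}

class IdleTrigger {
    /** Models the "no FlowFile available" branch. Returns true if a flush was issued. */
    static boolean onIdle(StubBatcher batcher, StubContext context) {
        if (batcher.isStopped()) {
            // Flushing would throw on every scheduling pass, so only back off.
            context.backOff();
            return false;
        }
        batcher.flushAsync();
        context.backOff();
        return true;
    }

    public static void main(String[] args) {
        StubBatcher batcher = new StubBatcher();
        StubContext context = new StubContext();
        System.out.println("flushed while running: " + onIdle(batcher, context));
        batcher.stop();
        System.out.println("flushed after stop: " + onIdle(batcher, context));
    }
}
```

In this sketch the back-off always happens, so a stopped batcher costs one cheap check per scheduling pass instead of one exception and one log entry.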

ELadner commented 2 years ago

This looks like it might be kicked off by a transient networking problem when communicating with the ML server. Still, this kicks off 14 gigabytes of logging per minute until somebody notices the processor is in error and restarts it or the disk fills up and everything goes into the toilet.

2022-01-17 03:59:04,180 ERROR [Timer-Driven Process Thread-6] c.m.c.d.HostAvailabilityListener Encountered [com.marklogic.client.MarkLogicIOException: Failed to serialize metadata] on host "gclcnappvhl0073.gdc0.chevron.net" but black-listing it would drop job below minHosts (1), so stopping job "null"
com.marklogic.client.MarkLogicIOException: Failed to serialize metadata
        at com.marklogic.client.io.DocumentMetadataHandle.sendMetadataImpl(DocumentMetadataHandle.java:791)
        at com.marklogic.client.io.DocumentMetadataHandle.write(DocumentMetadataHandle.java:569)
        at com.marklogic.client.impl.StreamingOutputImpl.writeTo(StreamingOutputImpl.java:58)
        at okhttp3.MultipartBody.writeOrCountBytes(MultipartBody.kt:157)
        at okhttp3.MultipartBody.writeTo(MultipartBody.kt:93)
        at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:59)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:34)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:96)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
        at com.burgstaller.okhttp.AuthenticationCacheInterceptor.intercept(AuthenticationCacheInterceptor.java:49)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:100)
        at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:197)
        at okhttp3.internal.connection.RealCall.execute(RealCall.kt:148)
        at com.marklogic.client.impl.OkHttpServices.sendRequestOnce(OkHttpServices.java:714)
        at com.marklogic.client.impl.OkHttpServices.sendRequestOnce(OkHttpServices.java:709)
        at com.marklogic.client.impl.OkHttpServices.doPost(OkHttpServices.java:4175)
        at com.marklogic.client.impl.OkHttpServices.postResource(OkHttpServices.java:3486)
        at com.marklogic.client.impl.OkHttpServices.postBulkDocuments(OkHttpServices.java:3589)
        at com.marklogic.client.impl.DocumentManagerImpl.write(DocumentManagerImpl.java:649)
        at com.marklogic.client.impl.GenericDocumentImpl.write(GenericDocumentImpl.java:23)
        at com.marklogic.client.impl.DocumentManagerImpl.write(DocumentManagerImpl.java:624)
        at com.marklogic.client.impl.GenericDocumentImpl.write(GenericDocumentImpl.java:23)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl$BatchWriter.run(WriteBatcherImpl.java:679)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:2038)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl$CompletableRejectedExecutionHandler.rejectedExecution(WriteBatcherImpl.java:777)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl$CompletableThreadPoolExecutor.execute(WriteBatcherImpl.java:810)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:233)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:241)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:248)
        at org.apache.nifi.marklogic.processor.PutMarkLogic.addWriteEvent(PutMarkLogic.java:408)
        at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:362)
        at org.apache.nifi.marklogic.processor.PutMarkLogic.onTrigger(PutMarkLogic.java:329)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1273)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: javax.xml.stream.XMLStreamException: java.net.SocketException: Broken pipe (Write failed)
        at com.sun.xml.internal.stream.writers.XMLStreamWriterImpl.flush(XMLStreamWriterImpl.java:399)
        at com.marklogic.client.io.DocumentMetadataHandle.sendMetadataImpl(DocumentMetadataHandle.java:788)
        ... 53 common frames omitted
Caused by: java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at okio.OutputStreamSink.write(JvmOkio.kt:53)
        at okio.AsyncTimeout$sink$1.write(AsyncTimeout.kt:103)
        at okio.RealBufferedSink.flush(RealBufferedSink.kt:247)
        at okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSink.flush(Http1ExchangeCodec.kt:310)
        at okio.ForwardingSink.flush(ForwardingSink.kt:32)
        at okhttp3.internal.connection.Exchange$RequestBodySink.flush(Exchange.kt:228)
        at okio.RealBufferedSink.flush(RealBufferedSink.kt:250)
        at okio.RealBufferedSink$outputStream$1.flush(RealBufferedSink.kt:123)
        at com.sun.xml.internal.stream.writers.UTF8OutputStreamWriter.flush(UTF8OutputStreamWriter.java:138)
        at com.sun.xml.internal.stream.writers.XMLStreamWriterImpl.flush(XMLStreamWriterImpl.java:397)
        ... 54 common frames omitted

rjrudin commented 2 years ago

@ELadner Has this still been an issue for you?

rjrudin commented 1 year ago

I did some more testing, and I believe this behavior is due to PutMarkLogic rolling back the session and logging an error. That is likely not the correct approach, as the FlowFile that failed will be tried again. In the event of a connection error - such as what's reported above - I think we'd expect the behavior described above, which is that the FlowFile fails repeatedly and we get a ton of useless logging.

I am going to open a new ticket to provide an option for PutMarkLogic to do what every other processor does, which is to send the failed batch to a "failure" relationship.
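
To illustrate the difference between the two approaches, here is a small plain-Java model. It is only a sketch: the queue, the `write` method that always fails, and the class and method names are all hypothetical, and none of it is NiFi or MarkLogic code. It just counts how many errors get logged under each strategy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class FailureRoutingSketch {
    /** Hypothetical write that always fails, standing in for a broken connection. */
    static void write(String flowFile) {
        throw new IllegalStateException("This instance has been stopped");
    }

    /** Rollback strategy: the failed item goes back on the input queue and is retried. */
    static int runWithRollback(Deque<String> input, int maxPasses) {
        int errorsLogged = 0;
        for (int pass = 0; pass < maxPasses && !input.isEmpty(); pass++) {
            String flowFile = input.poll();
            try {
                write(flowFile);
            } catch (IllegalStateException e) {
                input.addFirst(flowFile); // rollback: the item is back where it started
                errorsLogged++;
            }
        }
        return errorsLogged;
    }

    /** Failure-relationship strategy: the failed item leaves the input queue for good. */
    static int runWithFailureRouting(Deque<String> input, List<String> failure, int maxPasses) {
        int errorsLogged = 0;
        for (int pass = 0; pass < maxPasses && !input.isEmpty(); pass++) {
            String flowFile = input.poll();
            try {
                write(flowFile);
            } catch (IllegalStateException e) {
                failure.add(flowFile); // routed away, so it is not retried here
                errorsLogged++;
            }
        }
        return errorsLogged;
    }

    public static void main(String[] args) {
        Deque<String> rollbackInput = new ArrayDeque<>(List.of("doc-1"));
        System.out.println("rollback errors: " + runWithRollback(rollbackInput, 1000));

        Deque<String> routedInput = new ArrayDeque<>(List.of("doc-1"));
        List<String> failure = new ArrayList<>();
        System.out.println("routing errors: " + runWithFailureRouting(routedInput, failure, 1000));
    }
}
```

With rollback, the single failing item produces one error per scheduling pass for as long as the connection stays broken. With failure routing it produces one error in total and the flow designer decides what happens to the item next.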

Closing this otherwise due to lack of response, but my expectation is that the above ticket will be helpful in resolving this issue.

ELadner commented 1 year ago

Thanks for the attention on this issue, but I no longer use MarkLogic or NiFi in my current role. I'll pass this information along to those who do, however.

On Mon, Nov 28, 2022 at 10:10 AM Rob Rudin @.***> wrote:

Closed #124 https://github.com/marklogic/nifi/issues/124 as completed.

-- Eric Ladner