marklogic-community / marklogic-nifi-incubator

A collaboration space for processors, recipes, templates, etc. NOTE: improvements made to the connector in this project have been incorporated into the MarkLogic NiFi repository (https://github.com/marklogic/nifi).
Apache License 2.0
4 stars 11 forks source link

PutMarkLogic sometimes stops running when there are still flow files available #98

Closed dmcassel closed 3 years ago

dmcassel commented 3 years ago

After processing a large number of flow files, PutMarkLogic will sometimes stop running with a few left in the queue. PutMarkLogic has already requested them from the queue (they won't show up when asking NiFi to show the list of flow files in the queue), but just sits on them. Stopping PutMarkLogic triggers the @OnStopped event, calling the completeWriteBatcherJob method and finishing them up. I suspect that adding the @OnUnscheduled annotation to the same completeWriteBatcherJob method would address this case.

rjrudin commented 3 years ago

At a minimum, this seems extremely low risk to do. I'll do a quick test to verify I see logging when the processor is unscheduled.

rjrudin commented 3 years ago

I made separate methods for OnStopped, OnUnscheduled, and OnShutdown (the latter doesn't matter here, it's for when NiFi shuts down, along with the JVM running it). When I stop a processor, OnUnscheduled is called which results in the WriteBatcher being flushed and stopped. Then OnStopped is called, which has nothing to do.

So this change seems safe. I don't know of a test to add for it, as we don't have reproducible steps for a bug here.

rjrudin commented 3 years ago

This is being addressed via https://github.com/marklogic/nifi/pull/102 , so closing

dmcassel commented 3 years ago

@rjrudin I'm still seeing this error in 1.9.1.5. I'm thinking that when Concurrent Tasks is greater than 1, it's possible that shouldFlushIfEmpty can end up false when the writeBatcher has unsent documents. When the processor gets stopped, the writeBatcher gets flushed and all's well.

Assuming that is the case, I think we could address that using synchronized blocks when accessing or changing shouldFlushIfEmpty, right?

rjrudin commented 3 years ago

@dmcassel Can you open a new ticket under marklogic-nifi with your details above? Sounds like a good idea, and the bit about setting concurrent tasks > 1 sounds important for testing.

dmcassel commented 3 years ago

https://github.com/marklogic/nifi/issues/112