nats-io / nats.java

Java client for NATS
Apache License 2.0

java.lang.IllegalMonitorStateException if thread is interrupted during NatsConnection.publish() #1250

Open davidmcote opened 2 weeks ago

davidmcote commented 2 weeks ago

Observed behavior

My application uses thread interruption to signal that work should be aborted. If a thread is interrupted before it calls any method that publishes a message to a NATS Connection, that publish call throws an IllegalMonitorStateException.

Here's an example exception stack from a call to KeyValue.create().

java.lang.IllegalMonitorStateException(null)
java.lang.IllegalMonitorStateException
    at java.base/java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:175)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1007)
    at java.base/java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:494)
    at io.nats.client.impl.MessageQueue.push(MessageQueue.java:187)
    at io.nats.client.impl.MessageQueue.push(MessageQueue.java:139)
    at io.nats.client.impl.NatsConnectionWriter.queue(NatsConnectionWriter.java:233)
    at io.nats.client.impl.NatsConnection.queueOutgoing(NatsConnection.java:1678)
    at io.nats.client.impl.NatsConnection.publishInternal(NatsConnection.java:990)
    at io.nats.client.impl.NatsConnection.requestFutureInternal(NatsConnection.java:1339)
    at io.nats.client.impl.NatsConnection.requestInternal(NatsConnection.java:1229)
    at io.nats.client.impl.NatsJetStreamImpl.makeInternalRequestResponseRequired(NatsJetStreamImpl.java:242)
    at io.nats.client.impl.NatsJetStream.publishSyncInternal(NatsJetStream.java:155)
    at io.nats.client.impl.NatsJetStream.publish(NatsJetStream.java:83)
    at io.nats.client.impl.NatsKeyValue._write(NatsKeyValue.java:238)
    at io.nats.client.impl.NatsKeyValue.update(NatsKeyValue.java:189)
    at io.nats.client.impl.NatsKeyValue.create(NatsKeyValue.java:168)

Expected behavior

Calls to NATS should either:

Server and client version

NATS Server: 2.10.20
NATS Java client: 2.20.2

Host environment

Amazon Linux 2 in Docker on a c5a.2xlarge EC2 instance. The container is limited to 2 GiB RAM and 3 vCPUs.

Steps to reproduce

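    import io.nats.client.Connection;
    import io.nats.client.Nats;
    import org.junit.jupiter.api.Test;

    import java.nio.charset.StandardCharsets;

    class NatsInterruptTest {
        @Test
        void testNatsIllegalStateMonitor() throws Exception {
            // Nats.connect() with no arguments connects to nats://localhost:4222
            try (final Connection nc = Nats.connect()) {
                nc.publish("test", "test".getBytes(StandardCharsets.UTF_8)); // Successful

                Thread.currentThread().interrupt();

                nc.publish("test", "test".getBytes(StandardCharsets.UTF_8)); // throws java.lang.IllegalMonitorStateException
            }
        }
    }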
jjlauer commented 21 hours ago

The same exception also occurs on a pull subscription, in anything that blocks, such as fetch(). That exception is pretty scary/confusing, as it comes from trying to unlock a lock that isn't owned by the current thread. I haven't dug much into what's going on, but I fear a lock isn't actually being unlocked, even though fetch() returns because the thread was interrupted.

java.lang.IllegalMonitorStateException: null
    at java.base/java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:175)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1059)
    at java.base/java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:494)
    at io.nats.client.impl.MessageQueue.push(MessageQueue.java:187)
    at io.nats.client.impl.MessageQueue.push(MessageQueue.java:139)
    at io.nats.client.impl.NatsConnectionWriter.queue(NatsConnectionWriter.java:233)
    at io.nats.client.impl.NatsConnection.queueOutgoing(NatsConnection.java:1682)
    at io.nats.client.impl.NatsConnection.publishInternal(NatsConnection.java:992)
    at io.nats.client.impl.NatsJetStreamPullSubscription._pull(NatsJetStreamPullSubscription.java:65)
    at io.nats.client.impl.NatsJetStreamPullSubscription._fetch(NatsJetStreamPullSubscription.java:144)
    at io.nats.client.impl.NatsJetStreamPullSubscription.fetch(NatsJetStreamPullSubscription.java:128)
    at com.fizzed.nats.demo.NatsStreamConsumer.lambda$execute$0(NatsStreamConsumer.java:44)
    at java.base/java.lang.Thread.run(Thread.java:1583)
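Both traces end in ReentrantLock$Sync.tryRelease, which is where ReentrantLock enforces ownership: unlock() throws IllegalMonitorStateException whenever the calling thread is not the lock's current owner. The JDK-only sketch below (class and method names are illustrative, not from nats.java) shows the two ways to hit that check:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

final class UnlockOwnershipDemo {

    /** True if unlock() on a ReentrantLock that nobody holds throws IllegalMonitorStateException. */
    static boolean unlockWhenNeverLockedThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock(); // the calling thread never acquired this lock
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** True if a thread other than the owner gets IllegalMonitorStateException from unlock(). */
    static boolean unlockFromNonOwnerThrows() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean threw = new AtomicBoolean(false);
        lock.lock(); // the calling thread is now the owner
        try {
            Thread other = new Thread(() -> {
                try {
                    lock.unlock(); // 'other' is not the owner
                } catch (IllegalMonitorStateException e) {
                    threw.set(true);
                }
            });
            other.start();
            other.join();
        } finally {
            lock.unlock(); // the real owner releases normally
        }
        return threw.get() && !lock.isLocked();
    }
}
```

In both cases the failed unlock() throws before changing the lock's state, so the exception by itself only says that the wrong thread (or a thread holding nothing) attempted the release.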
scottf commented 17 hours ago

Are you using any unusual futures or threading model? Which version of Java are you running? Any idea what is causing the interrupt? I'll have to go through all the locks, but they should all be unlocked in finally blocks.
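For reference, the lock-then-finally idiom recommended in the ReentrantLock Javadoc acquires the lock immediately before the try and releases it in finally. With lockInterruptibly(), an interrupted thread gets InterruptedException before the try is entered, so the finally block can never call unlock() on a lock it does not hold. A minimal sketch with a hypothetical GuardedCounter class:

```java
import java.util.concurrent.locks.ReentrantLock;

final class GuardedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    /** Acquire BEFORE entering try, so finally only ever runs while the lock is held. */
    int incrementInterruptibly() throws InterruptedException {
        lock.lockInterruptibly(); // if interrupted, throws here and the try/finally is never entered
        try {
            return ++count;
        } finally {
            lock.unlock(); // safe: reached only after a successful acquire
        }
    }

    boolean isHeldByCurrentThread() {
        return lock.isHeldByCurrentThread();
    }
}
```

Because lockInterruptibly() clears the interrupt status when it throws, a caller that catches the exception should either rethrow it or call Thread.currentThread().interrupt() to restore it.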

jjlauer commented 17 hours ago

Nothing fancy, just a simple example, as I was exploring nats.java yesterday. Here is the code that causes it. Calling subscriberThread.interrupt() triggers that exception if the subscriber thread is inside the fetch() method.

https://github.com/jjlauer/nats-demo/blob/master/src/main/java/com/fizzed/nats/demo/NatsStreamConsumer.java

scottf commented 15 hours ago

My goal is to have the client behave as well as possible. I'm looking into this to see if there is anything I can add to the code to keep it from freezing. I will also go through the code to make sure all my locks are unlocked in finally blocks; I'm 99.9% certain they all are.

I'm by no means a threading expert, but I recently went through all of our code to move completely to ReentrantLock. I did a lot of reading, and what I learned is that you have to build your loops so they can yield and stop. Interrupting just throws a wrench in the works, and the result is pretty much undefined.

I'm looking at your code example. There is no reason for you to interrupt the subscriber thread. If you absolutely must shut it down, check a flag (probably an AtomicBoolean) in your while loop; set it to stop fetching messages and exit the loop gracefully.

In any case, I don't even think you need to do this. The fetch will recover and re-subscribe once the connection is re-established. I also think you should have your exception handling for any JetStream calls (like fetch) outside of the while loop, because they are pretty much terminal errors.
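A minimal sketch of the shape suggested here: an AtomicBoolean checked on every loop iteration, a bounded wait so the flag is re-checked regularly, and the exception handling placed outside the loop so any failure ends the worker. A BlockingQueue polled with a timeout stands in for fetch(); that substitution and the class name are assumptions for illustration, not the NATS API.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class FlagControlledConsumer implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    // Hypothetical stand-in for a subscription; poll(timeout) plays the role of a bounded fetch.
    private final BlockingQueue<String> source;
    final List<String> processed = new CopyOnWriteArrayList<>();
    volatile Exception terminalError; // set if the loop ended because of an exception

    FlagControlledConsumer(BlockingQueue<String> source) {
        this.source = source;
    }

    /** Ask the loop to finish; it notices within one poll timeout. */
    void stop() {
        running.set(false);
    }

    @Override
    public void run() {
        try {
            while (running.get()) {
                String msg = source.poll(100, TimeUnit.MILLISECONDS); // bounded wait, never blocks forever
                if (msg != null) {
                    processed.add(msg);
                }
            }
        } catch (Exception e) {
            // Error handling sits outside the loop: any failure ends the worker instead of retrying.
            terminalError = e;
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to whoever owns this thread
            }
        }
    }
}
```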

jjlauer commented 15 hours ago

@scottf I've been working in Java for over 20 years, so I'll relay some of my thread/atomic knowledge. Interrupting a thread is exactly how you signal that you want blocking calls to stop blocking. The fetch() call blocks on I/O; interrupting that thread is how you'd signal that you want the thread to shut down. In a production app, I'd have an AtomicBoolean that signals I want the thread to stop, and I'd then interrupt the thread; a try/catch in the thread's run() would check that stop was signaled and exit run(). What you've described is more like a "spin lock" that polls for X seconds and then decides it's being shut down. That's not a great design when you're trying to handle things like an app shutting down gracefully.
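The pattern described here can be sketched with the JDK alone: set a stop flag, then interrupt the thread so a call that blocks indefinitely returns at once, and let the catch block decide whether the interrupt was a requested shutdown. BlockingQueue.take() stands in for a blocking fetch, and the class name is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class InterruptibleWorker {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    // Hypothetical stand-in for a subscription whose receive call blocks with no timeout.
    private final BlockingQueue<String> source = new LinkedBlockingQueue<>();
    private final AtomicInteger handled = new AtomicInteger();
    private final Thread thread = new Thread(this::consumeLoop, "subscriber");

    void start() {
        thread.start();
    }

    void offer(String msg) {
        source.add(msg);
    }

    int handledCount() {
        return handled.get();
    }

    /** Set the flag first, then interrupt, so the worker can tell a requested stop from a stray interrupt. */
    boolean shutdown(long timeoutMillis) throws InterruptedException {
        stopRequested.set(true);
        thread.interrupt();
        thread.join(timeoutMillis);
        return !thread.isAlive();
    }

    private void consumeLoop() {
        while (true) {
            try {
                source.take(); // blocks until a message arrives or the thread is interrupted
                handled.incrementAndGet(); // a real consumer would process the message here
            } catch (InterruptedException e) {
                if (stopRequested.get()) {
                    return; // shutdown was requested: leave the loop cleanly
                }
                // Stray interrupt with no stop request: take() already cleared the status, keep consuming.
            }
        }
    }
}
```

Unlike a polling loop, shutdown() here does not wait for any timeout to expire; it only depends on the blocking call honoring the interrupt.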

Also, this is how Java's ExecutorService shutdownNow() method works -- the Javadoc explains that typical implementations cancel running tasks via Thread.interrupt(), so any task that doesn't respond to interrupts may never terminate: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/ExecutorService.html#shutdownNow()
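This ExecutorService behavior can be checked directly. In the sketch below (class name illustrative), a task blocks in BlockingQueue.take() on a queue that is never fed; shutdownNow() interrupts the pool's worker thread, the task observes InterruptedException, and the pool terminates.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownNowDemo {
    /** True if a task blocked in take() sees InterruptedException after shutdownNow(). */
    static boolean shutdownNowInterruptsBlockedTask() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        BlockingQueue<String> neverFed = new LinkedBlockingQueue<>(); // nothing is ever offered
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        pool.submit(() -> {
            started.countDown();
            try {
                neverFed.take(); // would block forever without an interrupt
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
                Thread.currentThread().interrupt(); // preserve the status for the executor's worker
            }
        });

        started.await(); // the task is running, so shutdownNow() has something to interrupt
        List<Runnable> neverStarted = pool.shutdownNow(); // interrupts the worker thread
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return terminated && sawInterrupt.get() && neverStarted.isEmpty();
    }
}
```

A library call made from inside such a task therefore has to expect the interrupt, which is the point being made about fetch().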

Now, you might come back with something like: only wait X seconds in fetch(), put that in a loop, and check the stop signal. That will kind of work, but it doesn't guarantee that an interrupt won't be delivered while the thread is inside fetch(), especially since, as I pointed out, that's exactly how Java's ExecutorService triggers a shutdown.

Also, it's good practice not to "swallow" interrupts if your own library code is in turn blocking on something. That's why Java's interruptible lock APIs throw InterruptedException: acquiring a lock is another place that can block, and interrupts should be handled properly and propagated up the chain if necessary. You can handle it in your own try/catch, but then it's good to re-throw it so the consumers of your library can also handle the interrupt appropriately, since their own app (or they themselves) triggered it.
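The two usual ways for library code to avoid swallowing an interrupt are sketched below with a hypothetical InterruptPropagation class, where a BlockingQueue stands in for whatever the library blocks on: declare and propagate InterruptedException, or, when the method signature cannot change, restore the interrupt status before throwing an unchecked exception.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class InterruptPropagation {
    // Hypothetical inbox standing in for whatever the library blocks on internally.
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

    /** Preferred: declare InterruptedException and let it reach the caller that issued the interrupt. */
    String nextMessage(long timeoutMillis) throws InterruptedException {
        return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** When the signature cannot declare InterruptedException: restore the status, then fail loudly. */
    String nextMessageUnchecked(long timeoutMillis) {
        try {
            return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // poll() cleared the status when it threw; put it back
            throw new IllegalStateException("interrupted while waiting for a message", e);
        }
    }
}
```

Either way the caller can still tell that an interrupt happened, which is what returning an empty result silently would hide.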

jjlauer commented 15 hours ago

One last thing: I was testing scenarios where the NATS server connection is disrupted (ethernet outage, server shutdown, etc.) while an app is running. If you call fetch() and the underlying connection gets reconnected while it's blocking, fetch() never returns any new messages. You MUST call fetch() again in that situation to get new messages. So despite the fancy reconnect handling under the hood, it doesn't actually help my app if fetch() never returns any new messages. The documentation isn't clear that this would be the case, and since reconnect is a feature, I expected fetch() to just work once the connection was re-established.

The other problem is that you don't know a reconnect occurred while your code is calling fetch(). So I added a connection listener to get the connection-closed event. Then I needed to tell the subscriber thread that something happened, so I used the interrupt technique. That's how I discovered what I think could be a problem: it hints that maybe a lock isn't actually unlocked, but I haven't dug too much into it.

scottf commented 14 hours ago

You are supposed to handle the InterruptedException, clean up your state and exit your thread gracefully. Your example code does not do that.

The fetch call in your example already has a time out. An exception thrown via any JetStream call is probably unrecoverable and you might need to make a new subscription.

A disconnection is a recoverable situation that can be accounted for. I suggest moving to the simplified API, which recovers on disconnects. The fetch in your example can recover too (I have an example somewhere), but you'd need to add more to your code to handle it.


As I said, I'm by no means suggesting that it's "not my problem". I'm looking into it. Even from a quick glance at that push() function with fresh eyes, I can see something I should dig into. And I'm trying to help here. Please keep your comments respectful. I've been working with Java since 1995, so I know a little bit too. The software is free and open source, so if you have a solution that I don't see, feel free to make a PR.

jjlauer commented 14 hours ago

I'm not trying to be difficult, just trying to give NATS a try to see if we'll use it as a key part of our platform. You mentioned you're not a threading expert, so I shared my background; I didn't mean it as a dig :-) The fetch() call currently throws a java.lang.IllegalMonitorStateException if the thread executing it is interrupted (see my stack trace above). If it were throwing an InterruptedException, that's actually what I'd expect, and I'd be happy.

The IllegalMonitorStateException is a scary exception, since it means that somewhere in the stack trace I shared, a thread is trying to release a lock that it doesn't actually own.

scottf commented 14 hours ago

@jjlauer Would you please look at this PR? https://github.com/nats-io/nats.java/pull/1255 My guess is that the lock was never obtained, so the unlock in the finally block is invalid.
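The guess above can be illustrated with a JDK-only sketch. The names and structure below are hypothetical and are not the actual MessageQueue code. They show how acquiring a lock inside a try whose finally unconditionally calls unlock() turns an interrupt into IllegalMonitorStateException: tryLock(timeout) throws InterruptedException immediately on an interrupted thread, the finally block then calls unlock() on a lock that was never acquired, and the resulting IllegalMonitorStateException replaces the InterruptedException. The fixed variant only enters try/finally once the lock is held.

```java
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

final class LockScopeDemo {
    private final ReentrantLock editLock = new ReentrantLock();
    private final ArrayDeque<String> queue = new ArrayDeque<>();

    /** Suspected anti-pattern: acquire inside try, unlock unconditionally in finally. */
    boolean pushBroken(String msg) throws InterruptedException {
        try {
            // Throws InterruptedException at once if the thread is already interrupted.
            if (!editLock.tryLock(100, TimeUnit.MILLISECONDS)) {
                return false;
            }
            queue.add(msg);
            return true;
        } finally {
            // Also runs when the lock was never acquired: unlock() then throws
            // IllegalMonitorStateException, which replaces the pending InterruptedException.
            editLock.unlock();
        }
    }

    /** Fix: only enter the try/finally after the lock is actually held. */
    boolean pushFixed(String msg) throws InterruptedException {
        if (!editLock.tryLock(100, TimeUnit.MILLISECONDS)) { // InterruptedException reaches the caller intact
            return false;
        }
        try {
            queue.add(msg);
            return true;
        } finally {
            editLock.unlock();
        }
    }

    int size() {
        editLock.lock();
        try {
            return queue.size();
        } finally {
            editLock.unlock();
        }
    }
}
```

The broken variant also fails this way when tryLock() simply times out, since the finally block runs on that path too.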

scottf commented 14 hours ago

@jjlauer I built a snapshot of my PR branch: 2.20.5.edit-lock-handling-SNAPSHOT. It's only on Sonatype; the repo README has Gradle and Maven dependency examples. If it's easier, we can move this conversation to Slack; I'm scottf on there. I really want to solve this problem. I'm in US East, so I will be signing off tonight, but I'll be back at it tomorrow.

jjlauer commented 13 hours ago

@scottf I tried out that branch/PR -- I cloned this repo, checked out that branch, published locally.

The IllegalMonitorStateException no longer gets thrown, but now fetch() immediately returns with no messages once the thread has been interrupted. So fetch() does technically "swallow" the InterruptedException. The problem is that if fetch() is called again (with the server's connection still down), the fetch() call immediately returns again, without honoring the Duration I passed in. In my example project, this led to a full-blast, never-ending loop, and it was difficult to kill the Java process to get out of it.

If I add a Thread.sleep() at the bottom of my loop, fetch() still quickly returns no messages while the server connection is down. Once the server connection comes back up, the next fetch() call does get messages.

I guess this leads to the question: if the underlying connection is down, what behavior should fetch() have? Or other methods like pull(), etc.? IMHO, they should either throw an exception telling you the connection is down, or honor the Duration I passed in and return messages if the server comes back up in the meantime.
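The second option, honoring the Duration, can be sketched as a deadline-based batch read: each wait uses only the time remaining, the call returns early only when the batch is full, and an interrupt surfaces as InterruptedException instead of an empty result. The fetch(...) name and the BlockingQueue source are hypothetical stand-ins, not the nats.java implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class DeadlineFetch {
    /**
     * Collects up to batchSize items, waiting at most maxWait in total.
     * Returns early only when the batch is full; an interrupt surfaces as InterruptedException.
     */
    static <T> List<T> fetch(BlockingQueue<T> source, int batchSize, Duration maxWait)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(batchSize);
        long deadline = System.nanoTime() + maxWait.toNanos();
        while (batch.size() < batchSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // the full Duration has been honored
            }
            T item = source.poll(remaining, TimeUnit.NANOSECONDS); // waits only for the time left
            if (item == null) {
                break; // deadline reached while waiting for the next item
            }
            batch.add(item);
        }
        return batch;
    }
}
```

With this contract, a caller's loop that gets nothing back cannot spin faster than once per maxWait, which avoids the full-blast loop described above.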

scottf commented 2 hours ago

The subscription is invalid once the connection is broken and fetch will never work. You need to resubscribe after the interrupt.