WellingR opened 4 years ago
I wonder if it would make sense to explicitly call close() on the consumer in the error handler. It seems like this is something the consumer implementation should do itself when a connection error/timeout is thrown.
The implementation of the handleError callback is currently as follows:
private[jms] val handleError = getAsyncCallback[Throwable] { e =>
updateState(JmsConnectorStopping(Failure(e)))
fail(out, e)
}
I am not completely sure what happens when fail(out, e) is called, but it seems to only signal the failure downstream without actually stopping or aborting the sessions.
Maybe handleError should call abortSessions(ex: Throwable) instead? That eventually also signals the failure downstream.
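To illustrate the suggestion, here is a minimal, self-contained sketch in plain Scala (no Akka) of an error handler that aborts every session before signalling the failure downstream. FakeSession, StageSketch, StageState and the fail stand-in are hypothetical and only model the order of operations; in the real stage, abortSessions and fail(out, e) come from the JMS connector and GraphStageLogic.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Try}

// Hypothetical stand-ins: these are NOT Alpakka types. They only record
// the order in which sessions are aborted and the failure is signalled.
final class FakeSession(val id: Int, log: ListBuffer[String]) {
  var closed = false
  def abort(): Unit = { closed = true; log += s"abort $id" }
}

sealed trait StageState
case object Running extends StageState
final case class Stopping(reason: Try[Unit]) extends StageState

final class StageSketch(log: ListBuffer[String]) {
  val sessions: Seq[FakeSession] = Seq(new FakeSession(1, log), new FakeSession(2, log))
  var state: StageState = Running

  // Same shape as the current handleError, except that the sessions are
  // aborted before the failure is signalled downstream.
  def handleError(e: Throwable): Unit = {
    state = Stopping(Failure(e))
    abortSessions(e)
  }

  private def abortSessions(e: Throwable): Unit = {
    sessions.foreach { s =>
      // Best effort: an exception from one session must not skip the others.
      try s.abort()
      catch { case _: Exception => () }
    }
    fail(e)
  }

  // Stand-in for GraphStageLogic.fail(out, e): it only records the call.
  private def fail(e: Throwable): Unit = log += s"fail: ${e.getMessage}"
}

val log = ListBuffer.empty[String]
val stage = new StageSketch(log)
stage.handleError(new RuntimeException("connection lost"))
```

With this ordering, downstream only sees the failure after every session has been aborted.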
Yes, the sessions and connections need to be closed explicitly. I can look into this right now.
When the broker is stopped, envelope.commit() fails with org.apache.activemq.broker.BrokerStoppedException, which even invalidates the JMS connection, so connection.close() then fails with jms.JMSException: Disposed due to prior exception.
So closing the connections/sessions alone might not solve this.
I started on a test to show this scenario, but I still need to add persistence to the broker so that uncommitted messages show up after a restart.
In the code I used for testing, I was using a failover connection (failover:(tcp://localhost:61616)), which might cause this to behave differently.
I re-ran the test (see the link to the code in the issue description) without failover, which seems to improve the situation: all messages appear to be consumable afterwards, and I see the "Disposed due to prior exception" message.
I guess that the failover connections are not invalidated and must be explicitly closed when such an error occurs.
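Since close() can itself throw once the connection has been invalidated (as with "Disposed due to prior exception" above), explicit cleanup probably needs to be best-effort. A minimal sketch, assuming sessions and the connection are treated as plain java.lang.AutoCloseable values; closeAllQuietly is a hypothetical helper, and IllegalStateException stands in for the JMS exception so the example has no JMS dependency.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: closes every resource in order (e.g. sessions first,
// then the connection) and collects failures instead of stopping at the first.
def closeAllQuietly(resources: Seq[AutoCloseable]): Seq[Throwable] =
  resources.flatMap { r =>
    Try(r.close()) match {
      case Success(_)  => None
      case Failure(ex) => Some(ex)
    }
  }

// Hypothetical resources: the "connection" fails on close the way an
// invalidated connection does after the broker has gone away.
var sessionClosed = false
val session: AutoCloseable = () => sessionClosed = true
val brokenConnection: AutoCloseable =
  () => throw new IllegalStateException("Disposed due to prior exception")

val errors = closeAllQuietly(Seq(session, brokenConnection))
```

The collected errors can then be logged, while the stage still fails with the original cause.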
Also, I am not getting an org.apache.activemq.broker.BrokerStoppedException. I guess this only happens with in-memory brokers, since over a TCP connection it is not possible to distinguish a stopped broker from an unreachable one.
Good point. I need to carry on with other stuff. I'll push the unfinished test case.
See #2104
I am using alpakka-jms 1.1.2 and akka-2.5.27 and was experimenting with failure recovery.
It appears that if I restart the ActiveMQ broker, a failure occurs when committing the message (as expected). In this case Akka Streams logs the message "Restarting graph due to failure." From this point onwards the graph does seem to recover, but the original consumer is not closed properly. When debugging, I observe that handleError.invoke(e) is called, but this does not seem to close the consumer/session.
The ActiveMQ web interface lists two consumers for the same queue after ActiveMQ was restarted. I also observe that the first consumer seems to be caching 1000 messages (this seems to happen inside the ActiveMQ client). These messages are still being delivered to the Alpakka consumer, which blocks until the message is acknowledged (which will never happen, since the stream has failed) or until the ackTimeout occurs.
Even when the ackTimeout does occur, the next message appears to be handled; setting failStreamOnAckTimeout to true does not seem to make any difference. I would expect the consumer to stop consuming messages and close itself when it fails or when it is cancelled.
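The expected behaviour can be modelled roughly as follows: after an ack timeout with failing enabled, the consumer closes itself and refuses any further (prefetched) messages instead of handling the next one. This is a hypothetical sketch, not Alpakka's implementation; TxListenerSketch, ackTimeoutMs and failOnAckTimeout are invented names loosely modelled on ackTimeout and failStreamOnAckTimeout, and a CountDownLatch stands in for the acknowledgement.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical model of a transactional listener; not the Alpakka code.
final class TxListenerSketch(ackTimeoutMs: Long, failOnAckTimeout: Boolean) {
  @volatile var closed = false
  var handled = List.empty[String]

  // Called for each (possibly prefetched) message; returns true if acknowledged.
  def onMessage(msg: String, ack: CountDownLatch): Boolean =
    if (closed) false // a closed consumer must not process prefetched messages
    else {
      handled = handled :+ msg
      val acked = ack.await(ackTimeoutMs, TimeUnit.MILLISECONDS)
      // On timeout, stop instead of moving on to the next message.
      // A real consumer would also roll back and close the JMS consumer here.
      if (!acked && failOnAckTimeout) closed = true
      acked
    }
}

val listener = new TxListenerSketch(ackTimeoutMs = 50, failOnAckTimeout = true)
val neverAcked = new CountDownLatch(1) // the failed stream never acknowledges
val first = listener.onMessage("m1", neverAcked)
val second = listener.onMessage("m2", new CountDownLatch(0))
```

In this model, constructing the listener with failOnAckTimeout = false would let "m2" be handled after the timeout, which resembles the behaviour described above.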
The code I use to reproduce this issue: https://gist.github.com/WellingR/0c2b38fc8f9e04c8e95c730f83ee0147
The following snippet shows how I create the TX consumer: