Closed: juanavelez closed this issue 3 years ago
I'm not sure how any streaming API can help, since we still need to schedule in-memory tasks to release those messages. So eventually we are still going to end up with the OOM error.
I need to learn more about how to deal with OOM situations, since this is not the first time this has been asked. We have a similar problem with an aggregator when it releases groups by pulling data from the DB, too.
We may look into paging support instead, although I don't know if that is possible with Redis and GemFire, since the message group is a single entry in those stores...
Would you mind sharing with us, though, why you need to delay so many messages? Maybe you just need a persistent messaging middleware in between your components, for example RabbitMQ. Interestingly, it has a delayed exchange feature: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange. But again: what is the point of delaying messages in your logic?
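For reference, here is a rough sketch of publishing a delayed message with that plugin via the plain RabbitMQ Java client; the exchange name, routing key, and delay value are made up for illustration:

import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DelayedPublishSketch {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
            // The plugin registers the "x-delayed-message" exchange type;
            // "x-delayed-type" tells it how to route once the delay expires.
            channel.exchangeDeclare("retry.exchange", "x-delayed-message", true, false,
                    Map.<String, Object>of("x-delayed-type", "direct"));
            // Each message carries its own delay (in milliseconds) in the "x-delay" header.
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .headers(Map.<String, Object>of("x-delay", 30_000))
                    .build();
            channel.basicPublish("retry.exchange", "retry.key", props,
                    "try me again in 30 seconds".getBytes());
        }
    }
}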
I can’t go into specifics, but I will try to explain a similar scenario: let’s say you are using SI to read off a queue/topic (Kafka, Rabbit, JMS, etc.) and, while processing those messages, you need to call an external system. That external system might have an issue and respond with “try later”. This is fine, so we delay those messages. The issue is that the rate at which we send to this external system is measured in tens of thousands of messages per second. We had a situation where we had to stop the instance that was processing those delayed messages, but when it came back up, it kept crashing because of OOM. Again, the issue is not scheduling those messages but loading them all into memory at startup. I think using some pagination as part of the iteration implementation might solve the issue. I am on leave until January but will try to work on a suggestion.
A much better solution is to stop consuming from the source when this happens and start consuming again when the problem is resolved. Dumping them all into Mongo is not at all the right solution for this scenario.
I think you might have missed the point. It's OK to say it won't be fixed. Thank you.
I haven’t missed the point at all; it just seems odd to me to move the data from one place to another just because you can’t process it right now.
Just exploring alternatives, not saying it can’t or won’t be “fixed”. It just was not intended for such scenarios.
The delay handler facility works well for us in most cases, when the number of rescheduled messages to be processed at a later time is small and messages are scheduled on the fly. We only run into issues in the scenario described by Juan, and the problem is exacerbated by the current implementation of DelayHandler.reschedulePersistedMessages().
Most likely what happens is that blindly calling messageGroup.getMessages() causes the entire content of the delay store collection to be loaded into memory eagerly, because ConfigurableMongoDbMessageStore.getMessagesForGroup() is implemented in terms of MongoTemplate.find(), which effectively faults in the whole data set. If you were to introduce a streaming counterpart of that method using MongoTemplate.stream(), and then refactor PersistentMessageGroup.PersistentCollection in terms of it, you would effectively get a streaming iterator under the hood inside reschedulePersistedMessages(). As a side note, I would also argue that the "lazy" loading of messages in the current implementation is in fact not lazy at all, but rather delayed.
I'm going to stop short of proposing an actual patch here, though I conceivably could do so if requested; a very rough sketch of the direction follows below. But I think having streaming semantics behind scheduling persisted messages would actually help us here, because most of the loaded messages would likely not need to be rescheduled, but rather released immediately (typically, the desired delay is rather small compared to the time elapsed since the last write to the delay store and the container restart).
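To make the idea concrete, here is a very rough sketch (not a patch) of such a streaming counterpart on the Mongo store; the method name streamMessagesForGroup, the group-id field name, and the MessageDocument mapping class are assumptions for illustration:

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.util.CloseableIterator;
import org.springframework.messaging.Message;

// Hypothetical streaming counterpart of getMessagesForGroup(): the cursor-backed
// CloseableIterator from MongoTemplate.stream() materializes one document at a time.
public Stream<Message<?>> streamMessagesForGroup(Object groupId) {
    Query query = Query.query(Criteria.where("_groupId").is(groupId)); // assumed field name
    CloseableIterator<MessageDocument> cursor =
            this.mongoTemplate.stream(query, MessageDocument.class);   // assumed mapping class
    Spliterator<MessageDocument> spliterator =
            Spliterators.spliteratorUnknownSize(cursor, Spliterator.ORDERED);
    return StreamSupport.stream(spliterator, false)
            .map(MessageDocument::getMessage) // unwrap the persisted message
            .onClose(cursor::close);          // closing the Stream closes the Mongo cursor
}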
Thank you for the feedback and for sharing your pain!
I think we can give it a try and provide an alternative streaming API for the persistent message stores whose underlying library has such a feature. For example, Spring Data MongoDB exposes one (MongoTemplate):
/**
* Executes the given {@link Query} on the entity collection of the specified {@code entityType} backed by a Mongo DB
* {@link com.mongodb.client.FindIterable}.
* <p>
* Returns a {@link CloseableIterator} that wraps the Mongo DB {@link com.mongodb.client.FindIterable} that needs to
* be closed.
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification. Must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param <T> element return type
* @return will never be {@literal null}.
* @since 1.7
*/
<T> CloseableIterator<T> stream(Query query, Class<T> entityType);
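For illustration, a caller would have to close the returned iterator, e.g. with try-with-resources (the Person entity, the query, and the process method are made up):

Query query = Query.query(Criteria.where("lastName").is("Smith"));
try (CloseableIterator<Person> people = mongoTemplate.stream(query, Person.class)) {
    while (people.hasNext()) {
        process(people.next()); // documents are pulled from the cursor one at a time
    }
}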
And Spring's JDBC support has a similar one (JdbcTemplate):
/**
* Query using a prepared statement, mapping each row to a result object
* via a RowMapper, and turning it into an iterable and closeable Stream.
* <p>A PreparedStatementCreator can either be implemented directly or
* configured through a PreparedStatementCreatorFactory.
* @param psc a callback that creates a PreparedStatement given a Connection
* @param rowMapper a callback that will map one object per row
* @return the result Stream, containing mapped objects, needing to be
* closed once fully processed (e.g. through a try-with-resources clause)
* @throws DataAccessException if there is any problem
* @see PreparedStatementCreatorFactory
* @since 5.3
*/
<T> Stream<T> queryForStream(PreparedStatementCreator psc, RowMapper<T> rowMapper) throws DataAccessException;
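Again for illustration only, a hypothetical usage (the table and column names are made up); the returned Stream wraps an open ResultSet, so it must be closed:

try (Stream<String> payloads = jdbcTemplate.queryForStream(
        con -> con.prepareStatement("SELECT payload FROM delayed_message"),
        (rs, rowNum) -> rs.getString("payload"))) {
    payloads.forEach(System.out::println); // rows are fetched incrementally, not all at once
}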
Meanwhile, the key-value message store (AbstractKeyValueMessageStore and its impls) currently has an API like this:
public Collection<Message<?>> getMessagesForGroup(Object groupId) {
    MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
    ArrayList<Message<?>> messages = new ArrayList<>();
    if (groupMetadata != null) {
        Iterator<UUID> messageIds = groupMetadata.messageIdIterator();
        while (messageIds.hasNext()) {
            messages.add(getMessage(messageIds.next()));
        }
    }
    return messages;
}
I think it is not a big deal to turn this logic into a Stream implementation; see the sketch below. Probably with that we won't need the PersistentMessageGroup intermediary any more 😄.
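A hedged sketch of that conversion (the streamMessagesForGroup name is an assumption matching the hypothetical Mongo-side method above, not an existing API):

import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.UUID;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.springframework.messaging.Message;

public Stream<Message<?>> streamMessagesForGroup(Object groupId) {
    MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
    if (groupMetadata == null) {
        return Stream.empty();
    }
    // Walk the stored message ids lazily: each message is only fetched from the
    // store when the Stream actually pulls the next element.
    Iterator<UUID> messageIds = groupMetadata.messageIdIterator();
    return StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(messageIds, Spliterator.ORDERED), false)
            .map(this::getMessage);
}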
Does it make sense to you?
@artembilan Yes, I think this is a good approach. Thank you!
Yeah, I think it should also work, and at least for the message rescheduling part you could short-circuit the MessageGroup API, it seems.
In what version(s) of Spring Integration are you seeing this issue?
5.3.1.RELEASE
Describe the bug
If you have set up a DelayHandler using a ConfigurableMongoDbMessageStore and the MongoDB collection behind it is very big (e.g. 1M records), upon startup the DelayHandler will try to re-schedule all messages in the delay store. Because ConfigurableMongoDbMessageStore extends AbstractMessageGroupStore and lazyLoadMessageGroups is enabled, the store sets its persistentMessageGroupFactory to a SimpleMessageGroupFactory(SimpleMessageGroupFactory.GroupType.PERSISTENT), which returns a PersistentMessageGroup. When the PersistentMessageGroup is asked to retrieve the messages, it loads ALL of them into memory before returning the iterator:
https://github.com/spring-projects/spring-integration/blob/08316577cea0be542559b6510d7ef5e0d9c87aa7/spring-integration-core/src/main/java/org/springframework/integration/store/PersistentMessageGroup.java#L171
https://github.com/spring-projects/spring-integration/blob/08316577cea0be542559b6510d7ef5e0d9c87aa7/spring-integration-core/src/main/java/org/springframework/integration/store/PersistentMessageGroup.java#L186
https://github.com/spring-projects/spring-integration/blob/08316577cea0be542559b6510d7ef5e0d9c87aa7/spring-integration-core/src/main/java/org/springframework/integration/store/PersistentMessageGroup.java#L217
https://github.com/spring-projects/spring-integration/blob/08316577cea0be542559b6510d7ef5e0d9c87aa7/spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java#L598
https://github.com/spring-projects/spring-integration/blob/08316577cea0be542559b6510d7ef5e0d9c87aa7/spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java#L570
Depending on the available memory, an OOM occurs.
To Reproduce
Create a collection behind the DelayHandler/ConfigurableMongoDbMessageStore and load it with millions of records, then start an application that makes use of the DelayHandler.
Expected behavior
We believe it is OK to reschedule messages upon startup, but not to load them all into memory at once. Maybe this could be done via the MongoDB streaming API? A hedged sketch of the idea follows.
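For example, here is a rough sketch of what reschedulePersistedMessages() could do instead, assuming a streaming store method like the hypothetical streamMessagesForGroup() discussed above (the helper names are made up for illustration):

try (Stream<Message<?>> delayed = messageStore.streamMessagesForGroup(this.messageGroupId)) {
    delayed.forEach(message -> {
        long remainingDelay = determineRemainingDelay(message); // hypothetical helper
        if (remainingDelay <= 0) {
            releaseMessage(message);                  // overdue: release right away
        }
        else {
            scheduleRelease(message, remainingDelay); // otherwise re-schedule the task
        }
    });
}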