Open SebastianKuehnau opened 3 years ago
Good observation!
This is expected in Spring if OSIV (Open Session In View) is disabled and you return the stream directly from the repository. The same would happen with the stream returned from an AbstractBackendDataProvider for a Grid, since the stream is consumed by the component, which can't be transactional without OSIV.
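To illustrate why the place where the stream is consumed matters, here is a minimal plain-Java sketch with no Spring involved. `FakeSession` and both fetch methods are hypothetical stand-ins invented for this example: the session plays the role of the transaction, and the stream it hands out is lazy, so nothing is read until a terminal operation runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

class LazyStreamDemo {

    /** Hypothetical stand-in for a database session or transaction. */
    static class FakeSession implements AutoCloseable {
        private boolean open = true;

        /** Returns a lazy stream whose elements can only be read while the session is open. */
        Stream<String> streamMessages() {
            return Stream.of("first", "second", "third").peek(message -> {
                if (!open) {
                    throw new IllegalStateException("session closed before the stream was consumed");
                }
            });
        }

        @Override
        public void close() {
            open = false;
        }
    }

    /** Returns the stream to the caller; the session is already closed when the caller reads it. */
    static Stream<String> fetchOutsideSession() {
        try (FakeSession session = new FakeSession()) {
            return session.streamMessages();
        }
    }

    /** Runs the terminal operation while the session is still open. */
    static void fetchAndConsume(Consumer<String> consumer) {
        try (FakeSession session = new FakeSession()) {
            session.streamMessages().forEach(consumer);
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();

        // Consumed inside the session: all three elements arrive.
        fetchAndConsume(received::add);
        System.out.println(received);

        // Consumed after the session was closed: the first element access fails.
        try {
            fetchOutsideSession().forEach(received::add);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this analogy, a component that consumes the returned stream itself is in the position of `fetchOutsideSession`, while a method that both fetches and consumes corresponds to `fetchAndConsume`.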
Some considerations:
So I wouldn't change the return type, but instead I'll delegate the stream terminal operation to an overridable persister method, so that the user can make it transactional if not using OSIV.
For example:
    public interface CollaborationMessagePersister {

        Stream<CollaborationMessage> fetchMessages(FetchQuery query);

        void persistMessage(PersistRequest request);

        // Proposed addition: the terminal operation lives in an overridable method
        default void fetchAndConsumeMessages(FetchQuery query, Consumer<CollaborationMessage> consumer) {
            fetchMessages(query).forEach(consumer);
        }
    }
    @Service
    public class ScrollablePersister implements CollaborationMessagePersister {

        private final MessageRepository messageRepository;

        public ScrollablePersister(MessageRepository messageRepository) {
            this.messageRepository = messageRepository;
        }

        @Override
        @Transactional(readOnly = true)
        public Stream<CollaborationMessage> fetchMessages(FetchQuery fetchQuery) {
            String topicId = fetchQuery.getTopicId();
            Instant since = fetchQuery.getSince();
            return messageRepository.streamByTopicSince(topicId, since).map(Message::toCollaborationMessage);
        }

        @Override
        @Transactional
        public void persistMessage(PersistRequest persistRequest) {
            String topicId = persistRequest.getTopicId();
            CollaborationMessage message = persistRequest.getMessage();
            messageRepository.save(new Message(topicId, message));
        }

        // Overridden so that the stream is consumed inside the transaction
        @Override
        @Transactional(readOnly = true)
        public void fetchAndConsumeMessages(FetchQuery fetchQuery, Consumer<CollaborationMessage> consumer) {
            fetchMessages(fetchQuery).forEach(consumer);
        }
    }
Then CollaborationMessageList would use that method to consume the stream inside the transaction:
    FetchQuery query = new FetchQuery(this, topicId, since);
    persister.fetchAndConsumeMessages(query, list::append);
If you don't need the streaming optimization, you can simply return a list from the repository and call stream() on it in the persister, rather than returning a stream from the repository and then collecting it in the persister.
Of course, this should be well documented so that users don't run into the missing transaction exception!
The alternative is to accept that this is one small extra thing to do. The same implementation would then look like this:
    @Service
    public class ScrollablePersister implements CollaborationMessagePersister {

        private final MessageRepository messageRepository;

        public ScrollablePersister(MessageRepository messageRepository) {
            this.messageRepository = messageRepository;
        }

        @Override
        @Transactional(readOnly = true)
        public Stream<CollaborationMessage> fetchMessages(FetchQuery fetchQuery) {
            String topicId = fetchQuery.getTopicId();
            Instant since = fetchQuery.getSince();
            Stream<CollaborationMessage> stream = messageRepository.streamByTopicSince(topicId, since).map(Message::toCollaborationMessage);
            // Collect inside the transaction, then return a stream over the in-memory list
            return stream.collect(Collectors.toList()).stream();
        }

        @Override
        @Transactional
        public void persistMessage(PersistRequest persistRequest) {
            String topicId = persistRequest.getTopicId();
            CollaborationMessage message = persistRequest.getMessage();
            messageRepository.save(new Message(topicId, message));
        }
    }
Description
Using a stream as the return type of fetchMessages implies, or at least nudges the developer toward, a backend data-fetching method that also returns a stream. In combination with a Spring Data JPA backend, this can lead to the exception org.springframework.dao.InvalidDataAccessApiUsageException as soon as further operations are applied to the result set to reduce, map, or filter the fetched data. The full exception message is: "You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction."

Solution
Returning a java.util.Collection would prevent that exception, and you could still convert the return value into a stream and then filter, map, or otherwise process it.

Alternatives
Another option I considered, and which I ended up using as a workaround: I created a backend call that returns a list, converted it into a stream, and did the further processing on the result set.
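The list-based workaround can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `StoredMessage`, `findByTopicSince`, and `sampleTable` are hypothetical names made up for this example, the in-memory list stands in for the database table, and treating `since` as an inclusive lower bound is also an assumption of the sketch.

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ListBackedFetchDemo {

    /** Hypothetical stored row; a stand-in for the Message entity. */
    static final class StoredMessage {
        final String topicId;
        final Instant time;
        final String text;

        StoredMessage(String topicId, Instant time, String text) {
            this.topicId = topicId;
            this.time = time;
            this.text = text;
        }
    }

    /** Hypothetical backend call that returns a fully loaded list, not a lazy stream. */
    static List<StoredMessage> findByTopicSince(List<StoredMessage> table, String topicId, Instant since) {
        return table.stream()
                .filter(m -> m.topicId.equals(topicId))
                // Inclusive lower bound is an assumption of this sketch.
                .filter(m -> !m.time.isBefore(since))
                .collect(Collectors.toList());
    }

    /** The list is already in memory, so streaming over it needs no open transaction. */
    static Stream<String> fetchMessages(List<StoredMessage> table, String topicId, Instant since) {
        return findByTopicSince(table, topicId, since).stream().map(m -> m.text);
    }

    /** Illustrative sample data only. */
    static List<StoredMessage> sampleTable() {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        return List.of(
                new StoredMessage("general", t0, "old"),
                new StoredMessage("general", t0.plusSeconds(60), "new"),
                new StoredMessage("random", t0.plusSeconds(60), "other topic"));
    }

    public static void main(String[] args) {
        Instant since = Instant.parse("2024-01-01T00:00:30Z");
        // Further filtering and mapping happen on the in-memory stream.
        List<String> texts = fetchMessages(sampleTable(), "general", since)
                .filter(text -> !text.isEmpty())
                .collect(Collectors.toList());
        System.out.println(texts); // prints [new]
    }
}
```

The trade-off is that the whole result set is loaded before any element is processed, which gives up the memory benefit of a streaming query in exchange for not needing a transaction around the consumer.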