vaadin / collaboration-engine

The simplest way to build real-time collaboration into web apps
https://vaadin.com/collaboration
Other
3 stars 1 forks source link

Change return value type of "fetchMessages" to Collection #28

Open SebastianKuehnau opened 3 years ago

SebastianKuehnau commented 3 years ago

Description Using streams as return value type of fetchMessages would implicate to create a data fetching method in the backend with a stream as return value as well or at least leading the developer into that direction. In combination with a Spring Data JPA Backend this could lead to the exception org.springframework.dao.InvalidDataAccessApiUsageException after making further requests on the result set to reduce, map or filter the fetched data.

The whole exception message would be: "You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction."

Solution Returning a java.util.Collection would prevent that exception and you could still convert the return value into a stream and make further reductions by filtering, mapping or processing.

Alternatives Another option I considered and which I used in the end as a workaround, I created a backend call which return a list and convert it in a stream, made further processing with the result set.

heruan commented 3 years ago

Good observation!

This is expected in Spring if OSIV (Open Session In View) is disabled and you return the stream directly from the repository. The same would happen with the stream returned from an AbstractBackendDataProvider for a Grid, since the stream is consumed by the component which can't be transactional without OSIV.

Some considerations:

So I wouldn't change the return type, but instead I'll delegate the stream terminal operation to an overridable persister method, so that the user can make it transactional if not using OSIV.

For example:

public interface CollaborationMessagePersister {

    Stream<CollaborationMessage> fetchMessages(FetchQuery query);

    void persistMessage(PersistRequest request);

    default void fetchAndConsumeMessages(FetchQuery query, Consumer<CollaborationMessage> consumer) {
        fetchMessages(query).forEach(consumer);
    }
}
@Service
public class ScrollablePersister implements CollaborationMessagePersister {

    private final MessageRepository messageRepository;

    public ScrollablePersister(MessageRepository messageRepository) {
        this.messageRepository = messageRepository;
    }

    @Override
    @Transactional(readOnly = true)
    public Stream<CollaborationMessage> fetchMessages(FetchQuery fetchQuery) {
        String topicId = fetchQuery.getTopicId();
        Instant since = fetchQuery.getSince();
        return messageRepository.streamByTopicSince(topicId, since).map(Message::toCollaborationMessage);
    }

    @Override
    @Transactional
    public void persistMessage(PersistRequest persistRequest) {
        String topicId = persistRequest.getTopicId();
        CollaborationMessage message = persistRequest.getMessage();
        messageRepository.save(new Message(topicId, message));
    }

    @Override
    @Transactional(readOnly = true)
    public void fetchAndConsumeMessages(FetchQuery fetchQuery, Consumer<CollaborationMessage> consumer) {
        fetchMessages(fetchQuery).forEach(consumer);
    }
}

Then CollaborationMessageList would use that method to consume the stream inside the transaction:

FetchQuery query = new FetchQuery(this, topicId, since);
persister.fetchAndConsumeMessages(query, list::append);

If there's no interest in using stream optimization you can just return a list from the repository instead of returning a stream and then collecting it in the persister.

Of course, this should be well documented to avoid misleading the user to the missing transaction exception!

Legioth commented 3 years ago

The alternative is that you just learn that this is one small extra thing to do. The same implementation would then look like this

@Service
public class ScrollablePersister implements CollaborationMessagePersister {

    private final MessageRepository messageRepository;

    public ScrollablePersister(MessageRepository messageRepository) {
        this.messageRepository = messageRepository;
    }

    @Override
    @Transactional(readOnly = true)
    public Stream<CollaborationMessage> fetchMessages(FetchQuery fetchQuery) {
        String topicId = fetchQuery.getTopicId();
        Instant since = fetchQuery.getSince();
        Stream<CollaborationMessage> stream = messageRepository.streamByTopicSince(topicId, since).map(Message::toCollaborationMessage);
        return stream.collect(Collectors.toList()).stream();
    }

    @Override
    @Transactional
    public void persistMessage(PersistRequest persistRequest) {
        String topicId = persistRequest.getTopicId();
        CollaborationMessage message = persistRequest.getMessage();
        messageRepository.save(new Message(topicId, message));
    }
}