unloggedio / unlogged-sdk

Unlogged SDK for recording code execution
https://unlogged.io
Apache License 2.0
152 stars 16 forks source link

GroupedFlux allows only one Subscriber exception when using Unlogged SDK. #68

Open kingkong-AI opened 2 weeks ago

kingkong-AI commented 2 weeks ago

Describe the bug

Have an implementation of splitter pattern using groupedFlux. Without the sdk, everything works fine, but with sdk an exception is received.

Code: Look at the splitter pattern package in unlogged-spring-webflux-demo Service Code:

package org.unlogged.springwebfluxdemo.integrationpatterns.splitter.service;

import org.springframework.stereotype.Service;
import org.unlogged.springwebfluxdemo.integrationpatterns.splitter.dto.ReservationItemRequest;
import org.unlogged.springwebfluxdemo.integrationpatterns.splitter.dto.ReservationItemResponse;
import org.unlogged.springwebfluxdemo.integrationpatterns.splitter.dto.ReservationResponse;
import org.unlogged.springwebfluxdemo.integrationpatterns.splitter.dto.ReservationType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

@Service
public class ReservationService {

    private final Map<ReservationType, ReservationHandler> map;

    public ReservationService(List<ReservationHandler> list){
        this.map = list.stream().collect(Collectors.toMap(
                ReservationHandler::getType,
                Function.identity()
        ));
    }

    public Mono<ReservationResponse> reserve(Flux<ReservationItemRequest> flux){
        return flux.groupBy(ReservationItemRequest::getType) // splitter
                .flatMap(this::aggregator)
                .collectList()
                .map(this::toResponse);
    }

    private Flux<ReservationItemResponse> aggregator(GroupedFlux<ReservationType, ReservationItemRequest> groupedFlux){
        var key = groupedFlux.key();
        var handler = map.get(key);
        return handler.reserve(groupedFlux);
    }

    private ReservationResponse toResponse(List<ReservationItemResponse> list){
        return ReservationResponse.create(
                UUID.randomUUID(),
                list.stream().mapToInt(ReservationItemResponse::getPrice).sum(),
                list
        );
    }

}
java.lang.IllegalStateException: GroupedFlux allows only one Subscriber
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:721) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:57) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8777) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102) ~[reactor-netty-core-1.1.13.jar:1.1.13]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.netty.FutureMono.doSubscribe(FutureMono.java:122) ~[reactor-netty-core-1.1.13.jar:1.1.13]
    at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114) ~[reactor-netty-core-1.1.13.jar:1.1.13]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:147) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:83) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.0.jar:3.6.0]
    at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:445) ~[reactor-netty-http-1.1.13.jar:1.1.13]
    at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:710) ~[reactor-netty-core-1.1.13.jar:1.1.13]
    at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:195) ~[reactor-netty-core-1.1.13.jar:1.1.13]
    at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:456) ~[reactor-netty-core-1.1.13.jar:1.1.13]
    at reactor.netty.channel.ChannelOperationsHandler.channelActive(ChannelOperationsHandler.java:62) ~[reactor-netty-core-1.1.13.jar:1.1.13]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelActive(CombinedChannelDuplexHandler.java:412) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelActive(CombinedChannelDuplexHandler.java:211) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:260) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:258) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:305) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:335) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.101.Final.jar:4.1.101.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.101.Final.jar:4.1.101.Final]
    at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]

2024-06-14T16:37:39.899+05:30  WARN 5275 --- [ctor-http-nio-4] r.netty.http.client.HttpClientConnect    : [8d3067e8-1, L:/127.0.0.1:64073 - R:localhost/127.0.0.1:8080] The connection observed an error

Reproduction steps

  1. Run the project: unlogged-spring-webflux-demo with unlogged sdk.
  2. Make postman request to the ReservationController from postman
  3. Input:
    [
    {
        "type": "CONCERT",
        "category": "Luxury",
        "city": "New York",
        "dateTime": "2024-06-14T10:00:00"
    },
    {
        "type": "THEATRE",
        "category": "Economy",
        "city": "Los Angeles",
        "dateTime": "2024-06-15T15:30:00"
    }
    ]

    ...

Expected behavior

Expected Output (as getting when running without using sdk):

{
    "reservationId": "ffd42d4c-1cfb-455f-bb2f-e0c2ad61c967",
    "price": 199,
    "items": [
        {
            "itemId": "12ee6315-15ef-4387-8d04-10a8d36d9070",
            "type": "CONCERT",
            "category": "Luxury",
            "city": "New York",
            "dateTime": [
                2024,
                6,
                14,
                10,
                0
            ],
            "price": 143
        },
        {
            "itemId": "6e8f50fe-de84-4f0b-88ec-e7f5b152109c",
            "type": "THEATRE",
            "category": "Economy",
            "city": "Los Angeles",
            "dateTime": [
                2024,
                6,
                15,
                15,
                30
            ],
            "price": 56
        }
    ]
}

Additional context

No response