unloggedio / unlogged-sdk

Unlogged SDK for recording code execution
https://unlogged.io
Apache License 2.0
152 stars 16 forks source link

Direct invoke fails during batch save operation with @Transactional #63

Open kingkong-AI opened 3 weeks ago

kingkong-AI commented 3 weeks ago

Describe the bug

In case of batch save operation using @Transactional where if the input name is empty, transaction should roll back. Everything works fine using postman and the correct bad request exception is received. But with direct invoke the exception trace is different and unexpected. Seems like an improper serialization issue: Stack Trace:

Caused by: class java.util.LinkedHashMap cannot be cast to class org.unlogged.springwebfluxdemo.model.Player (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; org.unlogged.springwebfluxdemo.model.Player is in unnamed module of loader 'app')
    at java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.unlogged.springwebfluxdemo.service.PlayerService.saveAll(PlayerService.java:72)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:352)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport$ReactiveTransactionSupport.lambda$invokeWithinTransaction$2(TransactionAspectSupport.java:946)
    at reactor.core.publisher.FluxUsingWhen.deriveFluxFromResource(FluxUsingWhen.java:121)
    at reactor.core.publisher.FluxUsingWhen.access$000(FluxUsingWhen.java:54)
    at reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.onNext(FluxUsingWhen.java:194)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:293)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:188)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:237)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:246)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:293)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:188)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:237)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:239)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    at io.asyncer.r2dbc.mysql.internal.util.DiscardOnCancelSubscriber.onComplete(DiscardOnCancelSubscriber.java:84)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
    at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:359)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
    at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:476)
    at reactor.core.publisher.SinkManyEmitterProcessor.tryEmitNext(SinkManyEmitterProcessor.java:273)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
    at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
    at io.asyncer.r2dbc.mysql.client.ReactorNettyClient$ResponseSink.next(ReactorNettyClient.java:361)
    at io.asyncer.r2dbc.mysql.client.ReactorNettyClient.lambda$new$0(ReactorNettyClient.java:115)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.asyncer.r2dbc.mysql.client.MessageDuplexCodec.handleDecoded(MessageDuplexCodec.java:178)
    at io.asyncer.r2dbc.mysql.client.MessageDuplexCodec.channelRead(MessageDuplexCodec.java:81)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:840)

Controller:

    @PostMapping("/batch")
    @ResponseStatus(HttpStatus.CREATED)
    public Flux<Player> batchInsertProducts(@RequestBody List<Player> players) {
        return playerService.saveAll(players);
    }

Service Layer:

@Transactional
    public Flux<Player> saveAll(List<Player> players) {
        players.forEach(player -> player.setId(null));
        return repository.saveAll(players)
                .doOnNext(this::throwStatusExceptionWhenNameIsEmpty);
    }
    private void throwStatusExceptionWhenNameIsEmpty(Player player) {
        if(StringUtil.isNullOrEmpty(player.getName())) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid name");

Repository:

@Repository
public interface PlayerRepository extends ReactiveCrudRepository<Player, Long> {
}

Entity:

import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;

@Table(name = "players")
public class Player {
    @Id
    private Long id;
    private String name;
    private int age;

Reproduction steps

Make batch save request from the batchSave controller in spring webflux maven demo using direct invoke. keep name as empty and only a single Player element.

Expected behavior

Bad Request Exception should be thrown instead of class java.util.LinkedHashMap cannot be cast to class org.unlogged.springwebfluxdemo.model.Player

Additional context

No response