apache / incubator-seata

:fire: Seata is an easy-to-use, high-performance, open source distributed transaction solution.
https://seata.apache.org/
Apache License 2.0
25.08k stars 8.73k forks source link

2.0.0集成skywalking9.0.0时skywalking-api.log输出异常 #6611

Open vincent50411 opened 3 weeks ago

vincent50411 commented 3 weeks ago

Ⅰ. Issue Description

2.0.0 /ext/apm-seata-skywalking-plugin在实现interceptor时,入参类型转换异常

Ⅱ. Describe what happened

If there is an exception, please attach the exception trace:

ERROR 2024-06-13 14:02:26.871 NettyServerNIOWorker_1_2_2 InstMethodsInter : class[class io.seata.core.rpc.processor.server.ServerHeartbeatProcessor] after method[process] intercept failure java.lang.ClassCastException: io.netty.channel.DefaultChannelHandlerContext cannot be cast to io.seata.core.protocol.RpcMessage at io.seata.apm.skywalking.plugin.RemotingProcessorProcessInterceptor.afterMethod(RemotingProcessorProcessInterceptor.java:70) at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:97) at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor.process(ServerHeartbeatProcessor.java) at io.seata.core.rpc.netty.AbstractNettyRemoting.processMessage(AbstractNettyRemoting.java:306) at io.seata.core.rpc.netty.AbstractNettyRemotingServer$ServerHandler.channelRead(AbstractNettyRemotingServer.java:169) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)

ERROR 2024-06-13 14:02:26.871 NettyServerNIOWorker_1_2_2 InstMethodsInter : class[class io.seata.core.rpc.netty.NettyRemotingServer] after method[sendAsync] intercept failure java.lang.ClassCastException: io.netty.channel.socket.nio.NioSocketChannel cannot be cast to io.seata.core.protocol.RpcMessage at io.seata.apm.skywalking.plugin.NettyRemotingClientSendSyncInterceptor.afterMethod(NettyRemotingClientSendSyncInterceptor.java:71) at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:97) at io.seata.core.rpc.netty.AbstractNettyRemoting.sendAsync(AbstractNettyRemoting.java) at io.seata.core.rpc.netty.AbstractNettyRemotingServer.sendAsyncResponse(AbstractNettyRemotingServer.java:104) at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor.$sw$original$process$hr32d22(ServerHeartbeatProcessor.java:48) at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor.$sw$original$process$hr32d22$accessor$$sw$aeun762(ServerHeartbeatProcessor.java) at io.seata.core.rpc.processor.server.ServerHeartbeatProcessor$$sw$auxiliary$in3hfi1.call(Unknown Source) 还有【beforeMethod】方法类型转换异常

Ⅲ. Describe what you expected to happen

主要的异常产生位置为: DefaultCoreDoGlobalCommitInterceptor、NettyRemotingClientSendSyncInterceptor、RemotingProcessorProcessInterceptor和SWSeataUtils

DefaultCoreDoGlobalCommitInterceptor类中: public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { RpcMessage rpcMessage = null; for(Object arg:allArguments) { if (arg instanceof RpcMessage) { rpcMessage = (RpcMessage) arg; if (!(rpcMessage.getBody() instanceof AbstractMessage)) { return; } } }

    /* 异常产生位置
    RpcMessage rpcMessage = (RpcMessage) allArguments[0];
    if (!(rpcMessage.getBody() instanceof AbstractMessage)) {
        return;
    }*/

。。。 @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { for(Object arg:allArguments){ if(arg instanceof RpcMessage){ RpcMessage rpcMessage = (RpcMessage) arg; if (rpcMessage.getBody() instanceof AbstractMessage) { ContextManager.stopSpan(); } } }

    /* 异常产生位置
    RpcMessage rpcMessage = (RpcMessage) allArguments[0];
    if (rpcMessage.getBody() instanceof AbstractMessage) {
        ContextManager.stopSpan();
    }*/
    return ret;
}

NettyRemotingClientSendSyncInterceptor类中: @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { for(Object arg:allArguments){ if(arg instanceof RpcMessage){ RpcMessage rpcMessage = (RpcMessage) arg; if (rpcMessage.getBody() instanceof AbstractMessage) { ContextManager.stopSpan(); } } }

    /*RpcMessage rpcMessage = (RpcMessage) allArguments[0];
    if (rpcMessage.getBody() instanceof AbstractMessage) {
        ContextManager.stopSpan();
    }*/

    return ret;
}

RemotingProcessorProcessInterceptor类中: @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { for(Object arg:allArguments){ if(arg instanceof RpcMessage){ RpcMessage rpcMessage = (RpcMessage) arg; if (rpcMessage.getBody() instanceof AbstractMessage) { ContextManager.stopSpan(); } } }

    /*RpcMessage rpcMessage = (RpcMessage) allArguments[0];
    if (rpcMessage.getBody() instanceof AbstractMessage) {
        ContextManager.stopSpan();
    }*/

    return ret;
}

SWSeataUtils类中: public static String convertXid(RpcMessage rpcMessage) { String xid = null; // 调整后 if(rpcMessage != null && rpcMessage.getBody() != null){ if(rpcMessage.getBody() instanceof AbstractMessage){ AbstractMessage subMessage = (AbstractMessage) rpcMessage.getBody(); String requestSimpleName = rpcMessage.getBody().getClass().getSimpleName();

            try {
                xid = SWSeataConstants.TRANSACTION_TRANSMISSION_CLASS_NAME_MAPPING.get(requestSimpleName) != null
                        ? (String) SWSeataConstants.TRANSACTION_TRANSMISSION_CLASS_NAME_MAPPING.get(requestSimpleName)
                        .getDeclaredMethod("getXid").invoke(subMessage)
                        : xid;
            } catch (Throwable e) {
                LOGGER.error("convert seata xid failure", e);
            }
        }
    }

    return xid;
}

以上是测试通过后的代码调整,仅供参考

Ⅵ. Environment: