Tencent / TencentKonaSMSuite

Tencent Kona SM Suite contains a set of Java security providers, which support algorithms SM2, SM3 and SM4, and protocols TLCP/GMSSL, TLS 1.3 (with RFC 8998) and TLS 1.2.
Other
370 stars 73 forks source link

使用1.0.11版本后,报错Caused by: java.net.SocketException: Connection or outbound has closed #654

Closed tlibo closed 10 months ago

tlibo commented 10 months ago

升级到1.0.11版本后,持续发送100w笔grpc交易,每当发送到1w笔左右的时候,必定会报以下错误:

合约执行交易线程1执行状态:false 已处理数量:8398, 累计耗时:15272 ms
合约执行交易线程1执行状态:false 已处理数量:9117, 累计耗时:16279 ms
合约执行交易线程1执行状态:false 已处理数量:9849, 累计耗时:17283 ms
io.grpc.StatusRuntimeException: UNAVAILABLE
    at io.grpc.Status.asRuntimeException(Status.java:537)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection or outbound has closed
    at com.tencent.kona.sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1299)
    at okio.Okio$1.write(Okio.java:79)
    at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
    at io.grpc.okhttp.AsyncSink$2.doRun(AsyncSink.java:177)
    at io.grpc.okhttp.AsyncSink$WriteRunnable.run(AsyncSink.java:232)
    ... 4 more
合约执行交易线程1执行状态:false 已处理数量:9931, 累计耗时:18294 ms
合约执行交易线程1执行状态:false 已处理数量:9931, 累计耗时:19306 ms

以下是我的调用代码。我想用try-catch打印grpc的错误信息,但catch中一直没有输出,而控制台却打印了错误信息。

@Slf4j
public class GrpcSend implements Runnable {
    private static volatile boolean isRunning = true; // 共享的停止标志
    Wallet w;
    StreamObserver<RpcOuterClass.TxReq> stream;
    BlockingQueue<Object> quitSendChannel;
    public void stop() {
        isRunning = false;
    }

    public GrpcSend(Wallet w, StreamObserver<RpcOuterClass.TxReq> stream, BlockingQueue<Object> quitSendChannel) {
        this.w = w;
        this.stream = stream;
        this.quitSendChannel = quitSendChannel;
    }
  public void run() {
        try {
            while (!Thread.currentThread().isInterrupted() && isRunning) {
                TxInQueue txInQueue = w.getTxSendQueue().take();
                log.info("send data:{}", txInQueue.getTx());
                byte[] bytes = JSON.toJSONString(txInQueue.getTx()).getBytes(StandardCharsets.UTF_8);
                RpcOuterClass.TxReq req = RpcOuterClass.TxReq.newBuilder().setData(ByteString.copyFrom(bytes)).build();
                try {
                    stream.onNext(req);
                } catch (StatusRuntimeException e) {
                    if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
                        log.error("Connection unavailable, message: {}", e.getMessage());
                    } else {
                        log.error("Exception occurred: {}", e.getMessage());
                    }
                }
            }
        } catch (InterruptedException e) {
            // 线程被中断,停止发送
            Thread.currentThread().interrupt();
        } finally {
            stream.onCompleted();
            log.info("Sending completed: {}", stream);
            quitSendChannel.offer(new Object());
        }
    }

}

public class GrpcThread implements Runnable {
 Wallet w;
    RpcGrpc.RpcStub rpcStub;
    private volatile boolean stopThread = false;

    public void stopThread() {
        this.stopThread = true;
    }

    public GrpcThread(Wallet w, RpcGrpc.RpcStub rpcStub) {
        this.w = w;
        this.rpcStub = rpcStub;
    }
 @Override
    public void run() {

        while (!Thread.currentThread().isInterrupted() && !stopThread) {
            try {
                BlockingQueue<Object> quitSendChannel = new ArrayBlockingQueue<>(1);
                StreamObserver<RpcOuterClass.TxReq> stream = rpcStub.putTxSync(new StreamObserver<RpcOuterClass.Response>() {
                    @Override
                    public void onNext(RpcOuterClass.Response resp) {
                         log.info("gprc OnNext:{}", resp);
                        String msg = resp.getMessage();

                    }

                    @Override
                    public void onError(Throwable throwable) {

                        throwable.printStackTrace();
                        quitSendChannel.offer(new Object());
                    }

                    @Override
                    public void onCompleted() {
                        if (w.isDebug()) {
                            log.debug("Server completed sending response");
                        }
                        quitSendChannel.offer(new Object());

                    }
                });

                //发送交易
                GrpcSend grpcSend = new GrpcSend(w, stream, quitSendChannel);
                Thread senderThread = new Thread(grpcSend);
                senderThread.start();
                // 等待发送线程完成
                try {
                    senderThread.join();
                } catch (InterruptedException e) {
                    grpcSend.stop();
                    // 恢复中断状态
                    senderThread.interrupt();
                }

            } catch (Exception e) {
                if (e instanceof StatusRuntimeException) {
                    if (((StatusRuntimeException) e).getStatus().getCode() == Status.Code.UNAVAILABLE || e.getCause() instanceof IOException) {
                        // 处理连接不可用异常
                        log.error("Connection unavailable: {}", e.getMessage());
                        // 重新创建 ManagedChannel 进行重连
                        if (w.isSm()) {
                            w.getSmGrpcClient().recreateManagedChannel();
                        } else {
                            w.getEcGrpcClient().recreateManagedChannel();
                        }
                    }
                }
                log.debug("Failed to establish connection: " + e);

                Thread.currentThread().interrupt(); // 处理中断
            }
        }
    }
}

// Create the ManagedChannel
  /**
   * Builds an OkHttp-based gRPC channel to {@code clusterAddress}. The channel is restricted to
   * TLS 1.3 with the TLS_SM4_GCM_SM3 cipher suite and uses the Tencent Kona security providers.
   *
   * <p>Side effects: sets the JVM-wide system properties
   * {@code com.tencent.kona.ssl.namedGroups} and
   * {@code com.tencent.kona.ssl.client.signatureSchemes}, and registers the Kona crypto, PKIX and
   * SSL providers. {@code Security.addProvider} does not add a provider that is already installed.
   *
   * @param clusterAddress target passed to {@code OkHttpChannelBuilder.forTarget}
   * @return the newly built channel
   * @throws RuntimeException if any step fails; the original exception is attached as the cause
   */
  private ManagedChannel createManagedChannel(String clusterAddress) throws RuntimeException {
    try {
      // To enable Kona SSL debug output:
      // System.setProperty("com.tencent.kona.ssl.debug", "info");

      // SSL parameters: SM2 key exchange group and SM2/SM3 client signature scheme.
      System.setProperty("com.tencent.kona.ssl.namedGroups", "curveSM2");
      System.setProperty("com.tencent.kona.ssl.client.signatureSchemes", "sm2sig_sm3");

      Security.addProvider(new KonaCryptoProvider());
      Security.addProvider(new KonaPKIXProvider());
      Security.addProvider(new KonaSSLProvider());

      // NOTE(review): accepts every hostname, i.e. hostname verification is disabled.
      // Only acceptable for test setups — confirm before production use.
      HostnameVerifier hostnameVerifier = (s, sslSession) -> true;

      // With certificates. caPath/clientPath/userPath are not declared in this method;
      // presumably fields of the enclosing class.
      SSLContext sslContext = createSSLContext(caPath, clientPath, userPath);

      return OkHttpChannelBuilder.forTarget(clusterAddress)
          .sslSocketFactory(sslContext.getSocketFactory())
          .hostnameVerifier(hostnameVerifier)
          .tlsConnectionSpec(new String[]{"TLSv1.3"}, new String[]{"TLS_SM4_GCM_SM3"})
          // NOTE(review): a new 10-thread pool is created on every call and nothing here shuts it
          // down, so repeated channel recreation may leak threads — confirm who owns it.
          .executor(Executors.newFixedThreadPool(10))
          // NOTE(review): no keepAliveTime(...) is set; if client keepalive is disabled by default,
          // these two settings have no effect — confirm.
          .keepAliveWithoutCalls(true)
          .keepAliveTimeout(30, TimeUnit.SECONDS)
          .build();
    } catch (Exception e) {
      // Keep the stack trace and the cause; previously only e.getMessage() survived.
      log.error("Error creating managed channel: {}", e.getMessage(), e);
      throw new RuntimeException("Error creating managed channel, " + e.getMessage(), e);
    }
  }
johnshajiang commented 10 months ago

升级到1.0.11版本后,持续发送100w笔grpc交易,每当发送到1w笔左右的时候,必定会报以下错误:

你的意思是说,升级到最新版本1.0.11之后就出现这个问题了? 之前是没有问题的?那么之前是用的哪个版本?

另外,JDK是什么版本?

tlibo commented 10 months ago

您好:之前用的是1.0.10版本,JDK是jdk1.8.0_291,升级之前测试100W笔交易都是正常的。下面是升级之前的测试截图:20240131111706-image

johnshajiang commented 10 months ago

非常感谢你的回复!

我刚才快速浏览了1.0.10之后的commit,暂时没有发现可疑的变化。 https://github.com/Tencent/TencentKonaSMSuite/compare/v1.0.10...v1.0.11

根据你的代码,你是在TLS 1.3协议中使用国密算法,即RFC 8998。 RFC 8998是比较早前实现的,而且相对简单,最近这个版本似乎没有对它有修改。

我可能会删除一些commit,然后发布SNAPSHOT版本,会请你帮助测试 ;-)

你的测试场景是启动少量的线程,每个线程创建一个OkHttp客户端,让它们分别执行100万个任务(发送交易)?

tlibo commented 10 months ago

好的呢,感谢!测试场景是:启动两个线程往同一个队列里放数据,再启动一个单独的线程消费队列中的数据,整个过程只有一个OkHttp客户端。

johnshajiang commented 10 months ago

@tlibo Maven SNAPSHOT仓库地址:https://oss.sonatype.org/content/repositories/snapshots 我发布了4个SNAPSHOT版本: 1.0.12-481-SNAPSHOT,1.0.12-491-SNAPSHOT,1.0.12-513-SNAPSHOT,1.0.12-555-SNAPSHOT 具体可以看下面的路径:https://oss.sonatype.org/content/repositories/snapshots/com/tencent/kona/

请帮忙测试这些SNAPSHOT版本,谢谢!

tlibo commented 10 months ago

好的呢

tlibo commented 10 months ago

经测试,是我本机电脑的环境出现问题了,我换了几台电脑测试都没有问题了,谢谢您!~

johnshajiang commented 10 months ago

哈,真是意外之喜 ;-)

如果确定没有问题了,请关闭该issue。