spring-projects / spring-amqp

Spring AMQP - support for Spring programming model with AMQP, especially but not limited to RabbitMQ
https://spring.io/projects/spring-amqp
Apache License 2.0

RabbitTemplate.send throws java.lang.ClassCastException: com.rabbitmq.client.impl.AMQImpl$Channel$OpenOk cannot be cast to com.rabbitmq.client.impl.AMQImpl$Confirm$SelectOk #1337

Open CrackerGit opened 3 years ago

CrackerGit commented 3 years ago

spring-rabbit 2.1.13

Question

I use RabbitTemplate to send messages with the cache mode set to CONNECTION, where the connection cache size is smaller than the maximum number of connections. When I ran a stress test with 50 concurrent threads, the following exception occurred:

java.lang.ClassCastException: com.rabbitmq.client.impl.AMQImpl$Channel$OpenOk cannot be cast to com.rabbitmq.client.impl.AMQImpl$Confirm$SelectOk
artembilan commented 3 years ago

Can you please show more of the stack trace, so we can see which Spring AMQP code is involved in the problem? Is there any chance you could share something we can run on our side to reproduce it?

garyrussell commented 3 years ago

Also, 2.1.x is at end of life; consider upgrading to 2.2.16.RELEASE or 2.3.6.

https://spring.io/projects/spring-amqp#learn

CrackerGit commented 3 years ago

This is my code:

package com.wppilu.learn.spring.amqp;

import lombok.extern.slf4j.Slf4j;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import java.nio.charset.StandardCharsets;

@BenchmarkMode(Mode.Throughput)
@Threads(100)
@State(Scope.Benchmark)
@Warmup(iterations = 3,time = 5)
@Measurement(iterations = 3,time = 30)
@SpringBootApplication
@Slf4j
public class RabbitMqTemplatePerf {
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
            .include(RabbitMqTemplatePerf.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(options).run();
    }

    @RabbitListener(queues = {"msg-test"})
    public void handle(Message message){

    }

    private ConfigurableApplicationContext context;
    private RabbitTemplate rabbitTemplate;

    @Setup
    public void init(){
        context = SpringApplication.run(RabbitMqTemplatePerf.class);
        rabbitTemplate = context.getBean(RabbitTemplate.class);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

        });
    }

    @TearDown
    public void destroy(){
        context.close();
    }

    @Benchmark
    public void send(){
        try{
            rabbitTemplate.send("test","a.b", MessageBuilder.withBody("11".getBytes(StandardCharsets.UTF_8)).build());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

with this configuration:

spring:
    rabbitmq:
        publisher-confirms: true
        cache:
            connection:
                mode: connection
                size: 10
            channel:
                checkout-timeout: 30000
                size: 2000

Cause:

java.lang.ClassCastException: com.rabbitmq.client.impl.AMQImpl$Channel$OpenOk cannot be cast to com.rabbitmq.client.impl.AMQImpl$Confirm$SelectOk
    at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:1552)
    at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:52)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:677)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:668)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:627)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:518)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1700(CachingConnectionFactory.java:102)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1380)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2079)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2047)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:996)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:989)
garyrussell commented 3 years ago

This appears to be a problem in the amqp-client: the OpenOk is the reply to the channel open request, but it is somehow arriving as the reply to confirmSelect(). I suggest you ask about it on the rabbitmq-users Google group. Make sure you tell them which amqp-client version you are using.

Have you tried upgrading to a supported Spring AMQP version? They use newer clients.
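The failure mode described above can be illustrated with a deliberately simplified, single-threaded model. Everything in it (the class names, and the rule that replies are routed only by channel number with one waiter per number) is hypothetical and is not amqp-client's actual dispatch code. It only shows why a channel.open reply delivered to a confirm.select caller surfaces as a ClassCastException rather than a clearer error.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplyMismatchModel {
    // Hypothetical stand-ins for AMQImpl$Channel$OpenOk and AMQImpl$Confirm$SelectOk.
    interface Method {}
    static final class OpenOk implements Method {}
    static final class SelectOk implements Method {}

    // A caller blocked on an RPC; 'reply' is filled in when a reply is routed to it.
    static final class Pending {
        Method reply;
    }

    // Toy connection: replies are routed only by channel number, and a new
    // registration for a number silently replaces the previous waiter.
    static final class Connection {
        private final Map<Integer, Pending> waiters = new HashMap<>();

        Pending register(int channelNumber) {
            Pending p = new Pending();
            waiters.put(channelNumber, p);
            return p;
        }

        void deliver(int channelNumber, Method reply) {
            Pending p = waiters.remove(channelNumber);
            if (p != null) {
                p.reply = reply;
            }
        }
    }

    /** Returns "ClassCastException" if the confirm.select caller receives the wrong reply, else "none". */
    static String run(boolean sameChannelNumber) {
        Connection conn = new Connection();
        int openChannel = 1;
        int selectChannel = sameChannelNumber ? 1 : 2;

        // Caller A: channel.open, expects OpenOk. If the numbers collide, this
        // waiter is replaced below and would never see its reply.
        Pending openWaiter = conn.register(openChannel);
        // Caller B: confirm.select on what it believes is its own channel.
        Pending selectWaiter = conn.register(selectChannel);

        // The broker answers channel.open.
        conn.deliver(openChannel, new OpenOk());

        try {
            if (selectWaiter.reply != null) {
                // Mirrors the cast that fails in ChannelN.confirmSelect in the stack trace.
                SelectOk ok = (SelectOk) selectWaiter.reply;
            }
            return "none";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println("same channel number:       " + run(true));
        System.out.println("different channel numbers: " + run(false));
    }
}
```

In the model, the error appears only when two callers share a channel number, which is the situation the reporter suspects in the comments below.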

CrackerGit commented 3 years ago

OK, I tried the new version and this problem does not occur. I'm going to ask on rabbitmq-users, but I'm curious about the cause. My earlier guess was that when the number of connections exceeds the cache size, the extra connection is physically closed, and while messages are being sent on the same connection, two callers could end up with the same Channel (one closed and one still open). CachingConnectionFactory doesn't seem to have this problem.

CrackerGit commented 3 years ago

I took a detailed look at the code and found that the problem still exists. There is no concurrency control in 'CachingConnectionFactory.doCreateBareChannel' and 'CachedChannelInvocationHandler.doReturnToCache'. When the number of connections exceeds the cache size and begins to decrease, and 'CachingConnectionFactory.this.active' is false, the Channel is physically closed. ChannelManager also has no strict control over Channel creation and destruction, so one thread can first obtain a cached Channel, that Channel is then closed, and a new thread obtains the same Channel.
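The hypothesis above is a check-then-act race: one thread obtains a cached Channel, and before that checkout completes another thread obtains the same one. The maintainer questions whether Spring AMQP actually allows this, so the sketch below does not model CachingConnectionFactory. It is a generic, hypothetical illustration (FakeChannel and both methods are invented) of what "two threads get the same channel" would mean, contrasted with a checkout that checks and removes in one atomic step.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckoutRaceModel {
    // Hypothetical stand-in for a cached channel; object identity is what matters.
    static final class FakeChannel {
        final int number;
        FakeChannel(int number) { this.number = number; }
    }

    /**
     * Replays one bad interleaving of an unsynchronized check-then-act checkout:
     * A peeks, B peeks, A removes, B removes. Returns true if A and B hold the same channel.
     */
    static boolean naiveCheckoutSharesChannel() {
        Deque<FakeChannel> cache = new ArrayDeque<>();
        cache.add(new FakeChannel(1));
        cache.add(new FakeChannel(2));

        FakeChannel a = cache.peekFirst(); // thread A: check
        FakeChannel b = cache.peekFirst(); // thread B: check, before A has removed it
        cache.pollFirst();                 // thread A: act (removes channel 1)
        cache.pollFirst();                 // thread B: act (removes channel 2, but B holds channel 1)
        return a == b;
    }

    /**
     * Many threads check out concurrently, each with a single atomic pollFirst().
     * Returns true if every thread received a distinct channel.
     */
    static boolean atomicCheckoutIsUnique(int threads) {
        ConcurrentLinkedDeque<FakeChannel> cache = new ConcurrentLinkedDeque<>();
        for (int i = 0; i < threads; i++) {
            cache.add(new FakeChannel(i));
        }
        Set<FakeChannel> seen = ConcurrentHashMap.newKeySet();
        AtomicBoolean duplicate = new AtomicBoolean(false);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                FakeChannel ch = cache.pollFirst(); // check and remove in one step
                if (ch != null && !seen.add(ch)) {
                    duplicate.set(true);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return !duplicate.get() && seen.size() == threads;
    }

    public static void main(String[] args) {
        System.out.println("naive checkout shares a channel: " + naiveCheckoutSharesChannel());
        System.out.println("atomic checkout unique (50 threads): " + atomicCheckoutIsUnique(50));
    }
}
```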

garyrussell commented 3 years ago

It's not clear what you mean; createBareChannel is only called if there are no cached channels or if a channel proxy in the cache has a target that is closed.

Please explain further.

freshgeek commented 3 years ago

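I took a detailed look at the code and found that the problem still exists. There is no concurrency control in 'CachingConnectionFactory.doCreateBareChannel' and 'CachedChannelInvocationHandler.doReturnToCache'. When the number of connections exceeds the cache size and 'CachingConnectionFactory.this.active' = false, the connections start to decrease and the Channel is physically closed. ChannelManager also does not strictly control Channel creation and destruction, so when getting a Channel, a thread can first obtain a cached Channel, the Channel is then closed, and a new thread obtains the same Channel.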

@CrackerGit How did you eventually solve this? Is this configuration also a problem when running in a multi-threaded environment?

CrackerGit commented 3 years ago

To avoid this problem, I set the 'connectionLimit' property of CachingConnectionFactory to the same value as 'connectionCacheSize'.

But I think it needs to be fixed.
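The workaround above could look roughly like the following Java configuration. It is a sketch, not a confirmed fix: it assumes you define the CachingConnectionFactory bean yourself (which makes Spring Boot back off its auto-configured one, so host and credentials must be set here; "localhost" and the class name RabbitConnectionConfig are assumptions), and it mirrors the YAML posted earlier in the thread while adding setConnectionLimit equal to the connection cache size. The reasoning is that the factory then never opens more connections than it can cache, so a returned connection is cached instead of physically closed; when the limit is reached, callers wait up to the channel checkout timeout for an idle connection.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConnectionConfig {

    // Values mirror the YAML in this thread (cache.connection.size: 10, etc.).
    private static final int CONNECTION_CACHE_SIZE = 10;

    @Bean
    public CachingConnectionFactory connectionFactory() {
        // Assumption: broker on localhost with default credentials.
        CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
        ccf.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        ccf.setConnectionCacheSize(CONNECTION_CACHE_SIZE);
        // Workaround from this thread: cap total connections at the cache size.
        ccf.setConnectionLimit(CONNECTION_CACHE_SIZE);
        ccf.setChannelCacheSize(2000);
        // Also used as the wait time for an idle connection once the limit is reached.
        ccf.setChannelCheckoutTimeout(30000);
        // Spring AMQP 2.1.x API; from 2.2 on, prefer
        // setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED).
        ccf.setPublisherConfirms(true);
        return ccf;
    }
}
```

This requires spring-rabbit on the classpath; constructing the factory does not open a broker connection until a channel is first requested.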

garyrussell commented 3 years ago

It is still not clear to me how two threads can get the same channel - I must be missing something - can you elaborate?