dromara / dynamic-tp

🔥🔥🔥 Lightweight dynamic thread pool with built-in monitoring and alerting, integrated thread pool management for third-party middleware, based on popular config centers (Nacos, Apollo, Zookeeper, Consul, and Etcd are already supported; custom implementations can be added through SPI).
https://dynamictp.cn
Apache License 2.0
3.77k stars 763 forks

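After adding the rabbitmq adapter and defining listeners up front with annotations such as @RabbitListener, registering a listener through RabbitListenerEndpointRegistry after the project has started throws [java.util.concurrent.RejectedExecutionException] #415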

Open 457352727 opened 6 months ago

457352727 commented 6 months ago

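Version information

Problem description

After adding the rabbitmq adapter, listeners are defined up front with annotations such as @RabbitListener. Once the project has started, registering another listener through RabbitListenerEndpointRegistry throws [java.util.concurrent.RejectedExecutionException].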

1. Configuration class
```java
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@AutoConfigureAfter({RabbitAutoConfiguration.class})
@AutoConfigureBefore({RabbitMqTpAutoConfiguration.class})
@EnableRabbit
public class AmqpConfig {

    private static final AtomicInteger NUMBER = new AtomicInteger(0);
    private static final String DEFAULT_NAME = "rabbitmqPool";
    private static final String DEFAULT_ALIAS_NAME_PREFIX = "rabbitmq-pool-";
    private final Environment environment;
    private final AbstractConnectionFactory connectionFactory;
    private final RabbitTemplate rabbitTemplate;
    private final RabbitMqDtpAdapter rabbitMqDtpAdapter;
    private final DirectRabbitListenerContainerFactory listenerContainerFactory;

    public AmqpConfig(Environment environment, AbstractConnectionFactory connectionFactory,
                      RabbitTemplate rabbitTemplate, RabbitMqDtpAdapter rabbitMqDtpAdapter,
                      DirectRabbitListenerContainerFactory listenerContainerFactory) {
        this.environment = environment;
        this.connectionFactory = connectionFactory;
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitMqDtpAdapter = rabbitMqDtpAdapter;
        this.listenerContainerFactory = listenerContainerFactory;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        SimplePropertyValueConnectionNameStrategy cns = new SimplePropertyValueConnectionNameStrategy("spring.application.name");
        cns.setEnvironment(environment);
        connectionFactory.setConnectionNameStrategy(cns);

        ThreadPoolExecutor subscribePool = rabbitmqSubscribePool();
        System.out.println(subscribePool);

        connectionFactory.setExecutor(subscribePool);

        // set dynamic-tp proxy
        String publishTpName = generatePublishThreadPoolName();
        ThreadPoolExecutor rabbitmqPublishPool = rabbitmqPublishPool();
        ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy(rabbitmqPublishPool);
        ConnectionFactory publisherConnectionFactory = connectionFactory.getPublisherConnectionFactory();
        if (publisherConnectionFactory instanceof AbstractConnectionFactory) {
            ((AbstractConnectionFactory) publisherConnectionFactory).setExecutor(proxy);
        }
        Map<String, ExecutorWrapper> executorWrappers = rabbitMqDtpAdapter.getExecutorWrappers();
        executorWrappers.put(publishTpName, new ExecutorWrapper(publishTpName, proxy));
        DtpLifecycleSupport.shutdownGracefulAsync(rabbitmqPublishPool, publishTpName, 5);

        rabbitTemplate.setUsePublisherConnection(true);
        rabbitTemplate.setConfirmCallback(new AmqpConfirmCallback());
        rabbitTemplate.setReturnCallback(new AmqpReturnCallback());

        listenerContainerFactory.setConnectionFactory(connectionFactory);
    }

    private ThreadPoolExecutor rabbitmqSubscribePool() {
        return assembleThreadPoolExecutor("rabbitMqTp#rabbitConnectionFactory");
    }

    private ThreadPoolExecutor rabbitmqPublishPool() {
        return assembleThreadPoolExecutor(generatePublishThreadPoolName());
    }

    private ThreadPoolExecutor assembleThreadPoolExecutor(String name) {
        List<TpExecutorProps> propsList = DtpProperties.getInstance().getRabbitmqTp();

        if (Objects.isNull(propsList)) {
            return getDefaultThreadPoolExecutor();
        }

        TpExecutorProps props = propsList.stream()
                .filter(i -> name.equals(i.getThreadPoolName()))
                .findFirst()
                .orElse(null);

        if (Objects.isNull(props)) {
            return getDefaultThreadPoolExecutor();
        }

        DtpExecutor dtpExecutor = new DtpExecutor(props.getCorePoolSize(), props.getMaximumPoolSize(), props.getKeepAliveTime(), props.getUnit(),
                new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        dtpExecutor.setThreadPoolName(props.getThreadPoolName());
        dtpExecutor.setThreadPoolAliasName(props.getThreadPoolAliasName());
        dtpExecutor.setRunTimeout(props.getRunTimeout());
        dtpExecutor.setQueueTimeout(props.getQueueTimeout());
        dtpExecutor.setNotifyEnabled(props.isNotifyEnabled());
        return dtpExecutor;
    }

    private ThreadPoolExecutor getDefaultThreadPoolExecutor() {
        DtpExecutor dtpExecutor = new DtpExecutor(30, 60, 60, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        int i = NUMBER.getAndIncrement();
        dtpExecutor.setThreadPoolName(DEFAULT_NAME + i);
        dtpExecutor.setThreadPoolAliasName(DEFAULT_ALIAS_NAME_PREFIX + i);
        dtpExecutor.setRunTimeout(2000);
        dtpExecutor.setQueueTimeout(2000);
        dtpExecutor.setNotifyEnabled(false);
        return dtpExecutor;
    }

    public static String generatePublishThreadPoolName() {
        return "rabbitMqTp#rabbitConnectionFactory#publish";
    }

}
```

2. Define the listener
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@Order
@Slf4j
public class DefaultDeadLetterListener {

    @RabbitListener(queues = "#{@commonDeadLetterQueue.getName()}", containerFactory = "rabbitListenerContainerFactory")
    public void processDeadLetterPublicMessage(org.springframework.amqp.core.Message message,
                                               @Header(value = "#{T(com.rs.cloud.business.mns.constant.MessageHeaderKey).CONTENT_TYPE_ID}",
                                                       required = false) String contentClassName) {
        log.info("MESSAGE ###: {}", message);
        log.info("contentClassName: {}", contentClassName);
    }
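}
```

3. Register a listener dynamically with RabbitListenerEndpointRegistry
```java
// ...
RabbitListenerEndpointRegistry.registerListenerContainer(endpoint, listenerContainerFactory, true);
// ...
```

- Error message: as shown in the screenshot below. [image]
- Suspected cause: at startup, the listener containers are started with the thread pool returned by rabbitmqSubscribePool() in AmqpConfig. Because dynamic-tp-spring-boot-starter-adapter-rabbitmq is on the classpath, that thread pool is later shut down and replaced by a ThreadPoolExecutorProxy. The connection used by the listeners created at startup still holds the thread pool returned by rabbitmqSubscribePool(), not the ThreadPoolExecutorProxy.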
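
The suspected cause can be illustrated in isolation with plain `java.util.concurrent` classes. The sketch below is not dynamic-tp or Spring AMQP code: `FakeConnection` is a hypothetical stand-in for any object that captures its executor when it is created, and the "swap" is modelled simply as creating a second pool and shutting down the first. It only shows that a holder of the old, shut-down pool is rejected under `AbortPolicy`, while a holder of the replacement is not.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class StaleExecutorDemo {

    /** Hypothetical stand-in for a connection that captures its executor at creation time. */
    static final class FakeConnection {
        private final ThreadPoolExecutor executor;

        FakeConnection(ThreadPoolExecutor executor) {
            this.executor = executor;
        }

        /** Returns true if the pool accepted the task, false if it was rejected. */
        boolean startConsumer() {
            try {
                executor.execute(() -> { });
                return true;
            } catch (RejectedExecutionException e) {
                return false;
            }
        }
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Returns {accepted before swap, stale holder accepted after swap, fresh holder accepted after swap}. */
    static boolean[] run() {
        ThreadPoolExecutor original = newPool();
        FakeConnection early = new FakeConnection(original); // created before the swap
        boolean before = early.startConsumer();

        // Model of the swap: a replacement pool takes over and the original is shut down.
        ThreadPoolExecutor replacement = newPool();
        original.shutdown();

        boolean stale = early.startConsumer();                          // still holds the original pool
        boolean fresh = new FakeConnection(replacement).startConsumer(); // holds the replacement
        replacement.shutdown();
        return new boolean[] {before, stale, fresh};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("before swap: " + r[0]
                + ", stale after swap: " + r[1]
                + ", fresh after swap: " + r[2]);
    }
}
```

If the guess is right, the listener registered after startup plays the role of `early` here: it goes through a connection that still references the pool that has since been shut down.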

Steps to reproduce

After writing the code as described above, manually call RabbitListenerEndpointRegistry.registerListenerContainer(endpoint, listenerContainerFactory, true).

Additional information

yanhom1314 commented 6 months ago

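It feels like your setup is more complicated than it needs to be. Don't set the connectionFactory's executor manually; let the adapter create the Proxy and swap it in automatically. Wouldn't that work?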

457352727 commented 6 months ago

RabbitMqDtpAdapter only creates a Proxy and swaps it in when the executor in connectionFactory is not null. However, in Spring AMQP 2.1.9.RELEASE, the executor is null by default unless it is set on the connectionFactory manually.
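
A minimal model of the behaviour described in this comment, written without any dynamic-tp or Spring AMQP classes. Everything here is hypothetical and named only for illustration: `FakeConnectionFactory` stands in for a connection factory whose executor is null unless set, `ManagedProxy` stands in for the proxy, and `adapt` mimics only the "replace when not null" check. The real adapter does more than this.

```java
import java.util.concurrent.Executor;

class NullExecutorAdaptDemo {

    /** Hypothetical stand-in for a connection factory; executor stays null unless set. */
    static final class FakeConnectionFactory {
        private Executor executor;

        Executor getExecutor() {
            return executor;
        }

        void setExecutor(Executor executor) {
            this.executor = executor;
        }
    }

    /** Hypothetical stand-in for a managed proxy that forwards tasks to a delegate. */
    static final class ManagedProxy implements Executor {
        private final Executor delegate;

        ManagedProxy(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(command);
        }
    }

    /** Installs a proxy only when an executor is already present; returns whether it did. */
    static boolean adapt(FakeConnectionFactory factory) {
        Executor current = factory.getExecutor();
        if (current == null) {
            return false; // nothing to wrap, so the factory is left unmanaged
        }
        factory.setExecutor(new ManagedProxy(current));
        return true;
    }

    public static void main(String[] args) {
        FakeConnectionFactory untouched = new FakeConnectionFactory();
        System.out.println("default (null) executor adapted: " + adapt(untouched));

        FakeConnectionFactory configured = new FakeConnectionFactory();
        configured.setExecutor(Runnable::run); // same-thread executor, enough for the demo
        System.out.println("explicit executor adapted: " + adapt(configured));
    }
}
```

Under this model, leaving the executor unset means no proxy is ever installed, which is why the executor was set by hand in the configuration class in the first place.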