xuxueli / xxl-job

A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)
http://www.xuxueli.com/xxl-job/
GNU General Public License v3.0
27.35k stars 10.82k forks source link

执行器支持自定义回调超时时间 #1386

Open alexander-wong-tech opened 4 years ago

alexander-wong-tech commented 4 years ago

问题来自于 #1379 ,追到源码是 com.xxl.job.core.biz.client.AdminBizClient 中的三个方法超时时间太短:硬编码为3秒。

@override public ReturnT callback(List callbackParamList) { return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, callbackParamList,3); }

@override public ReturnT registry(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, registryParam, 3); }

@override public ReturnT registryRemove(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, registryParam, 3); }

而调用此类的类是 com.xxl.job.core.executor.XxlJobExecutor 中的方法 private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { ...... AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); ...... }

也是硬编码方式创建,没有注入到spring容器,进而无法使用在AdminBizClient类加上 @Component + @Value("xxl.job.client.http.timeout")注入值这样的方法进行超时时间配置。

最后解决方案是:我硬编码将3秒修改为30秒,替换class字节码到jar包解决。

希望作者能够重构通过注解配置动态设置此参数。

rick-2023 commented 3 years ago

请问这个自定义超时时间何时能用上

alexander-wong-tech commented 3 years ago

我目前没有关注了,等待官方答复。

RockyLOMO commented 3 years ago

有解决方案了嘛?

qipengfei0217 commented 2 years ago

之前做项目也遇到过这个问题,用的是2.2.0版本,环境也是经常报read time out。后来,排查问题发现这个timeout是写死的,不能进行动态变更。我们环境的admin平台目前没有升级的准备,所以客户端也不敢贸然升级,怕出问题。所以,只能采用反射去修改这个timeout的值,做一个特殊的处理。希望作者@xuxueli后期能把这些参数做成可配置的。 目前的解决方案也很简单,就是在AdminBizClient初始化好后,对这些象采取反射处理,将timeout的值扩大。首先要继承XxlJobSpringExecutor类,然后复写afterSingletonsInstantiated()方法,接着在注入执行器的时候,采用自己实现的类。 实现如下:

@Slf4j
@ConditionalOnProperty(value = "service.xxlJob.enabled", havingValue = "true", matchIfMissing = true)
@Configuration
public class XxlJobConfig implements EnvironmentAware {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken:#{null}}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address:#{null}}")
    private String address;

    @Value("${xxl.job.executor.ip:#{null}}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath:#{null}}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays:30}")
    private int logRetentionDays;

    /**
     * 超时时间
     */
    private static Integer readTimeout;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringCustomizeExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    @Override
    public void setEnvironment(Environment environment) {
        readTimeout = environment.getProperty("xxl.job.executor.read.timeout", Integer.class, 10);
    }

    /**
     * 定制化XxlJobSpringExecutor
     */
    public static class XxlJobSpringCustomizeExecutor extends XxlJobSpringExecutor {
        @Override
        public void afterSingletonsInstantiated() {
            super.afterSingletonsInstantiated();

            // 目前xxl-job不能升级,先通过反射,修改timeout的值
            List<AdminBiz> adminBizList =
                    ListUtils.emptyIfNull(getAdminBizList()).stream().filter(e -> Objects.nonNull(e) && e instanceof AdminBizClient).collect(Collectors.toList());
            if (CollectionUtils.isEmpty(adminBizList)) {
                return;
            }

            // 修改timeout的值
            Field timeoutField = ReflectionUtils.findField(AdminBizClient.class, "timeout");
            if (Objects.isNull(timeoutField)) {
                return;
            }
            ReflectionUtils.makeAccessible(timeoutField);

            adminBizList.forEach(e -> ReflectionUtils.setField(timeoutField, e, readTimeout));
        }
    }
}
lwgo2050 commented 2 years ago

大家解决的解决的办法都集中在增加超时时间,好像都没有去解决问题的本源,什么原因导致client的callback,xxljob server 3秒还不能响应,很多人都碰到了这个问题,所以基本排除了网络的问题,那么剩下的就是callback里面有慢处理了 导致3秒请求都无法响应了

SKYhuangjing commented 2 years ago

TriggerCallbackThread 当接收到大量的回调时, 从源码设计上, 是一次性取出所有的回调(可能是千级别), 提交给 server, server 循环处理, 极其容易出现耗时过久, 推荐官方讲 callback 部分的数据 拆段提交, 减少传输和处理的串行压力

SKYhuangjing commented 2 years ago

TriggerCallbackThread 当接收到大量的回调时, 从源码设计上, 是一次性取出所有的回调(可能是千级别), 提交给 server, server 循环处理, 极其容易出现耗时过久, 推荐官方讲 callback 部分的数据 拆段提交, 减少传输和处理的串行压力

@xuxueli 可以考虑下将 core 中的 Callback drainTo 拿到的大量回调数据分段提交, 其次在 JobApi.callback 设计中, 使用异步处理的线程池来提高 Admin 的响应速度, 也避免客户端调用出现 read timeout

arden-yang commented 1 month ago

这个问题一直没解决吗

lwl888 commented 1 month ago

解决了,牛逼