xuxueli / xxl-job

A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)
http://www.xuxueli.com/xxl-job/
GNU General Public License v3.0
27.45k stars 10.86k forks source link

xxl-job-admin 2.2.1 任务重复执行 #2217

Open 908028664 opened 3 years ago

908028664 commented 3 years ago

k8s 2节点部署, 也就是部署了2个 admin, 同一任务同一时间调度了2次,看ip得知,是这两个admin 都调度了一次该任务, 当该任务 被同一个执行器接收处理时,先到达的请求处理线程停止,返回 [job running, killed] ,后到达的请求处理线程执行成功,看了其他的帖子,都说在quartz.properties 增加 org.quartz.jobStore.acquireTriggersWithinLock: true,我在 执行器 resources 下创建了 quartz.properties 增加 org.quartz.jobStore.acquireTriggersWithinLock: true,然而,没起到作用。

908028664 commented 3 years ago

源代码是这样的: CREATE TABLE xxl_job_lock ( lock_name varchar(50) NOT NULL COMMENT '锁名称', PRIMARY KEY (lock_name) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );

因为公司对对ddl的限制,我改成了这样: CREATE TABLE xxl_job_lock ( id int(1) NOT NULL, lock_name varchar(32) NOT NULL COMMENT '锁名称', PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

conn.prepareStatement("select * from xxl_job_lock where id = 1 and lock_name = 'schedule_lock' for update");

我感觉,这个改动,应该没问题吧。锁不会无效的吧。

SuperGoldHouse commented 3 years ago

好家伙好家伙 我这边是不运行 你是重复运行

muxinzi commented 3 years ago

理论上MySQL排它锁不会同一时间调度两次任务,除非第一次调度之后你没有修改下次的调度时间

908028664 commented 3 years ago

我们用的是 阿里云 polardb,admin链接的地址是 集群地址, 阿里云 SQL 洞察中 发现,for update 是在mysql主节点发生的,select xxl_job_info 是查的从节点,update 下次触发时间 是在主节点发生的, 第一种方案:将 admin 链接的 jdbc url 链接到 主库,这个系统所有的sql 都走主库,这也是xxl-job 作者 推荐的方式,这个方案已验证, 一天之内未发生重复执行,后续跟进观察 第二种方案:admin 系统 链接 集群地址,for updae 时的 Connection是原生jdbc的方式, select xxl_job_info 是mybatis 的方式,select 也用 这个 Connection 链接,这样在一个事务里,查询 也是走的 主库,这个方案 待验证,这个方案好处是 系统其它的 select 可以走 从库,减轻 主库 压力

pdy11205 commented 3 years ago

请问xxl配置的数据库地址不是集群的,项目的地址是读写的,出现这个问题应该不会是这个原因导致的吧

908028664 commented 3 years ago

请问xxl配置的数据库地址不是继承的,项目的地址是读写的,出现这个问题应该不会是这个原因导致的吧

请你把问题问的清楚些。

908028664 commented 3 years ago

实验证明: 第一种方案:将jdbc url 链接到主库上,可以有效防止重复执行 第二种方案:JobScheduleHelper.java 类中的关键三步, 1、select from xxl_job_lock ...... for update 2、select from xxl_job_info ...... 3、update xxl_job_info set trigger_next_time , XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);

让第二步 Connection 使用 第一步的 Connection,可以有效防止重复执行

下面是代码: 注意preparedStatement,两条sql ,使用 两个 preparedStatement 接收

try {

                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);

                    Instant startTransactionalTime = Instant.now();
                    logger.info("startTransactionalTime:" + dtf.format(LocalDateTime.ofInstant(startTransactionalTime, ZoneOffset.ofHours(8))));

                    preparedStatement = conn.prepareStatement("select * from xxl_job_lock where id = 1 and lock_name = 'schedule_lock' for update");
                    preparedStatement.execute();

                    // tx start2
                    Instant startQueryJobInfoTime = Instant.now();

                    // 1、pre read
                    long nowTime = System.currentTimeMillis();
                    PreparedStatement preparedStatement2 = conn.prepareStatement("SELECT t.id," +
                            " t.job_group," +
                            " t.job_cron," +
                            " t.job_desc," +
                            " t.task_group," +
                            " t.add_time," +
                            " t.update_time," +
                            " t.author," +
                            " t.alarm_type," +
                            " t.alarm_email," +
                            " t.executor_route_strategy," +
                            " t.executor_handler," +
                            " t.executor_param," +
                            " t.executor_block_strategy," +
                            " t.executor_timeout," +
                            " t.executor_fail_retry_count," +
                            " t.glue_type," +
                            " t.glue_source," +
                            " t.glue_remark," +
                            " t.glue_updatetime," +
                            " t.child_jobid," +
                            " t.trigger_status," +
                            " t.trigger_last_time," +
                            " t.trigger_next_time " +
                            " FROM xxl_job_info AS t " +
                            " WHERE t.trigger_status = 1" +
                            " and t.trigger_next_time <=  ? " +
                            " ORDER BY id ASC" +
                            " LIMIT ? ");
                    preparedStatement2.setLong(1, nowTime + PRE_READ_MS);
                    preparedStatement2.setInt(2, preReadCount);
                    ResultSet rs = preparedStatement2.executeQuery();

                    List<XxlJobInfo> scheduleList = new ArrayList<XxlJobInfo>();
                    while (rs.next()) {
                        XxlJobInfo jobInfo = new XxlJobInfo();
                        jobInfo.setId(rs.getInt("id"));
                        jobInfo.setJobGroup(rs.getInt("job_group"));
                        jobInfo.setJobCron(rs.getString(3));
                        jobInfo.setJobDesc(rs.getString(4));
                        jobInfo.setTaskGroup(rs.getString(5));
                        jobInfo.setAddTime(rs.getDate(6));
                        jobInfo.setUpdateTime(rs.getDate(7));
                        jobInfo.setAuthor(rs.getString(8));
                        jobInfo.setAlarmType(rs.getInt(9));
                        jobInfo.setAlarmEmail(rs.getString(10));
                        jobInfo.setExecutorRouteStrategy(rs.getString(11));
                        jobInfo.setExecutorHandler(rs.getString(12));
                        jobInfo.setExecutorParam(rs.getString(13));
                        jobInfo.setExecutorBlockStrategy(rs.getString(14));
                        jobInfo.setExecutorTimeout(rs.getInt(15));
                        jobInfo.setExecutorFailRetryCount(rs.getInt(16));
                        jobInfo.setGlueType(rs.getString(17));
                        jobInfo.setGlueSource(rs.getString(18));
                        jobInfo.setGlueRemark(rs.getString(19));
                        jobInfo.setGlueUpdatetime(rs.getDate(20));
                        jobInfo.setChildJobId(rs.getString(21));
                        jobInfo.setTriggerStatus(rs.getInt(22));
                        jobInfo.setTriggerLastTime(rs.getLong(23));
                        jobInfo.setTriggerNextTime(rs.getLong("trigger_next_time"));

                        scheduleList.add(jobInfo);
                    }

                    // List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    // System.out.println("=======到底哪里调了=======9===="+scheduleList.size());
                    if (scheduleList != null && scheduleList.size() > 0) {

下面分析下原因: 如果链接的是集群地址,第一个sql 是锁,必然走主库,第二个sql 查任务,走的是 从库,第三个是 更新任务下次执行时间 必然走 主库, 假设一个请求进来,执行完后,主库的该任务执行时间已更新,正在同步从库,这时,第二个请求开始执行,查询从库的任务,由于之前的同步还没有完成,所以第二个请求查出来的任务,就会和 之前的重复,所以,mysql 主从时,锁和查询任务这两步,最好是使用同一个链接,或者 强制走主库。

ArchitectRoad commented 3 years ago

你好,遇到同样问题,配置的连接是读写分离的代理地址。 请问下,第二种方案,你们是自己修改源码来实现了吗? 这样的话,直接加redis锁应该会更好吧