心跳机制,业务服务启动一个线程,每30秒主动上报一次信息(ip+port)。
xxljob-admin启动一个线程,每30秒检查一下注册的服务是否dead,dead的条件是最后一次上报时间,距离当前时间超过90(30*3)秒。
registryMonitorThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { // remove dead address (admin/executor) List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids != null && ids.size() > 0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); } TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } }}); xxljob-admin有一个调度线程,大概每5秒,查询一下待执行的任务,如果任务达到了执行时间点,就会请求业务服务去执行任务。
注意一点,在执行之前,先通过数据库进行加锁,防止集群的情况下,出现同时调度任务的情况。
select * from xxl_job_lock where lock_name = 'schedule_lock' for updatescheduleThread = new Thread(new Runnable() { @Override public void run() { TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000); // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; while (!scheduleThreadToStop) { //基于数据库进行加锁,保证集群情况下,同时只会存在一个执行任务 preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update"); preparedStatement.execute(); // 1、pre read long nowTime = System.currentTimeMillis(); List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList != null && scheduleList.size() > 0) { for (XxlJobInfo jobInfo : scheduleList) { if (nowTime > jobInfo.getTriggerNextTime()) { //执行任务 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); } } } } }}); | 留言与评论(共有 0 条评论) “” |