任务调度中心 (优化版)【原】

news/2024/11/6 1:48:38

任务调度中心

主要依赖quartz.jar相关类 判断cron表达式 , 在下次即将执行的时间在指定时间内时, 从线程池中取线程进行调度 (优化版)

为什么要有调度中心

因为在集群环境,多server都会在同一时间执行相同定时任务,那么此时定时任务的并发会造成大量数据重复或其它不可预知的业务异常.而调度中心只会按间隔触发一次请求给集群中的负载去分发.不会造成重复触发的情况.

场景

前台工作人员录入定时任务信息入TBL_TASK表后, 调度中心以很短的间隔定时全量抓取库 TBL_TASK表判断表达式时间是否临近10秒以内,如果临近了,就触发请求给目标系统,让目标系统进行真正的业务处理(比如进行百万级别的数据同步),然后只需要返回一个成功失败标志告诉调度中心,最终统一从调度中心去观察任务正常与否.也方便了集中管理任务调度. 

只要配置好相关信息,就不用在spring或java 等trigger中去配置定时任务了.

 

 

下载资料

定时任务quartz 包 : quartz-2.2.3.zip

项目源码 :  http://pan.baidu.com/s/1nu9oK4p

git地址: https://git.oschina.net/KingBoBo/TimeTaskDispatcherCenter.git

相关表:

 TBL_TASK表结构如下,您不必建表,此处只是假设有这样的表存在而以.为了方便演示,最终只是模拟取数,并不会真正从数据库中取该表数据

主键任务名调度中心调度地址执行间隔表达式
C_IDC_NAMEC_URLC_EXPRESSION
employeeTask调用X系统进行员工信息同步http://www.xxx.com/syncEmployees0/10 * * * * ?
carTask调用Y系统进行车辆信息同http://www.yyy.com/syncCars0 0/1 * * * ?

 

 

 

 

相关类:

MyTask.java

普通任务Bean,对应数据表 TBL_TASK

package com.king;

/**
 * 普通任务javaBean,从数据库取到数据到放到该对象中
 * @author King
 *
 */
public class MyTask {
    String id;
    String name;
    String url;
    String expression;

    long delayMillis;// 延迟执行时间 单位毫秒
    boolean isApproaching;

    public MyTask(String id, String name, String url, String expression) {
        super();
        this.id = id;
        this.name = name;
        this.url = url;
        this.expression = expression;
    }

    public String getExpression() {
        return expression;
    }

    public void setExpression(String expression) {
        this.expression = expression;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public long getDelayMillis() {
        return delayMillis;
    }

    public void setDelayMillis(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    public boolean isApproaching() {
        return isApproaching;
    }

    public void setApproaching(boolean isApproaching) {
        this.isApproaching = isApproaching;
    }

    @Override
    public String toString() {
        return "MyTask [id=" + id + ", name=" + name + ", url=" + url + "]";
    }

}
View Code

 

 

TaskDao.java

模拟对TASK表进行CRUD操作的Dao层

package com.king;

/**
 * 模拟dao层从数据库取数据,及更新数据
 * @author King
 *
 */
public class TaskDao {
    
    /**
     * 生成2个定时任务,用来模拟定时任务表
     * @return
     */
    public MyTask[] querySomeTasks() {
        // 10秒一次
        MyTask employeeTask = new MyTask("employeeTask", "调用X系统的servlet,进行员工信息同步", "http://www.baidu.com", "0/10 * * * * ?");
        // 20秒一次
        MyTask carTask = new MyTask("carTask", "调用Y系统的servlet,进行车辆信息同步", "http://www.baidu.com", "0 0/1 * * * ?");
        MyTask[] tasks = new MyTask[] { employeeTask, carTask };
        return tasks;
    }

    /**
     * 更新
     * @return
     */
    public String updateSomething() {
        return "";
    }
}

 

MyCallable.java

线程调用类,由public static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(30);池调用

package com.king;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
 * 受线程池管控的线程类
 * @author King
 *
 */
public class MyCallable implements Callable<String> {
    //固定30个线程的线程池,如果定时任务的个数超过该值,有一定可能造成任务等待.
    //但不一定会发生,这要看是否有30多个任务是否都集中在同一时间点上触发
    public static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(30);
    private MyTask task;
    private static String encoding = "UTF-8";
    private Map runningMap;
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");

    @Override
    public String call() throws Exception {
        String returnData = "success";// 模拟返回信息
        System.out.println();
        System.out.println("任务开始时间:【" + sdf.format(new Date()) + "】");
        try {
            Thread.currentThread().sleep(1000);
            System.out.println("【模拟】用java.net.HttpURLConnection发外围传进来的task" + task);// 此处用打印语句模拟真实发送
            System.out.println("【模拟】返回报文为: " + returnData);
            System.out.println("【模拟】信息返回后更新表TaskDao.updateSomething()");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            runningMap.remove(task.getId());
        }
        System.out.println("任务结束时间:【" + sdf.format(new Date()) + "】");
        System.out.println();
        return returnData;
    }

    public MyTask getTask() {
        return task;
    }

    public void setTask(MyTask task) {
        this.task = task;
    }

    public Map getRunningMap() {
        return runningMap;
    }

    public void setRunningMap(Map runningMap) {
        this.runningMap = runningMap;
    }

}

 

 

 

 

TimeTaskDispatcherCenter.java

任务调度中心主类,Main()方法模拟了10次调度. 

实际主要模拟了,从数据库中取到task表中信息,判断表达式时间是否临近10秒,如果临近了就从池中取线程延迟一定时间后执行该task任务

 

package com.king;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.quartz.CronExpression;

/**
 * 定时任务调度中心
 * @author King
 *
 */
public class TimeTaskDispatcherCenter {

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
    private static final int APPROACH_SECONDS = 10;// 临近时间 单位秒
    // 线程安全的map
    private static final Map RUNNING_MAP = Collections.synchronizedMap(new HashMap());

    public void dispatch() {
        System.out.println("执行中的任务:" + RUNNING_MAP.keySet().toString());
        // 模拟从从数据库取task数据
        TaskDao dao = new TaskDao();
        MyTask[] tasks = dao.querySomeTasks();
        for (MyTask task : tasks) {
            if (judgeAppropching(task)) {// 如果即将执行的时间临近当前时间10秒内
                if (RUNNING_MAP.containsKey(task.getId())) {// 如果运行中的任务已包含当前任务,不执行该任务
                    continue;
                } else {
                    RUNNING_MAP.put(task.getId(), task);
                    execute(task);// 从池中取线程,运行该task
                }
            }
        }
    }

    /**
     * 如果字符串代表的cron表达式时间临近,返回true
     * 
     * @param task
     *            当expression字符串为空或cron表达式为空,返回false
     * @return
     */
    private boolean judgeAppropching(MyTask task) {
        CronExpression cron = null;
        try {
            cron = new CronExpression(task.getExpression());// 把字符串转换成cron表达式,用以计算下次执行时间
        } catch (ParseException e) {
            e.printStackTrace();
        }
        if (cron != null) {// 如果expression正确
            // 获取下次执行时间点 (long)
            Date nextValidDate = cron.getNextValidTimeAfter(new Date());
            long nextValidTimeMills = nextValidDate.getTime();
            // 计算 下次执行时间点和系统当前时间点 时间差 (delaymillis毫秒)
            long delayMillis = nextValidTimeMills - System.currentTimeMillis();
            System.out.println("任务" + task.getId() + "\t\t\t【下次执行时间预计为:】" + sdf.format(nextValidDate) + "距离当前时间还差" + delayMillis / 1000 + "秒左右");
            // 如果 0秒<时间差<10秒 ,返回true
            if (delayMillis > 0 && delayMillis < APPROACH_SECONDS * 1000) {
                task.setDelayMillis(delayMillis);// 这一句话很重要,设置了延迟执行时间,execute()方法体中需要该延迟时间
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }

    /**
     * 如果字符串代表的cron表达式时间临近,返回true
     * 
     * @param expression
     *            当expression字符串为空或cron表达式为空,返回false
     * @return
     */
    private boolean judgeAppropching(String expression) {
        CronExpression cron = null;
        try {
            cron = new CronExpression(expression);// 把字符串转换成cron表达式,用以计算下次执行时间
        } catch (ParseException e) {
            e.printStackTrace();
        }
        if (cron != null) {// 如果expression正确
            // 获取下次执行时间点 (long)
            Date nextValidDate = cron.getNextValidTimeAfter(new Date());
            long nextValidTimeMills = nextValidDate.getTime();
            // 计算 下次执行时间点和系统当前时间点 时间差 (delaymillis毫秒)
            long delayMillis = nextValidTimeMills - System.currentTimeMillis();
            // 如果 0秒<时间差<10秒 ,返回true
            System.out.println("【下次执行时间预计为:】" + sdf.format(nextValidDate) + "距离当前时间还差" + delayMillis / 1000 + "秒左右");
            if (delayMillis > 0 && delayMillis < APPROACH_SECONDS * 1000) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }

    private void execute(MyTask task) {
        MyCallable call = new MyCallable();
        call.setTask(task);
        call.setRunningMap(RUNNING_MAP);
        // 调度该任务,但延迟一定毫秒 ,judgeAppropching()会把延迟时间设置进去
        MyCallable.executor.schedule(call, task.getDelayMillis(), TimeUnit.MILLISECONDS);
    }

    // 实际主要模拟了,从数据库中取到task表中信息,判断表达式时间是否临近10秒,如果临近了就从池中取线程延迟一定时间后执行该task任务
    public static void main(String[] args) throws Exception {
        TimeTaskDispatcherCenter center = new TimeTaskDispatcherCenter();
        // center.judgeAppropching("0/10 * * * * ?");

        for (int i = 0; i < 10; i++) { //真实场景用while(true)
            center.dispatch();
            Thread.currentThread().sleep(2000);// 隔2秒去数据库取全表数据进行调度
        }

    }
}

 

 

 

 

打印结果

由于是多线程,小部分打印语句可能会互相穿插

执行中的任务:[]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:40 000距离当前时间还差8秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差28秒左右
执行中的任务:[employeeTask]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:40 000距离当前时间还差6秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差26秒左右
执行中的任务:[employeeTask]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:40 000距离当前时间还差4秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差24秒左右
执行中的任务:[employeeTask]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:40 000距离当前时间还差2秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差22秒左右
执行中的任务:[employeeTask]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:40 000距离当前时间还差0秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差20秒左右

任务开始时间:【2016-09-02 16:33:40 006】
【模拟】用java.net.HttpURLConnection发外围传进来的taskMyTask [id=employeeTask, name=调用X系统的servlet,进行员工信息同步, url=http://www.xxx.com/syncEmployees]
【模拟】返回报文为: success
【模拟】信息返回后更新表TaskDao.updateSomething()
任务结束时间:【2016-09-02 16:33:41 007】

执行中的任务:[]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:50 000距离当前时间还差8秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差18秒左右
执行中的任务:[employeeTask]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:50 000距离当前时间还差6秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差16秒左右
执行中的任务:[employeeTask]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:50 000距离当前时间还差4秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差14秒左右
执行中的任务:[employeeTask]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:50 000距离当前时间还差2秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差12秒左右
执行中的任务:[employeeTask]
任务employeeTask            【下次执行时间预计为:】2016-09-02 16:33:50 000距离当前时间还差0秒左右
任务carTask            【下次执行时间预计为:】2016-09-02 16:34:00 000距离当前时间还差10秒左右

任务开始时间:【2016-09-02 16:33:50 002】
【模拟】用java.net.HttpURLConnection发外围传进来的taskMyTask [id=employeeTask, name=调用X系统的servlet,进行员工信息同步, url=http://www.xxx.com/syncEmployees]
【模拟】返回报文为: success
【模拟】信息返回后更新表TaskDao.updateSomething()
任务结束时间:【2016-09-02 16:33:51 003】

 

其它

判断cron表达式是否有效

比如checkCronExpression("0/10 * * * * ? 2018")

    private boolean checkCronExpression(String cron){
        boolean b=false;
        try{
            CronExpression ce = new CronExpression(cron);
            Date date = ce.getNextValidTimeAfter(new Date()); 
            if(date != null){
                b=true;
            }
        }catch (ParseException e) {
            logger.error(e.getMessage(),e);
        }
        return b;
    }

 

转载于:https://www.cnblogs.com/whatlonelytear/p/5653614.html


http://www.niftyadmin.cn/n/4827491.html

相关文章

Python3内置函数(51-60)

# 51.delattr() # 用于删除属性。 class A(object):x 12y 23delattr(A, x)# 52.format() # Python2.6 开始&#xff0c;新增了一种格式化字符串的函数 str.format()&#xff0c;它增强了字符串格式化的功能。 # 基本语法是通过 {} 和 : 来代替以前的 % 。 # format 函数可以接…

JS 笔记 0330

1. 复习 <!-- 一, 对象的基本介绍对象也是数据的集合是通过属性和属性值来定义数据属性的作用就类似于,数组的索引下标对象是没有length属性的一般从数据库获取的数据形式 [{},{},{},]数组,forEach循环 , for , for...in对象,只能使用 for...in 循环对象的基本操作方法基…

第一篇,就写今天看的东西

一被别人问&#xff1a;你是学什么方向的 我默默回答一句&#xff1a;数据挖掘 别人意味深长の回答一句&#xff1a;哦....... 想必看出了我只是个小白。 既然清楚自己是个小白&#xff0c;开这个博客也只是为了让自己更好的做笔记&#xff0c;并且渴望得到大神的指点&#xff…

Python3内置函数(61-69)

# 61.max() # 返回给定参数的最大值&#xff0c;参数可以为序列。 lst1 (1, 2, 45, 6, 7, 64, 32, 14) print(max(lst1))# 62.memoryview() # 返回给定参数的内存查看对象 v memoryview(bytearray(qwerty, utf-8)) print(v[1]) print(v[-1])# 63.repr() # 将对象转化为供解释…

JS 事件 0331

1.复习 DOM操作,内容的设定 写入 标签对象.innerHTML ‘内容’ 支持解析标签 标签对象.innerText ‘内容’ 不支持,不解析标签 获取 var 变量 标签对象.innerHTML 获取标签的所有内容,包括标签 var 变量 标签对象.innerText 获取标签的所有文本内容,没有标签 事件 鼠标事件…

JS 秒表

<body><div>00:00:00:00</div><button>开始</button><button disabled>暂停</button><button disabled>继续</button><button disabled>重置</button><script>// 秒表功能分析// 核心思路:// 定义一…

Python3的configparser模块的使用

import configparserconfig configparser.ConfigParser()# 字典模式生成配置文件 # 第一个section config[DEFAULT] {A: abc,B: 123, # 数字也要写成string类型C: hello}# 第二个section config[Head] {H1: 100, H2: 200, H3: 300}# 第三个section config[www] {W1: 199, …

Hadoop生态圈介绍及入门(转)

本帖最后由 howtodown 于 2015-4-2 23:15 编辑 问题导读1.Hadoop生态圈介绍了哪些组件&#xff0c;分别都是什么&#xff1f;2.大数据与Hadoop是什么关系&#xff1f;本章主要内容&#xff1a;理解大数据的挑战了解Hadoop生态圈了解Hadoop发行版使用基于Hadoop的企业级应用你可…