Advertisement

【案例】--(非分布式)轻量级任务调度平台

阅读量:

目录

  • 一、前言说明

  • 二、背景

    • 2.1、完成任务,顺便搭建了一个任务调度平台
  • 第三章 实现细节解析

    • 技术架构搭配方案

      • 具体实现环节解析
      • (1) 覆盖基础任务处理模块
        • (2) 搭建完整的日志收集体系
        • (3) 配备用户异常响应机制并提供智能调度控制选项
        • (4) 实时追踪运行中的任务以及队列状态变化情况
        • (5) 即时评估任务运行状态及可能出现的阻塞问题并采取相应措施"
    • 四、平台存在不足

一、前言说明

不是说功能满足不了,而是适应分布式场景上是有差距的

不是说功能满足不了

撰写这篇文章之前有必要做一番说明。我开发了一个非分布式架构下的轻量级任务调度系统。它与现有的quartz和xxl-job等成熟方案相比有明显不足【虽然在某些功能上能满足需求

可参考任务调度类对比

针对现有多种任务调度方案进行深入比较分析并参考任务调度类对比的技术文档后

二、背景

2.1、完成任务,顺便搭建了一个任务调度平台

反馈时就知道不会同意

三、具体实现解析

3.1、技术栈等选型

基于Spring Boot框架的线程池任务调度器类(非分布式架构)已完成配置,并成功注入到IOC容器中。

复制代码
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler(){
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(10);                        // 线程池大小
        threadPoolTaskScheduler.setThreadNamePrefix("taskExecutor-");   // 线程名称
        threadPoolTaskScheduler.setAwaitTerminationSeconds(60);         // 等待时长
        threadPoolTaskScheduler.setRemoveOnCancelPolicy(true); //队列中设置取消的任务,会被剔除
        threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);  // 调度器shutdown被调用时等待当前被调度的任务完成
        return threadPoolTaskScheduler;
    }

提供公共基本添加延迟、周期性、立即等任务、取消任务的相关方法

复制代码
    @Component
    public class ThreadPoolTaskSchedulerRunner {
    private volatile ConcurrentHashMap<String,ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();
    @Autowired
    ThreadPoolTaskScheduler threadPoolTaskScheduler;
    //周期性
    private void addTaskScheduler(TaskScheHandler handler){
        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(handler,
                triggerContext -> {
                    CronTrigger cronTrigger = new CronTrigger(handler.getTaskObj().getCron());
                    return cronTrigger.nextExecutionTime(triggerContext);
                });
        scheduledFutureMap.put(handler.getTaskObj().getTaskId(),scheduledFuture);
    }
     //延迟
    private void addDelayScheduler(Runnable r,TaskSchedulerDo param){
        long delay = param.getDelayTime()+TimeUtil.dateToLong(new Date());
        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(
                r,TimeUtil.longToDate(delay,TimeUtil.DT_FORMAT));
        scheduledFutureMap.put(param.getTaskId(),scheduledFuture);
    }
    //立即任务
    private void addRightNowScheduler(Runnable r,TaskSchedulerDo param){
        long delay = param.getDelayTime()+TimeUtil.dateToLong(new Date());
        Future<?> future = threadPoolTaskScheduler.submit(r);
        scheduledFutureMap.put(param.getTaskId(),(ScheduledFuture)future);
    }
    
    
    private boolean isCancelled(String taskId){
        if(scheduledFutureMap.containsKey(taskId))
            return !scheduledFutureMap.get(taskId).isCancelled();//继续判断调度器中是否取消
        return false;
    }
    private boolean cancelTaskScheduler(String taskId){
        if(scheduledFutureMap.containsKey(taskId)){
            ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(taskId);
            if(!scheduledFuture.isCancelled())
                return scheduledFuture.cancel(true);
        }
        return false;
    }
     }

3.2、完成具体功能解析

(1)、支持基本任务功能

(1)、可实现延时功能以支持周期性/cron作业以及即时作业
(2)、该系统可在任务启动前动态优化策略以停止或重新启动相关作业

概述基本ThreadPoolTaskScheduler类的具体操作流程,并详细说明其核心功能模块设计与实现原理。为了满足上述功能需求,在系统架构设计阶段需重点考虑哪些关键组件?

复制代码
    //添加定时任务
    public void add(TaskSchedulerDo job){
        if(isCancelled(job.getTaskId())) stop(job);
        TaskScheHandler handler = TaskEnum.getObj(job);
        addTaskScheduler(handler);
    }
    //停止定时任务
    public  void stop(TaskSchedulerDo job){
        if(isCancelled(job.getTaskId())){
            cancelTaskScheduler(job.getTaskId());
            scheduledFutureMap.remove(job.getTaskId());
        }
    }

(2)、支持日志收集功能

对于日志收集而言,在调度平台上具有不可或缺的作用。为了更加精细地跟踪每个任务的执行状态以及是否存在阻塞情况,并且了解任务运行所需的时间长短等因素的变化情况,在平台中增加相应的日志收集功能是非常必要的。
为此,请创建一个Log日志线程类。

复制代码
    public class TaskLogHandler implements Runnable {
    private Runnable r;
      //  LogEntiry logBean;
    TaskSchedulerDo param;
    TaskLogHandler(Runnable r,TaskSchedulerDo param){
        this.r = r;
        this.param = param;
    }
    
    @Override
    public void run() {
        try{
            //日志开始  todo
            r.run(); //执行任务
        }catch (Exception e){
            //错误日志结束  todo
            return;
        }
        //成功日志 todo
    }
    }

对添加任务的方法进行改进如下:这样确保在每个任务过程中持续跟踪该任务的日志信息。

复制代码
    private void addDelayScheduler(Runnable r,TaskSchedulerDo param){
        long delay = param.getDelayTime()+TimeUtil.dateToLong(new Date());
        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(
                new TaskLogHandler(r,param),TimeUtil.longToDate(delay,TimeUtil.DT_FORMAT));
        scheduledFutureMap.put(param.getTaskId(),scheduledFuture);
    }

—主要就new TaskLogHandler(r,param)这点区别。

(3)、支持用户异常,选择性关闭调度功能

在上文中,在实现任务调度时创建了TaskLogHandler日志收集器。当用户线程发生异常事件时,必然会被捕获为TaskLogHandler类中的Exception事件,并在此处根据任务参数信息能够处理该任务是否已关闭调度的情况。

(4)、实时监控正在执行和任务队列的任务情况

采用该方法, 能够获取当前主线程正在执行的任务数量, 并查看队列Q中的任务数量

复制代码
    private Object getDetail(){
        Map<String,Object> map = new HashMap<>();
        map.put("activeSize",threadPoolTaskScheduler.getActiveCount());
        ScheduledExecutorService sE = threadPoolTaskScheduler.getScheduledExecutor();
        if(sE instanceof ThreadPoolTaskScheduler){
            ScheduledThreadPoolExecutor  scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) sE;
            map.put("queueSize",scheduledThreadPoolExecutor.getQueue().size());
        }
        return map;
    }

(5)、实时监控任务执行、阻塞等情况

为满足这一需求而设计的任务方案主要借鉴了TaskLogHandler任务日志以实现功能。这些TASK_ID将通过唯一的标识符以及scheduledFutureMap字段来进行实时跟踪,并确保对当前任务的状态进行实时监控。

四、平台存在不足

ThreadPoolTaskScheduler能够搭建轻量级任务调度平台,在满足部分业务需求方面具有可行性;但存在以下缺陷:其一为无法实现分布式部署;其二由于采用了单机部署模式,在任务处理能力上受到线程数量的直接影响:线程数量过多会导致对CPU及内存资源产生过高等性能消耗;线程数量不足则会导致系统处理效率降低;其三缺乏高可用特性:当服务出现故障时将导致任务调度功能失效。

全部评论 (0)

还没有任何评论哟~