Advertisement

Go Cron 定时任务

阅读量:

原文地址http://zero-tt.fun/go/cron/

掌握Go的定时任务管理技巧,则必须深入学习robfig/cron包,并访问其GitHub地址github地址

1. 使用 Demo

1.1 每秒钟执行一次

复制代码
    package main
    
    import (
    "fmt"
    "time"
    
    "github.com/robfig/cron/v3"
    )
    
    func main() {
    job := cron.New(
        cron.WithSeconds(), // 添加秒级别支持,默认支持最小粒度为分钟
    )
    // 每秒钟执行一次
    job.AddFunc("* * * * * *", func() {
        fmt.Printf("secondly: %v\n", time.Now())
    })
    job.Run()   // 启动
    }

cron 表达式格式无需自行查阅资料,在此不做详细说明。
需要注意的是,默认情况下 cron 支持至分钟级的时间粒度,在初始化 cron 时,请确保调用 cron.WithSeconds() 参数以扩展至秒级别。

1.2 每分钟执行一次

复制代码
    // 每分钟执行一次
    job.AddFunc("0 * * * * *", func() {
    fmt.Printf("minutely: %v\n", time.Now())
    })

1.3 每小时执行一次

复制代码
    // 每小时执行一次
    job.AddFunc("0 0 * * * *", func() {
    fmt.Printf("hourly: %v\n", time.Now())
    })
    // 另一种写法
    job.AddFunc("@hourly", func() {
    fmt.Printf("hourly: %v\n", time.Now())
    })

cron 支持的解析器能够识别 @hourly 这种频率设置;此外还包括 daily、weekly、monthly、yearly 和 annually 等常见频率选项。

1.4 固定时间间隔执行一次

cron 表达式无法直接实现,另辟蹊径。

1.4.1 @every 写法
复制代码
    // 固定时间间隔执行
    job.AddFunc("@every 60s", func() {
    fmt.Printf("every: %v\n", time.Now())
    })

此外也支持了功能。 也就是一个时间段。 类似地还有 60s, 包括 1h, 还有像 `1h30m$ 等等的时间格式。 具体的格式可以通过 time.ParseDuration 获取

1.4.2 Schedule 写法
复制代码
    job.Schedule(cron.ConstantDelaySchedule{Delay: time.Minute}, cron.FuncJob(func() {
    fmt.Printf("every: %v\n", time.Now())
    }))

在创建 job 时被配置为一个调度器,并且被设定为定期执行。具体原理将在后续章节中详细说明。

注意 :虽然 @everySchedule 也能够实现每小时执行一次的这种任务,但是它和 @hourly 这种方式还是不同的,区别在于:@hourly 是在每个小时的开始的时候执行任务,换句话说,如果你在 11:55 分的时候启动了定时任务,那最近一次的执行时间是 12:00。但是 @everySchedule 这种写法,下次的执行时间会是 12:55,也就是一小时后。

2. 源码分析

2.1 Schedule

复制代码
    // 描述一个 job 如何循环执行
    type Schedule interface {
    // Next returns the next activation time, later than the given time.
    // Next is invoked initially, and then each time the job is run.
    Next(time.Time) time.Time
    }

该接口定义了一个名为Next的方法(time.Time → time.Time),用于计算任务的下一个执行时间。

2.1.1 Schedule 的实现一:SpecSchedule

通常情况下,在 NewCron() 的配置中,默认选择了这种方法,并且支持解析 Cron 表达式的能力。具体实现位于 spec.go 文件中,请确保理解该方法返回的时间即可确定下次调度时间。

2.1.2 Schedule 的实现二:ConstantDelaySchedule

同样的是ConstantDelaySchedule,请明白func (schedule ConstantDelaySchedule) Next(t time.Time) time.Time这个方法负责计算并返回任务下次被调度的时间值。具体实现则位于constantdelay.go一文中。

2.2 Job

复制代码
    type Job interface {
    Run()
    }

接口类型,定义定时任务,cron 调度一个 Job,就去执行 Job 的 Run() 方法。

2.2.1 实现:FuncJob
复制代码
    type FuncJob func()
    
    func (f FuncJob) Run() { f() }

FuncJob 实际就是一个 func() 类型,实现了 Run() 方法。

2.3 修饰器加工 Job

修饰器具有多种形式,在编程中为了便于理解,通常会先定义一下修饰器的类型。对于修饰器的相关说明,请参考我另一篇文章《Go 修饰器》

复制代码
    type JobWrapper func(Job) Job
2.3.1 修饰器一:Job 上次的执行还没结束,这次就跳过吧
复制代码
    func SkipIfStillRunning(logger Logger) JobWrapper {
    var ch = make(chan struct{}, 1)
    ch <- struct{}{}
    return func(j Job) Job {
        return FuncJob(func() { // 这个外层 func(),封装了真实的用户期望执行的 func()
            select {
            case v := <-ch:
                j.Run() // 这里才是在执行我们真实的 Job
                ch <- v
            default:
                logger.Info("skip")
            }
        })
    }
    }

可以认为该装饰器向Job增加了锁定机制,并且这个机制仅在成功获得该lock时才能让指定的Job执行;如果无法获得lock,则直接调用logger.info()

使用示例:

复制代码
    job.AddJob("@every 1s", cron.SkipIfStillRunning(cron.DefaultLogger)(cron.FuncJob(func() {
    time.Sleep(time.Second * 3)
    fmt.Printf("SkipIfStillRunning: %v", time.Now())
    })))

当然,你也可以在创建 cron 时就使用 chain,这将会对所有 Job 起作用

复制代码
    jobs := cron.New(
    cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger)),
    )

在修饰器二的作用下,在上次运行中还处于未完成状态的情况下,在本次运行前先进行阻塞操作;为此,请确保等到上次运行完成之后再进行下一次操作

复制代码
    func DelayIfStillRunning(logger Logger) JobWrapper {
    return func(j Job) Job {
        var mu sync.Mutex
        return FuncJob(func() {
            start := time.Now()
            mu.Lock()
            defer mu.Unlock()
            // 阻塞超过 1 分钟,log 记录
            if dur := time.Since(start); dur > time.Minute {
                logger.Info("delay", "duration", dur)
            }
            j.Run()
        })
    }
    }

与 SkipIfStillRunning 的工作原理类似,在缺少了一个 default 的比例后造成了阻塞,并没有直接调用 log函数。

2.3.3 修饰器的使用

可能存在多个修饰器的情况中,在一个Job中集成多个修饰器的方式类似于嵌套的方式进行层层包裹。

复制代码
    // 所有修饰器的载体
    type Chain struct {
    wrappers []JobWrapper
    }
    
    // 创建修饰器载体
    func NewChain(c ...JobWrapper) Chain {
    return Chain{c}
    }
    
    // 修饰器应用到 Job,一层一层的套
    // 假如是:NewChain(m1, m2, m3).Then(job)
    // 相当于:m1(m2(m3(job)))
    func (c Chain) Then(j Job) Job {
    for i := range c.wrappers {
        j = c.wrappers[len(c.wrappers)-i-1](j)
    }
    return j
    }

2.4 Entry 定义

复制代码
    type Entry struct {
    ID EntryID          // job id,可以通过该 id 来删除 job
    Schedule Schedule   // 用于计算 job 下次的执行时间
    Next time.Time      // job 下次执行时间
    Prev time.Time      // job 上次执行时间,没执行过为 0
    WrappedJob Job      // 修饰器加工过的 job
    Job Job             // 未经修饰的 job,可以理解成就是 AddFunc 的第二个参数
    }

结构体字段,上文已经解释清楚了

2.5 Cron 定义

复制代码
    type Cron struct {
    entries   []*Entry          // Job 集合
    chain     Chain             // 装饰器链
    stop      chan struct{}     // 停止信号
    add       chan *Entry       // add 信号
    remove    chan EntryID      // remove 信号
    snapshot  chan chan []Entry // 快照
    running   bool              // 是否正在运行
    logger    Logger            // 日志
    runningMu sync.Mutex        // 运行时锁
    location  *time.Location    // 时区相关
    parser    Parser            // Cron 解析器
    nextID    EntryID           // 
    jobWaiter sync.WaitGroup    // 正在运行的 Job
    }
2.5.1 run 方法
复制代码
    func (c *Cron) run() {
    c.logger.Info("start")
    
    // 计算每个 Job 下次的执行时间
    now := c.now()
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now)
        c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
    }
    
    // 一个死循环,进行任务调度
    for {
        // 根据下一次的执行时间,对所有 Job 排序
        sort.Sort(byTime(c.entries))
    
        // 计时器,用于没有任务可调度时的阻塞操作
        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // 无任务可调度,设置计时器到一个很大的值,把下面的 for 阻塞住
            timer = time.NewTimer(100000 * time.Hour)
        } else {
            // 有任务可调度了,计时器根据第一个可调度任务的下次执行时间设置
            // 排过序,所以第一个肯定是最先被执行的
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }
    
        for {
            select {
            // 有 Job 到了执行时间
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake", "now", now)
                // 检查所有 Job,执行到时的任务
                for _, e := range c.entries {
                    if e.Next.After(now) || e.Next.IsZero() {
                        break
                    }
                    // 执行 Job 的 func()
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    // 设置 Job 下次的执行时间
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
                }
    
            // 添加新 Job
            case newEntry := <-c.add:
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
    
            // 获取所有 Job 的快照
            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue
    
            // 停止调度
            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return
    
            // 根据 entryId 删除一个 Job
            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed", "entry", id)
            }
    
            break
        }
    }
    }

3. 总结

cron 包主要包含了哪些组件:

  1. 解析模块:处理 cron 排程指令
  2. 定时执行调度系统:确定 job 的下一次启动时刻
  3. 控制模块:指定 job 的执行运行模式
  4. 任务逻辑:负责定期调用指定的函数

全部评论 (0)

还没有任何评论哟~