目录
入门
cron 默认支持最小粒度为分钟,最多支持秒级别。
GitHub:https://github.com/robfig/cron
安装
1 |
go get -u github.com/robfig/cron/v3 |
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
package main import ( "fmt" "time" "github.com/robfig/cron/v3" ) func main() { // 创建 cron 实例 c := cron.New() // 添加任务 // "@every 1s" 表示每秒 // func() {} 表示任务 c.AddFunc("@every 1s", func() { fmt.Println("tick every 1 second") }) // 开始任务 c.Start() //defer c.Stop() // 停止任务, 但是不能关闭已经在执行中的任务 } |
使用非常简单,创建cron
对象,这个对象用于管理定时任务。
调用cron
对象的AddFunc()
方法向管理器中添加定时任务。AddFunc()
接受两个参数,参数 1 以字符串形式指定触发时间规则,参数 2 是一个无参的函数,每次触发时调用。@every 1s
表示每秒触发一次,@every
后加一个时间间隔,表示每隔多长时间触发一次。例如@every 1h
表示每小时触发一次,@every 1m2s
表示每隔 1 分 2 秒触发一次。time.ParseDuration()
支持的格式都可以用在这里。
调用c.Start()
启动定时循环。
运行效果,每隔 1s 输出一行字符串:
1 2 3 4 5 |
tick every 1 second tick every 1 second tick every 1 second tick every 1 second tick every 1 second |
Job接口 自定义任务struct
除了直接将无参函数作为回调外,cron
还支持Job
接口:(以下是cron.go源码)
1 2 3 4 |
// cron.go type Job interface { Run() } |
我们定义一个实现接口Job
的结构:
1 2 3 4 5 6 7 |
type GreetingJob struct { Name string } func (g GreetingJob) Run() { fmt.Println("Hello ", g.Name) } |
调用cron
对象的AddJob()
方法将GreetingJob
对象添加到定时管理器中:
1 2 3 4 5 6 7 8 9 |
func main() { // 创建cron实例 c := cron.New() // 添加,实现了Job接口的结构体,作为任务 c.AddJob("@every 1s", GreetingJob{"dj"}) // 开始任务 c.Start() } |
运行效果:
1 2 3 4 5 6 |
$ go run main.go Hello dj Hello dj Hello dj Hello dj Hello dj |
使用自定义的结构可以让任务携带状态(Name
字段)。
实际上AddFunc()
方法内部也调用了AddJob()
方法。
创建任务的3种方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
func main() { // 创建cron 实例 job := cron.New() // 定时规则 spec :="@every 1s" // 添加任务(方式一: AddFunc) jobId,err := job.AddFunc(spec, func() { fmt.Println("tick every 1 second") }) if err != nil { fmt.Printf("添加定时任务失败:%v \n",err.Error()) return } fmt.Printf("AddFunc已添加,id: %v \n",jobId) // 添加任务(方式二: AddJob) addJobId,err := job.AddJob(spec, &GreetingJob{Name: "AddJob"}) if err != nil { fmt.Printf("添加定时任务失败:%v \n",err.Error()) return } fmt.Printf("AddJob已添加,id: %v \n",addJobId) // 添加任务(方式三: Schedule) // 每次延迟5秒执行 scheduleJobId := job.Schedule(cron.ConstantDelaySchedule{Delay: 5*time.Second},GreetingJob{Name: "Schedule"}) fmt.Printf("Schedule已添加,id: %v \n",scheduleJobId) } |
线程安全
cron
会创建一个新的 goroutine 来执行触发回调。如果这些回调需要并发访问一些资源、数据,我们需要显式地做同步。
时间格式处理
时间格式
与Linux 中crontab
命令相似,cron
库支持用 5 个空格分隔的域来表示时间。这 5 个域含义依次为:
Minutes
:分钟,取值范围[0-59]
,支持特殊字符* / , -
;Hours
:小时,取值范围[0-23]
,支持特殊字符* / , -
;Day of month
:每月的第几天,取值范围[1-31]
,支持特殊字符* / , - ?
;Month
:月,取值范围[1-12]
或者使用月份名字缩写[JAN-DEC]
,支持特殊字符* / , -
;Day of week
:周历,取值范围[0-6]
或名字缩写[JUN-SAT]
,支持特殊字符* / , - ?
。
注意,月份和周历名称都是不区分大小写的,也就是说SUN/Sun/sun
表示同样的含义(都是周日)。
特殊字符含义如下:
*
:使用*
的域可以匹配任何值,例如将月份域(第 4 个)设置为*
,表示每个月;/
:用来指定范围的步长,例如将小时域(第 2 个)设置为3-59/15
表示第 3 分钟触发,以后每隔 15 分钟触发一次,因此第 2 次触发为第 18 分钟,第 3 次为 33 分钟。。。直到分钟大于 59;,
:用来列举一些离散的值和多个范围,例如将周历的域(第 5 个)设置为MON,WED,FRI
表示周一、三和五;-
:用来表示范围,例如将小时的域(第 1 个)设置为9-17
表示上午 9 点到下午 17 点(包括 9 和 17);?
:只能用在月历和周历的域中,用来代替*
,表示每月/周的任意一天。
了解规则之后,我们可以定义任意时间:
30 * * * *
:分钟域为 30,其他域都是*
表示任意。每小时的 30 分触发;30 3-6,20-23 * * *
:分钟域为 30,小时域的3-6,20-23
表示 3 点到 6 点和 20 点到 23 点。3,4,5,6,20,21,22,23 时的 30 分触发;0 0 1 1 *
:1(第 4 个) 月 1(第 3 个) 号的 0(第 2 个) 时 0(第 1 个) 分触发。
记熟了这几个域的顺序,再多练习几次很容易就能掌握格式。熟悉规则了之后,就能熟练使用crontab
命令了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
func main() { c := cron.New() // "30 * * * *" 定义执行的时间 c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println("On the half hour of 3-6am, 8-11pm") }) c.AddFunc("0 0 1 1 *", func() { fmt.Println("Jun 1 every year") }) c.Start() for { time.Sleep(time.Second) } } |
预定义时间规则
为了方便使用,cron
预定义了一些时间规则:
@yearly
:也可以写作@annually
,表示每年第一天的 0 点。等价于0 0 1 1 *
;@monthly
:表示每月第一天的 0 点。等价于0 0 1 * *
;@weekly
:表示每周第一天的 0 点,注意第一天为周日,即周六结束,周日开始的那个 0 点。等价于0 0 * * 0
;@daily
:也可以写作@midnight
,表示每天 0 点。等价于0 0 * * *
;@hourly
:表示每小时的开始。等价于0 * * * *
。
例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
func main() { c := cron.New() // 使用了预定义的时间 c.AddFunc("@hourly", func() { fmt.Println("Every hour") }) c.AddFunc("@daily", func() { fmt.Println("Every day on midnight") }) c.AddFunc("@weekly", func() { fmt.Println("Every week") }) c.Start() for { time.Sleep(time.Second) } } |
固定时间间隔
cron支持固定时间间隔,格式为:
1 |
@every <duration> |
含义为每隔duration
触发一次。<duration>
会调用time.ParseDuration()
函数解析,所以ParseDuration
支持的格式都可以。例如1h30m10s
。在快速开始部分,我们已经演示了@every
的用法了,这里就不赘述了。
时区
默认情况下,所有时间都是基于当前时区的。当然我们也可以指定时区,有 2 两种方式:
- 在时间字符串前面添加一个
CRON_TZ=
+ 具体时区,具体时区的格式在之前carbon
的文章中有详细介绍。东京时区为Asia/Tokyo
,纽约时区为America/New_York
; - 创建
cron
对象时增加一个时区选项cron.WithLocation(location)
,location
为time.LoadLocation(zone)
加载的时区对象,zone
为具体的时区格式。或者调用已创建好的cron
对象的SetLocation()
方法设置时区。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func main() { // 方式一 : cron.New(cron.WithLocation()) 指定时区 nyc, _ := time.LoadLocation("America/New_York") c := cron.New(cron.WithLocation(nyc)) c.AddFunc("0 6 * * ?", func() { fmt.Println("Every 6 o'clock at New York") }) // 方式二 :CRON_TZ= 指定时区 c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", func() { fmt.Println("Every 6 o'clock at Tokyo") }) c.Start() } |
自定义时间格式
cron
支持灵活的时间格式,如果默认的格式不能满足要求,我们可以自己定义时间格式。时间规则字符串需要cron.Parser
对象来解析。我们先来看看默认的解析器是如何工作的。
首先定义各个域:(以下是parser.go源码)
1 2 3 4 5 6 7 8 9 10 11 12 |
// parser.go const ( Second ParseOption = 1 << iota SecondOptional Minute Hour Dom Month Dow DowOptional Descriptor ) |
除了Minute/Hour/Dom(Day of month)/Month/Dow(Day of week)
外,还可以支持Second
。相对顺序都是固定的:(以下是parser.go源码)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
// parser.go var places = []ParseOption{ Second, Minute, Hour, Dom, Month, Dow, } var defaults = []string{ "0", "0", "0", "*", "*", "*", } |
默认的时间格式使用 5 个域。
我们可以调用cron.NewParser()
创建自己的Parser
对象,以位格式传入使用哪些域,例如下面的Parser
使用 6 个域,支持Second
(秒):
1 2 3 |
parser := cron.NewParser( cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, ) |
调用cron.WithParser(parser)
创建一个选项传入构造函数cron.New()
,使用时就可以指定秒了:
1 2 3 4 5 6 7 |
// cron.WithParser(parser) 使用自定义的时间格式 c := cron.New(cron.WithParser(parser)) // "1 * * * * *" 个数和顺序必须 与 自定义的时间格式一致 c.AddFunc("1 * * * * *", func () { fmt.Println("every 1 second") }) c.Start() |
这里时间格式必须使用 6 个域,顺序与上面的const
定义一致。
因为上面的时间格式太常见了,cron
定义了一个便捷的函数:(以下是option.go源码)
1 2 3 4 5 6 |
// option.go func WithSeconds() Option { return WithParser(NewParser( Second | Minute | Hour | Dom | Month | Dow | Descriptor, )) } |
注意Descriptor
表示对@every/@hour
等的支持。有了WithSeconds()
,我们不用手动创建Parser
对象了:
1 |
c := cron.New(cron.WithSeconds()) |
cron.New()的选项
cron
对象创建使用了选项模式,我们前面已经介绍了 3 个选项:
WithLocation
:指定时区;WithParser
:使用自定义的解析器;WithSeconds
:让时间格式支持秒,实际上内部调用了WithParser
。
cron
还提供了另外两种选项:
WithLogger
:自定义Logger
;WithChain
:Job 包装器。
WithLocation
、WithParser
、WithSeconds
这三个选项是与时间相关的设置,上面已经说了,下面说一说另外两个。
WithLogger
WithLogger
可以设置cron
内部使用我们自定义的Logger
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
func main() { // 创建 cron c := cron.New( // 使用自定义Logger cron.WithLogger( cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)))) // 添加任务 c.AddFunc("@every 1s", func() { fmt.Println("hello world") }) // 开始任务 c.Start() } |
上面调用cron.VerbosPrintfLogger()
包装log.Logger
,这个logger
会详细记录cron
内部的调度过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
$ go run main.go cron: 2020/06/26 07:09:14 start cron: 2020/06/26 07:09:14 schedule, now=2020-06-26T07:09:14+08:00, entry=1, next=2020-06-26T07:09:15+08:00 cron: 2020/06/26 07:09:15 wake, now=2020-06-26T07:09:15+08:00 cron: 2020/06/26 07:09:15 run, now=2020-06-26T07:09:15+08:00, entry=1, next=2020-06-26T07:09:16+08:00 hello world cron: 2020/06/26 07:09:16 wake, now=2020-06-26T07:09:16+08:00 cron: 2020/06/26 07:09:16 run, now=2020-06-26T07:09:16+08:00, entry=1, next=2020-06-26T07:09:17+08:00 hello world cron: 2020/06/26 07:09:17 wake, now=2020-06-26T07:09:17+08:00 cron: 2020/06/26 07:09:17 run, now=2020-06-26T07:09:17+08:00, entry=1, next=2020-06-26T07:09:18+08:00 hello world cron: 2020/06/26 07:09:18 wake, now=2020-06-26T07:09:18+08:00 hello world cron: 2020/06/26 07:09:18 run, now=2020-06-26T07:09:18+08:00, entry=1, next=2020-06-26T07:09:19+08:00 cron: 2020/06/26 07:09:19 wake, now=2020-06-26T07:09:19+08:00 hello world cron: 2020/06/26 07:09:19 run, now=2020-06-26T07:09:19+08:00, entry=1, next=2020-06-26T07:09:20+08:0 |
我们看看默认的Logger
是什么样的:(以下是logger.go的源码)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// logger.go var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)) func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { return printfLogger{l, false} } func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { return printfLogger{l, true} } type printfLogger struct { logger interface{ Printf(string, ...interface{}) } logInfo bool } |
WithChain
Job 包装器可以在执行实际的Job
前后添加一些逻辑:
- 捕获
panic
; - 如果
Job
上次运行还未结束,推迟本次执行; - 如果
Job
上次运行还未介绍,跳过本次执行; - 记录每个
Job
的执行情况。
我们可以将Chain
类比为 Web 处理器的中间件。实际上就是在Job
的执行逻辑外在封装一层逻辑。我们的封装逻辑需要写成一个函数,传入一个Job
类型,返回封装后的Job
。cron
为这种函数定义了一个类型JobWrapper
:(以下是chain.go源码)
1 2 |
// chain.go type JobWrapper func(Job) Job |
然后使用一个Chain
对象将这些JobWrapper
组合到一起:(以下是chain.go源码)
1 2 3 4 5 6 7 |
type Chain struct { wrappers []JobWrapper } func NewChain(c ...JobWrapper) Chain { return Chain{c} } |
调用Chain
对象的Then(job)
方法应用这些JobWrapper
,返回最终的`Job:(以下是chain.go源码)
1 2 3 4 5 6 |
func (c Chain) Then(j Job) Job { for i := range c.wrappers { j = c.wrappers[len(c.wrappers)-i-1](j) } return j } |
注意应用JobWrapper
的顺序。
内置JobWrapper
cron
内置了 3 个用得比较多的JobWrapper
:
Recover
:捕获内部Job
产生的 panic;DelayIfStillRunning
:触发时,如果上一次任务还未执行完成(耗时太长),则等待上一次任务完成之后再执行;SkipIfStillRunning
:触发时,如果上一次任务还未完成,则跳过此次执行。
下面分别介绍。
Recover
先看看如何使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
type panicJob struct { count int } func (p *panicJob) Run() { p.count++ if p.count == 1 { panic("oooooooooooooops!!!") } fmt.Println("hello world") } func main() { c := cron.New() // Then()是任务 c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(&panicJob{})) c.Start() } |
panicJob
在第一次触发时,触发了panic
。因为有cron.Recover()
保护,后续任务还能执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
go run main.go cron: 2020/06/27 14:02:00 panic, error=oooooooooooooops!!!, stack=... goroutine 18 [running]: github.com/robfig/cron/v3.Recover.func1.1.1(0x514ee0, 0xc0000044a0) D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:45 +0xbc panic(0x4cf380, 0x513280) C:/Go/src/runtime/panic.go:969 +0x174 main.(*panicJob).Run(0xc0000140e8) D:/code/golang/src/github.com/darjun/go-daily-lib/cron/recover/main.go:17 +0xba github.com/robfig/cron/v3.Recover.func1.1() D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:53 +0x6f github.com/robfig/cron/v3.FuncJob.Run(0xc000070390) D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:136 +0x2c github.com/robfig/cron/v3.(*Cron).startJob.func1(0xc00005c0a0, 0x514d20, 0xc000070390) D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:312 +0x68 created by github.com/robfig/cron/v3.(*Cron).startJob D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:310 +0x7a hello world hello world hello world hello world |
我们看看cron.Recover()
的实现,很简单:(以下是cron.go的源码)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// cron.go func Recover(logger Logger) JobWrapper { return func(j Job) Job { return FuncJob(func() { defer func() { if r := recover(); r != nil { const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] err, ok := r.(error) if !ok { err = fmt.Errorf("%v", r) } logger.Error(err, "panic", "stack", "...\n"+string(buf)) } }() j.Run() }) } } |
就是在执行内层的Job
逻辑前,添加recover()
调用。如果Job.Run()
执行过程中有panic
。这里的recover()
会捕获到,输出调用堆栈。
DelayIfStillRunning
还是先看如何使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
type delayJob struct { count int } func (d *delayJob) Run() { time.Sleep(2 * time.Second) d.count++ log.Printf("%d: hello world\n", d.count) } func main() { c := cron.New() // DelayIfStillRunning c.AddJob("@every 1s", cron.NewChain(cron.DelayIfStillRunning(cron.DefaultLogger)).Then(&delayJob{})) c.Start() } |
上面我们在Run()
中增加了一个 2s 的延迟,输出中间隔变为 2s,而不是定时的 1s:
1 2 3 4 5 |
$ go run main.go 2020/06/27 14:11:16 1: hello world 2020/06/27 14:11:18 2: hello world 2020/06/27 14:11:20 3: hello world 2020/06/27 14:11:22 4: hello world |
看看源码:(以下是chain.go的源码)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// chain.go func DelayIfStillRunning(logger Logger) JobWrapper { return func(j Job) Job { var mu sync.Mutex return FuncJob(func() { start := time.Now() mu.Lock() defer mu.Unlock() if dur := time.Since(start); dur > time.Minute { logger.Info("delay", "duration", dur) } j.Run() }) } } |
首先定义一个该任务共用的互斥锁sync.Mutex
,每次执行任务前获取锁,执行结束之后释放锁。所以在上一个任务结束前,下一个任务获取锁是无法成功的,从而保证的任务的串行执行。
SkipIfStillRunning
还是先看看如何使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
type skipJob struct { count int32 } func (d *skipJob) Run() { atomic.AddInt32(&d.count, 1) log.Printf("%d: hello world\n", d.count) if atomic.LoadInt32(&d.count) == 1 { time.Sleep(2 * time.Second) } } func main() { c := cron.New() c.AddJob("@every 1s", cron.NewChain(cron.SkipIfStillRunning(cron.DefaultLogger)).Then(&skipJob{})) c.Start() time.Sleep(10 * time.Second) } |
输出:
1 2 3 4 5 6 7 8 9 |
$ go run main.go 2020/06/27 14:22:07 1: hello world 2020/06/27 14:22:10 2: hello world 2020/06/27 14:22:11 3: hello world 2020/06/27 14:22:12 4: hello world 2020/06/27 14:22:13 5: hello world 2020/06/27 14:22:14 6: hello world 2020/06/27 14:22:15 7: hello world 2020/06/27 14:22:16 8: hello world |
注意观察时间,第一个与第二个输出之间相差 3s,因为跳过了两次执行。
注意DelayIfStillRunning
与SkipIfStillRunning
是有本质上的区别的,前者DelayIfStillRunning
只要时间足够长,所有的任务都会按部就班地完成,只是可能前一个任务耗时过长,导致后一个任务的执行时间推迟了一点。SkipIfStillRunning
会跳过一些执行。
看看源码:(以下是源码)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func SkipIfStillRunning(logger Logger) JobWrapper { return func(j Job) Job { var ch = make(chan struct{}, 1) ch <- struct{}{} return FuncJob(func() { select { case v := <-ch: j.Run() ch <- v default: logger.Info("skip") } }) } } |
定义一个该任务共用的缓存大小为 1 的通道chan struct{}
。执行任务时,从通道中取值,如果成功,执行,否则跳过。执行完成之后再向通道中发送一个值,确保下一个任务能执行。初始发送一个值到通道中,保证第一个任务的执行。
其它API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package main import ( "fmt" "time" "github.com/robfig/cron/v3" ) func main() { job := cron.New() // 根据任务id移除任务 job.Remove(jobId) // 根据任务id 获取任务实体 entry := job.Entry(jobId) fmt.Printf("id为 %v 的实体:%#v \n",addJobChainId,entry) // 获取所有任务实体 entryArr := job.Entries() for _,val := range entryArr { fmt.Printf("entryArr - 实体:%#v \n",val) } // 获取时区 location := job.Location().String() fmt.Printf("location:%v \n",location) job.Start() // 开始任务 //defer job.Stop() // 停止任务, 但是不能关闭已经在执行中的任务 } |