9 定时器
1. 简介
我们常常会遇到一些业务场景,需要我们每隔一段时间执行以下代码。比如:
- 每半小时触发一条消息,提醒活动活动身体。
- 每天早上8点,发送今日的天气情况。
- 每周5下午定时发送一下周报。
这种业务场景,就需要我们用到定时器。最常用的定时器是linux下的crontab,之前使用PHP或者Python的时候,就是通过定义crontab,定时执行一条脚本语句实现定时任务。
提示
引申,熟悉下crontab。它仍然是常用的定时任务方案,绝大多数的自用小工具,都会使用。
2. Go语言中的定时器
先看下Go里能够实现定时任务的工具:
func tick0() {
//常见的定时器方法
<-time.Tick(time.Second) //不常用,参数不能小于等于0,底层调用的NewTicker,返回一个单向通道
<-time.After(time.Second) //底层调用NewTimer,返回一个单向通道
<-time.NewTicker(time.Second).C //最常用的方法,大多数业务都会用这个方法
<-time.NewTimer(time.Second).C //在业务场景下用的少,在中间件里用到很多。
time.AfterFunc(time.Second, func() { /*do*/ }) //底层是对timer的封装,用的也不多,个别场景下要比NewTimer好用的多。
time.Sleep(time.Second) //最常用的方法。
}
我们逐个看一下:
2.1. NewTimer
func tick1() {
//NewTimer只会执行一次
t := time.NewTimer(3 * time.Second)
for {
select {
case <-t.C: //timer只会触发一次chan
fmt.Printf("关注香香编程喵喵喵,关注香香编程谢谢喵喵喵!")
}
}
}
2.2. NewTicker
绝大多数的定时任务都会采用NewTicker,多数公司内部使用的定时任务框架也是基于Ticker的封装。
func tick2() {
//最常用的定时任务方案
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop() //随手关闭。如果不关闭有可能会引发内存溢出
for {
select {
case t := <-ticker.C: //ticker会一直重复发送。
fmt.Println("关注香香编程喵喵喵,关注香香编程谢谢喵喵喵!", t.Unix())
}
}
}
2.3. Tick和After
Tick和After是针对ticker和timer的一层封装,他们都返回了一个单向通道。这就限定了这两个方法的使用场景和范围。以下是一个非常典型的错误用法。
func tick3() {
//错误的用法:这样子写会导致重复定义多个定时器!并且,这样子时间长的永远不会被执行。
for {
select {
case t := <-time.Tick(4 * time.Second): //Tick相当于快速启动一个ticker。
fmt.Println("关注香香编程喵喵喵,关注香香编程谢谢喵喵喵!Tick:", t.Unix())
case a := <-time.After(2 * time.Second): //After相当于快速启动一个timer。
fmt.Println("关注香香编程喵喵喵,关注香香编程谢谢喵喵喵!After:", a.Unix())
}
}
//这个写法必然会导致不停的创建新的timer引发内存泄露,引发的故障表现为协程数量猛增,CPU使用率很高。通常用不到这两个方法。
}
错误的原因可以看下他们的源码,这两个方法的官方注释写的非常清楚。
func Tick(d Duration) <-chan Time {
if d <= 0 {
return nil
}
return NewTicker(d).C //问题出在这里,每次调用都会生成一个新的Ticker,之前的没有被回收掉。
}
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
另外,After会用在某些长耗时的任务之后,作为收尾,不过场景很少。
重要
这里再次重申,不用的timer和ticker随手Stop掉。“梯子不用时请横放”
2.4. AfterFunc
func tick4() {
//AfterFunc 相当于延时执行某个方法。并且只会执行一次。
//它不需要开发人员再去管理Timer,只需要完成需要延时执行的方法即可。
time.AfterFunc(3*time.Second, func() {
fmt.Println("关注香香编程喵喵喵,关注香香编程谢谢喵喵喵!AfterFunc:", time.Now().Unix())
})
select {}
}
3. Stop和Reset
这两个方法可以手动管理定时器停止或者按照新的入参重启。日常情况下,Stop用的比较多。这里可以看下源码的注释,写的比较清楚,这里就不展开了。
Reset通常在业务代码中是不建议使用的。在使用不当的情况下,会让代码逻辑陷入阻塞,进而报错。排查的时候很难模拟具体的场景。
4. 底层逻辑
4.1. Timer和Ticker
type Ticker struct {
C <-chan Time // The channel on which the ticks are delivered.
r runtimeTimer
}
type Timer struct {
C <-chan Time
r runtimeTimer
}
是的,没错。他们两个的结构体定义是一模一样的,区别在于runtimeTimer的实现。
4.2. runtimeTimer
基于1.19版本,已经支持泛型了。整个定时器的底层是比价复杂的,需要较多的基础知识,我们只简单聊一下。
type runtimeTimer struct {
pp uintptr //计时器所在的处理器 P 的指针地址
when int64 //计时器被唤醒的时间
period int64 //两次被唤醒的间隔
f func(any, uintptr) // NOTE: must not be closure
arg any //计时器被唤醒时调用 f 传入的参数
seq uintptr //回调函数的参数,该参数仅在 netpoll 的应用场景下使用
nextwhen int64 //当计时器状态为 timerModifiedXX 时,将会使用 nextwhen 的值设置到 where 字段上。
status uint32 //计时器的状态;
}
这些字段全部都是私有字段,我们是无法感知的。在具体的使用过程中,Timer和Ticker在实现上有一些差别:
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1) //带有一个缓冲区的通道
t := &Timer{ //实例化Timer
C: c,
r: runtimeTimer{
when: when(d),//返回一个int64
f: sendTime,//向缓冲区发送消息的方法
arg: c,
},
}
startTimer(&t.r) //启动定时器
return t
}
func NewTicker(d Duration) *Ticker {
if d <= 0 { //NewTicker 不允许出现负值
panic(errors.New("non-positive interval for NewTicker"))
}
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),
period: int64(d), //比着Timer唯一区别的地方。
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
//这个写法和思想是可以借鉴的。
func sendTime(c any, seq uintptr) {
select {
case c.(chan Time) <- Now(): //我们无需关注通道中是否有元素,只管往里面写,成功与否都能返回
default: //写入成功,通道中有了一个元素,符合预期;写入不成功,则说明通道中已有元素,符合预期。
}
//这样也保证了,上一个任务触发了,但是还没有轮到它执行时,下一个任务又触发的冲突场景。
}
4.3. 四叉堆 和 状态机
这一块内容比较复杂,我们不展开了,后续可以看引申阅读了解详细。Go语言的定时器会存在一个四叉堆的数据结构中,每次操作(增删改,运行)等等都会改变定时器的状态,再通过状态机触发相应的函数操作调整四叉堆的存储。
提示
任何定时器的实现都不是完美的。大多数情况下,我们允许有误差,只要足够小就行。
四叉堆的结构
5. 优化
5.1. 一个通用的封装
type f func(c context.Context) error
type MTicker struct {
C context.Context
T *time.Ticker
Worker f
}
func NewMTicker(c context.Context, d int, f f) *MTicker {
return &MTicker{
C: c,
T: time.NewTicker(time.Duration(d) * time.Second),
Worker: f,
}
}
func (t *MTicker) Start() {
for {
select {
case <-t.T.C:
if err := t.Worker(t.C); err != nil {
//处理异常
}
}
}
}
func (t *MTicker) Stop() {
t.T.Stop()
}
func main() {
t := NewMTicker(context.Background(), 1, func(c context.Context) error {
fmt.Printf("关注香香编程喵喵喵,关注香香编程谢谢喵喵喵!")
return nil
})
defer t.Stop()
t.Start()
}
5.2. 一个需要注意的场景
func testTicker() {
tick := time.NewTicker(time.Second / 100) //一秒执行一百次
go func() {
time.Sleep(10 * time.Second) //10秒之后结束定时器,使for range 终止
tick.Stop() //注意啊 这个stop不会关闭底层的通道,避免出现错误
fmt.Printf("tick is closed\n")
}()
for range tick.C { //一个不常用的用法,tick.C不会close,代码会一直阻塞在这里
fmt.Printf("关注香香编程喵喵喵,关注香香编程谢谢喵喵喵!") //理论上会执行10*100次
}
fmt.Printf("testPprof end \n") //这一行是无法被打印出来的
return
}
提示
可以考虑下,这种情况如何优化。
5.3. 分布式定时任务
这里不展开哈,这是个业内常见的难题之一。定时任务最怕的就是部署在多台机器上。根据不同的场景,有不同的解决方法。
第一种:单台机器,不存在问题。
第二种:多台机器,定时任务刷新当前服务内部缓存。这样的定时任务不会互相干扰,互相影响,可以直接部署。
第三种:多台机器,多个定时任务会互相干扰。比如生成一份周报。常见的做法有三类:
- 指定某台机器专门处理这些定时任务。一般小型后台管理系统可以这么用,简单粗暴。缺点是,当添加了很多长耗时的定时任务后,可能会影响到任务执行的效率,增加机器开销。
- 搭建一个统一的定时任务平台,通过配置时间和路径(往往是HTTP请求),让平台来帮忙触发任务。利用网关的特性,让定时任务请求均匀打在指定的几台服务器上。优点是使用便捷,开发量会变小。缺点是,没有根本上解决多任务竞争问题(上一个任务没有执行完成时,下一个任务就已经发起了)此时,还是需要我们自己设置一把分布式锁来控制。多数场景下的使用方案,看着很傻,胜在好用。也是我们之前最常用的方案,戏称为草台班子方案。
- 基于其他中间件,比如K8s或者ETCD等。优点是开箱即用,摆脱很多烦恼,缺点是需要一些人进行维护,而且不太好维护。
6. 引申
常见的定时器有这么几种:
- Go语言使用的四叉堆。
- Linux使用的kernel时间轮。
- Nginx的红黑树。
定时器在大多数的中间件中都是核心模块,可以详细了解下。另外,绝大多数业务场景只要与时间扯上关系,都会存在各种各样问题:润秒,时区,精度,边界Case等等,在开发中要特别小心。
引申阅读:
https://xargin.com/go-timer/
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-timer/
Issues · talkgo/night · GitHub