Go的timerate是如何实现的
代码概览
源码地址: https://github.com/golang/time
在我看来,这块代码写的非常好,至少有三点非常值得我们学习:
- 对于令牌桶限流器的工作原理理解的非常通透,在充分理解的基础上,将原有的算法进行了简化。去掉了桶(实际上是个队列或者其他数据结构)的维护成本,优化为 (时间X速率) 的方法,现用现取,整体变得非常紧凑。
- 业务场景考虑的非常全面。速率和容量都有非常明确的边界,基础数据结构的抽象和组合也很恰当。提供的接口功能也非常完善,几乎没有废话。
- 注释很完整,学习起来很轻松。

基础结构
Limiter
type Limiter struct {
mu sync.Mutex //并发锁,取令牌 生成令牌,当前状态时都会用到
limit Limit //每秒令牌的生产速度,Limit其实就是float64的别名
burst int //初始令牌数量,在实际使用中,还有桶的容量的作用。另外,如果生产速度为float64的最大值,这个容量在某些逻辑里不会生效。
tokens float64 //桶里当前的令牌数
last time.Time //最近一次更新令牌的时间
lastEvent time.Time //最近一次事件发生的时间,可以是过去,也可以是未来。用在预约令牌的地方
}
可以思考下为什么桶的容量是整形,当前数量却是浮点类型不是整形。
Reservation
type Reservation struct {
ok bool //是否预约成功
lim *Limiter //限流器
tokens int //本次预约的令牌数量
timeToAct time.Time //生产出足够的令牌数的时间点,注意是 时间点。
limit Limit //预约时的生产速率
}
这个结构只在预约的时候会被用到,其他方法只需要返回部分结果就行了。具体使用时会说到。
基础方法
Allow/AllowN
Allow最基础的方法,需要几个就从桶里取几个,如果不足,直接返回错误。
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok //reserveN 会返回一个Reservation结构
}
Wait/WaitN
wait 是我们最常用的,也是推荐使用的方法。他本质上非常容易理解:
- 如果令牌桶中有足够的数量,则直接返回。
- 如果令牌桶中没有足够的数量,则等待生成。如果等待的时长大于ctx的超时配置,或者ctx.done了,直接返回错误。
func (lim *Limiter) Wait(ctx context.Context) (err error) {
return lim.WaitN(ctx, 1)
}
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
timer := time.NewTimer(d)
return timer.C, timer.Stop, func() {}
}//初始化一个定时器,最后的这个返回的函数,目前只在test中有用到。
return lim.wait(ctx, n, time.Now(), newTimer)
}
func (lim *Limiter) wait(ctx context.Context, n int, now time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
...
if n > burst && limit != Inf {
//如果需要的令牌数量大于桶的容积,并且生产速率低于最大值,直接返回错误。
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
}
//先校验ctx的的状态
select {
case <-ctx.Done():
return ctx.Err()
default:
}
//和ctx 的超时时间进行比较,也就是在ctx 超时之前, 最多能等多久
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// 真正获取令牌
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// 判断下当前时间的差,如果是0 就说明不需要等待,直接返回就行了。
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
//把一开始的定时器传进来,把我们需要等待的【时长】 放进去
ch, stop, advance := newTimer(delay)
defer stop()
...
select {
case <-ch:
return nil //到点了,正常返回。意味着 已经拿到令牌可以正常进行了。
case <-ctx.Done():
r.Cancel() //等待超时,取消此次预约。并返回错误
return ctx.Err()
}
}
Reserve/ReserveN
Reserve本质上就是将返回的Reservation结构交到开发者手上,由开发者决定后续操作。如果能看明白wait的逻辑,基本就大致知道这个方法的实际用法。
源代码中实际上有使用的例子,本质上和wait实现的功能相似。将控制权交给开发,可以实现更多更有创意的用法,同样的也要承担可能出现BUG的风险。
func (lim *Limiter) Reserve() *Reservation {
return lim.ReserveN(time.Now(), 1)
}
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
r := lim.reserveN(now, n, InfDuration)
return &r
}
核心方法
获取令牌 reserveN
reserveN 是三种限流器的基础方法。有几个注意事项,源码注释有说明:
- maxFutureReserve 在wait中生效,是能够允许的最大等待时长
- reserveN返回的是值,并不是指针。
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock() //分配时加锁。因此,限流器的所有操作,在这里都被限制为串行的。
defer lim.mu.Unlock()
//处理极端情况。其实这里可以认为预留了人工干预的入口,可以通过SetLimit的方法 关闭或者放开限流。
if lim.limit == Inf {
//如果 令牌的生成速率为最大值,则直接返回 OK 为true。
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
} else if lim.limit == 0 {
//如果 令牌的生成速率为0,则先把桶内的令牌消耗完毕,之后的全部返回 OK 为 false。
var ok bool
if lim.burst >= n {
ok = true
lim.burst -= n
}
return Reservation{
ok: ok,
lim: lim,
tokens: lim.burst,
timeToAct: now,
}
}
//可以简单理解为 生产令牌
now, last, tokens := lim.advance(now)
// 计算一下,扣掉需要的令牌后,桶里剩下的令牌数
tokens -= float64(n)
//计算需要等待的时长。注意,只有桶里的余量不足时才需要计算。
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// 计算下本次申请令牌能否成功。申请的令牌数要小于容量,实际等待时长也要小于wait传入的时长
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)//下次需要推进的时间点。这里决定了我们一开始说的,timeToAct字段一定是现在 或者未来的时间点。
}
// 更新下限流器数据
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
return r
}
生产令牌 advance
advance可以简单理解为生产token,可以预支。另外它有两个注意事项,注释也有写:
- 该方法不会更改Limiter的任何属性,需要调用方自行修改。
- 该方法内没有加锁,但它的操作需要加锁。可以看下所有调用advance的方法,都是先加锁,再调用。这也和第一条相呼应。我们正常开发的时候也要有这么一个原则:谁持有锁,谁来操作对象。
- advance本质就是计算下上次生产的时间点和当前的时间差,再乘上速率,就是当前能生成的令牌数。
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now //如果传入的时间,早于最后一次执行时间,那么以最早的时间点为准。
}
elapsed := now.Sub(last) //计算时间差
delta := lim.limit.tokensFromDuration(elapsed) //计算时间差里,生成的令牌数。
tokens := lim.tokens + delta //令牌桶里的余量+新生成的量就是当前总量
if burst := float64(lim.burst); tokens > burst {
tokens = burst //如果新生成的量大于桶的容量,则以容量为准。
}
return now, last, tokens
}
其他方法
Reservation 的取消方法
如果ctx过了超时设置,wait方法就需要把已经预支的令牌还给令牌桶,这个时候需要用到Cancel方法。代码看起来很抽象,很难以理解。如果上面都能看懂,并且理解的话,这里其实就是反向操作。
func (r *Reservation) Cancel() {
r.CancelAt(time.Now())
}
func (r *Reservation) CancelAt(now time.Time) {
...
r.lim.mu.Lock() //将限流器加锁
defer r.lim.mu.Unlock()
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
// 生产速率最大,或者申请的令牌数为0 ,或者已经过了生产点了,直接返回。
return
}
//计算一下 我们需要还给桶的令牌数=申请的总数-上次生产到我们预约生产的区间内能生产的总数
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
now, _, tokens := r.lim.advance(now) //生产令牌
tokens += restoreTokens //桶中的余量+当前已经生产出来的量
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst //和桶的容量进行比较
}
//更新限流器的状态
r.lim.last = now
r.lim.tokens = tokens
if r.timeToAct == r.lim.lastEvent {
//如果当前限流器的最近事件触发时间点等于当前预约需要的时间点,需要将时间点回调。
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}
}
一些重置属性的方法
Limiter是可以在使用中调整桶的容量和令牌的生产速率的。
func (lim *Limiter) SetLimit(newLimit Limit) {
lim.SetLimitAt(time.Now(), newLimit)
}
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {
lim.mu.Lock() //加锁也就意味着,生产速率的变化是串行的。
defer lim.mu.Unlock()
now, _, tokens := lim.advance(now) //先生产令牌
lim.last = now
lim.tokens = tokens
lim.limit = newLimit //后改变速率
}
func (lim *Limiter) SetBurst(newBurst int) {
lim.SetBurstAt(time.Now(), newBurst)
}
func (lim *Limiter) SetBurstAt(now time.Time, newBurst int) {
lim.mu.Lock() //加锁也就意味着,修改桶的容量的变化是串行的。
defer lim.mu.Unlock()
now, _, tokens := lim.advance(now)// 先生产
lim.last = now
lim.tokens = tokens
lim.burst = newBurst //再设置新的容量
}
这里有一点需要注意,其实源代码中的注释也写的非常清晰:设置生产速率和容量是不会影响之前时间段的,只会影响当前时间点之后。如果同时有其他服务再获取令牌,互相都不会影响。
一些便捷计算的方法
注意,这两个方法都只属于Limit,也就是上文中说的float64的别名,Limiter中的limit属性
//生产N个令牌需要多长时间
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
if limit <= 0 {
return InfDuration //生成速率为0时,直接拉满。
}
seconds := tokens / float64(limit)
return time.Duration(float64(time.Second) * seconds)
}
//一段时间内能生成多少个令牌
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
if limit <= 0 {
return 0 //如果生产速率小于0,则返回0
}
return d.Seconds() * float64(limit) //返回生产的令牌数,这里也能说明,limit 就是每秒生产的令牌数。
}
总结
这块代码减去注释,也不过200多行,就能完整实现一个非常常用,非常经典的算法。非常值得我们学习他的业务抽象能力和接口设计与编写能力。我根据自己的情况总结了一下学到的知识点:
- 这个限流器是对令牌桶的进一步抽象。 时间*速率 很简单的常识,其实就是令牌桶的核心思想。根据这个核心来设计代码,的的确确事半功倍。
- 业务的边界状态要考虑清楚。任何业务场景都会有多个边界问题,需要提前想清楚业务在各个边界点呈现的状态,才能保证不会出BUG。
- 对 时间 的概念,要有一个深入的认识。很多问题归根结底都是 时间+ 的问题。