7 基于Lease租约机制
租约一般用在微服务场景下,或者分布式缓存里
重要
以下策略有一个大前提:业务场景是读操作远远多于写操作。
1. 关于租约
租约机制是斯坦福大学在 1989 年提出来的一种方案,主要是为了解决分布式系统中缓存一致性问题的。没错,我们现在使用的技术方案,又是上世纪 90 年代出现的。
在分布式系统中,缓存的使用虽然能提高系统性能,但也带来了数据一致性的挑战。一方面,缓存的存在引入了确保数据一致性的开销和复杂性,降低了部分性能优势。另一方面,分布式系统中的通信延迟、网络故障以及主机故障等问题,使得缓存数据的一致性维护变得更加困难。例如,当多个客户端同时缓存了同一数据,而服务器端的数据发生变化时,如何及时、有效地通知客户端更新缓存,以保证数据的一致性,就是一个亟待解决的问题。
提示
租约机制提供了一种新的思路,它很像一个 cookie,一张门票。它可以保证在有效期内,缓存是一致的,从而减少不必要的请求。并且它还支持多种更新方式,既可以通过时间过期,又可以主动去通知客户过期,还可以去为现有的租约续期。
2. 一种尝试
我们先来进行一个尝试,用一种贴近生活的思路实现一个简单的租约。为此,我们需要把原有的服务进行微服务的简单改造。
2.1. 创建 PROTO
syntax = "proto3";
//protoc --go_out=. --go-grpc_out=. cache.proto
option go_package = "./cache";
message Info{
int64 id = 1;
string name = 2;
string created_time = 3;
string updated_time = 4;
}
message CommonResponse{
int32 code = 1;
string msg = 2;
}
message GetInfoRequest{
int64 id = 1;
}
message GetInfoResponse{
Info info = 1;
int64 TTL = 2;
}
message SetInfoRequest{
Info info = 1;
}
service CacheService {
rpc GetInfo(GetInfoRequest) returns(GetInfoResponse);
rpc SetInfo(SetInfoRequest) returns(CommonResponse);
}
然后在当前文件 所在的路径执行下:
protoc --go_out=. --go-grpc_out=. cache.proto
你会得到这两个文件:

这样子,pb 就完成了。
提示
关于protoc 工具和其他组件的安装,可以参考其他项目里的文档。
2.2. 实现接口方法
package logic
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"training/cache/appv5/db"
"training/cache/appv6/rpc/cache"
)
type InfoLogic struct {
cache.CacheServiceServer
}
func (i *InfoLogic) GetInfo(c context.Context, q *cache.GetInfoRequest) (*cache.GetInfoResponse, error) {
r := &cache.GetInfoResponse{}
id := q.GetId()
//先获取redis
key := fmt.Sprintf("info_%d", id)
ca, err := db.Rdb.Get(c, key).Result()
if err != nil {
fmt.Println("这是缓存的结果:" + fmt.Sprint(ca))
return r, err
}
if ca != "" {
// 获取过期时间
ttl, _ := db.Rdb.TTL(c, key).Result()
var data db.Info
_ = json.Unmarshal([]byte(ca), &data)
r.Info.Id = int64(data.ID)
r.Info.Name = data.Name
r.Info.UpdatedTime = data.UpdatedAt.Format("2006-01-02 15:04:05")
r.Info.CreatedTime = data.CreatedAt.Format("2006-01-02 15:04:05")
r.TTL = ttl.Milliseconds()
return r, nil
}
//redis 不存在就查询数据库
info := db.Info{}
data := info.Get(int(id))
if data.ID <= 0 {
return r, errors.New("数据不存在")
}
//如果数据存在,就写缓存
if err = db.Rdb.Set(context.Background(), key, fmt.Sprint(data), time.Second*5).Err(); err != nil {
return r, err
}
//构建返回结构体
r.Info.Id = int64(data.ID)
r.Info.Name = data.Name
r.Info.UpdatedTime = data.UpdatedAt.Format("2006-01-02 15:04:05")
r.Info.CreatedTime = data.CreatedAt.Format("2006-01-02 15:04:05")
r.TTL = int64(time.Second * 5)
return r, nil
}
func (i *InfoLogic) SetInfo(c context.Context, q *cache.SetInfoRequest) (r *cache.CommonResponse, err error) {
// 这里依然采用读更新,写删除的策略
//TODO: 增加一些参数校验
//先写数据库,然后过期掉缓存。
info := db.Info{}
if err = info.Save(int(q.Info.Id), q.Info.Name); err != nil {
r.Code = 20001
r.Msg = "更新失败"
return r, err
}
//将del命令改造为过期时间改为0
key := fmt.Sprintf("info_%d", q.Info.Id)
_ = db.Rdb.PExpire(context.TODO(), key, 1*time.Millisecond).Err()
r.Code = 0
r.Msg = "ok"
return r, nil
}
2.3. 让服务跑起来
package appv6
import (
"google.golang.org/grpc"
"log"
"net"
"training/cache/appv6/db"
"training/cache/appv6/logic"
"training/cache/appv6/rpc/cache"
)
func Start() {
//初始化数据库和缓存
db.NewDb()
db.NewRdb()
lis, err := net.Listen("tcp", ":9099")
if err != nil {
panic(err)
}
// 创建一个 gRPC 服务器实例
s := grpc.NewServer()
// 注册 UserService 服务
cache.RegisterCacheServiceServer(s, &logic.InfoLogic{})
fmt.Println("GRPC服务启动!正在监听端口 9099...")
// 启动 gRPC 服务器
if err = s.Serve(lis); err != nil {
fmt.Println("failed to serve: %v", err)
}
}
2.4. 存在的问题
这种方式其实就是在原有的“读更新,写删除”的基础上,增加了一个带有有效时间的值,比如 10S, 告诉使用方,这个缓存的有效期是多久,有效期内,可以一直使用。不必再发起新的请求。但是,你可能已经发现问题了:
警告
你说 10 秒,就 10 秒?要是中间有人更新了怎么办,那岂不是有一段时间不一致了?
是的,这就是麻烦事,不过我们可以给接口增加一个强制回源的“后门”:
message GetInfoRequest{
int64 id = 1;
bool flag = 2;
}
然后在逻辑种加上:
if !q.GetFlag() {
//先获取redis
key := fmt.Sprintf("info_%d", id)
ca, err := db.Rdb.Get(c, key).Result()
if err != nil {
fmt.Println("这是缓存的结果:" + fmt.Sprint(ca))
return r, err
}
if ca != "" {
// 获取过期时间
ttl, _ := db.Rdb.TTL(c, key).Result()
var data db.Info
_ = json.Unmarshal([]byte(ca), &data)
r.Info.Id = int64(data.ID)
r.Info.Name = data.Name
r.Info.UpdatedTime = data.UpdatedAt.Format("2006-01-02 15:04:05")
r.Info.CreatedTime = data.CreatedAt.Format("2006-01-02 15:04:05")
r.TTL = ttl.Milliseconds()
return r, nil
}
}
//redis 不存在就查询数据库
info := db.Info{}
data := info.Get(int(id))
if data.ID <= 0 {
return r, errors.New("数据不存在")
}
//如果数据存在,就写缓存
if err = db.Rdb.Set(context.Background(), key, fmt.Sprint(data), time.Second*5).Err(); err != nil {
return r, err
}
这样子就留好了一个强制回源的后门。
用了 TTL+强制回源的方案, 如果你对数据一致性的要求较高,想获取最新的数据,直接走强制回源就行了;如果对数据一致性要求没那么高,就沿用旧数据呗。这个方案是一种典型的二八法则,我提供几个不同的方案,你根据自身情况选择合适的。能适用绝大多数情况,又不必为某一个具体的用户量身定制。但是:
相关信息
如果一个方案好用,那它一定会被滥用。
当你把这套 RPC 接口放出去以后,你会发现越来越多的人,都倾向于走后门,整体情况根本没有像你想象的方向发展。因为:
干技术也要懂人心。
重要
这个方案的最终结局是,隐藏后门。要么以技术大佬的行政命令强制推行,要么黯然离场。而技术大佬往往不会为这种离谱的方案背书,后者便成了唯一的归宿。
3. 租约实现
3.1. 架构图

3.2. 流程图
3.2.1. 获取数据

3.2.2. 更新数据
你可以看完课程后自己尝试画一下,其实很简单,就是读更新,写删除时,加上一个更新租约版本号的机制。
3.3. 代码块
3.3.1. 声明 PROTO
照例,我们先把 proto 文件整理下 :
syntax = "proto3";
//protoc --go_out=. --go-grpc_out=. cache.proto
option go_package = "./cache";
message Info{
int64 id = 1;
string name = 2;
string created_time = 3;
string updated_time = 4;
}
message LeaseCache{
Info info = 1; //数据结构
int64 lease_start = 2; //租约开始时间 毫秒时间戳
int64 lease_end = 3; //
int64 version = 4;
}
message CommonResponse{
int32 code = 1;
string msg = 2;
}
message GetInfoRequest{
int64 id = 1;
bool flag = 2;
}
message GetInfoResponse{
LeaseCache lease = 1;
int64 TTL = 2;
}
message SetInfoRequest{
Info info = 1;
}
message PingRequest{
string key = 1;
int64 version = 2;
}
message RegisterHandleRequest{
string ip = 1;
string port = 2;
string uri = 3;
}
service CacheService {
rpc GetInfo(GetInfoRequest) returns(GetInfoResponse);
rpc SetInfo(SetInfoRequest) returns(CommonResponse);
rpc RegisterHandle(RegisterHandleRequest) returns(CommonResponse);
rpc Ping(PingRequest) returns(CommonResponse);
}
3.3.2. 实现 GET 和 SET 方法
然后我们实现一下 Get 和 Set 方法,我们先不实现后面两个。
package logic
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"training/cache/appv5/db"
"training/cache/appv6/rpc/cache"
)
type InfoLogicV1 struct {
cache.CacheServiceServer
}
type Lease struct {
Version int64
Start int64
End int64
Value []byte
}
func (i *InfoLogicV1) GetInfo(c context.Context, q *cache.GetInfoRequest) (*cache.GetInfoResponse, error) {
r := &cache.GetInfoResponse{}
id := q.GetId()
key := fmt.Sprintf("info_%d", id)
if q.Flag != true {
//先获取redis
ca, err := db.Rdb.Get(c, key).Result()
if err != nil {
fmt.Println("这是缓存的结果:" + fmt.Sprint(ca))
return r, err
}
if ca != "" {
r.Lease = buildReturn(ca)
return r, nil
}
}
//redis 不存在就查询数据库
info := db.Info{}
data := info.Get(int(id))
if data.ID <= 0 {
return r, errors.New("数据不存在")
}
//如果数据存在,就构造缓存
now := time.Now().UnixMilli()
end := now + (time.Second * 4).Milliseconds() //租约有效期,应当略小于缓存有效期
l := &Lease{
Version: now,
Start: now,
End: end,
Value: nil,
}
ca := cache.Info{
Id: int64(data.ID),
Name: data.Name,
CreatedTime: data.CreatedAt.Format("2006-01-02 15:04:05"),
UpdatedTime: data.UpdatedAt.Format("2006-01-02 15:04:05"),
}
l.Value, _ = json.Marshal(&ca)
//保存缓存
str, _ := json.Marshal(l)
if err := db.Rdb.Set(context.Background(), key, str, time.Second*5).Err(); err != nil {
return r, err
}
// 保存租约,一般设置为不过期
leaseKey := fmt.Sprintf("info_version_%d", data.ID)
_ = db.Rdb.Set(context.Background(), leaseKey, now, 0).Err()
return r, nil
}
func (i *InfoLogicV1) SetInfo(c context.Context, q *cache.SetInfoRequest) (r *cache.CommonResponse, err error) {
// 这里依然采用读更新,写删除的策略
//TODO: 增加一些参数校验
//先写数据库,然后过期掉缓存。
info := db.Info{}
if err = info.Save(int(q.Info.Id), q.Info.Name); err != nil {
r.Code = 20001
r.Msg = "更新失败"
return r, err
}
//将缓存过期
key := fmt.Sprintf("info_%d", q.Info.Id)
_ = db.Rdb.PExpire(context.TODO(), key, 1*time.Millisecond).Err()
//将保存的租约更新到最新版本
leaseKey := fmt.Sprintf("info_version_%d", q.Info.Id)
now := time.Now().UnixMilli()
ver, _ := db.Rdb.Get(c, key).Int64()
if ver <= now {
//更新版本号
_ = db.Rdb.Set(context.Background(), leaseKey, now, 0).Err()
}
r.Code = 0
r.Msg = "ok"
return r, nil
}
func buildReturn(str string) *cache.LeaseCache {
var data Lease
var info cache.Info
_ = json.Unmarshal([]byte(str), &data)
_ = json.Unmarshal(data.Value, &info)
l := &cache.LeaseCache{
Info: &info,
LeaseStart: data.Start,
LeaseEnd: data.End,
Version: data.Version,
}
return l
}
提示
注意:这块代码只提供了基本思路,如果要在生产环境种使用,还需要严格的测试。
3.3.3. 实现 pull 和 push 两种同步方法
pull 方法
这个方法很简单,就是双方定时发起心跳,一方面确保双方都活着,另一方面可以通过发送版本号,来确定数据是否发生了更改。直接上代码:
func (i *InfoLogicV1) Ping(c context.Context, q *cache.PingRequest) (*cache.CommonResponse, error) {
r := &cache.CommonResponse{}
//TODO: 增加入参校验
//获取租约
leaseKey := fmt.Sprintf("info_version_%s", q.Key)
ver, err := db.Rdb.Get(c, leaseKey).Int64()
if err != nil {
r.Code = 20002
r.Msg = "服务内部错误"
return r, err
}
if ver > q.Version {
r.Code = 20001
r.Msg = "版本已过期"
return r, errors.New("版本已过期")
}
// 如果版本号没有发生变化,就直接返回。
r.Code = 0
r.Msg = "ok"
return r, nil
}
一般客户端实现起来稍微麻烦些,需要起一个定时器,每秒发起一次 Ping 请求,以此获取双方的存活状态,并且能够同步信息。我们写一个模拟客户端发起 Ping 请求的代码:
package appv6
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"time"
"training/cache/appv6/rpc/cache"
)
var CacheServer cache.CacheServiceClient
var CacheLease *cache.LeaseCache
var Version int64
var Key = 1
func init() {
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer func() {
_ = conn.Close()
}()
CacheServer = cache.NewCacheServiceClient(conn)
}
func Run() {
getLease()
t := time.NewTicker(1 * time.Second)
for {
select {
case <-t.C:
ping()
}
}
}
func ping() {
req, err := CacheServer.Ping(context.Background(), &cache.PingRequest{
Key: "1",
Version: Version,
})
if err != nil {
return
}
if req.GetCode() != 0 {
getLease()
}
}
func getLease() {
req, err := CacheServer.GetInfo(context.Background(), &cache.GetInfoRequest{Id: int64(Key)})
if err != nil {
fmt.Println(err)
return
}
CacheLease = req.GetLease()
Version = req.GetLease().GetVersion()
}
Push 方式
提示
Push 的方法类似一个观察者模型
先定义一个 schedule 方法:
package schedule
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
)
var Schedules *Tasks
type Tasks struct {
data []*Task
mux sync.RWMutex
}
type Task struct {
IP string
Port string
Uri string
}
func AddSchedule(s Task) {
if Schedules == nil {
data := make([]*Task, 0)
Schedules = &Tasks{
data: data,
}
}
Schedules.mux.Lock()
defer Schedules.mux.Unlock()
Schedules.data = append(Schedules.data, &s)
return
}
func Do() {
Schedules.mux.RLock()
defer Schedules.mux.RUnlock()
for _, task := range Schedules.data {
go func() {
doTask(task)
}()
}
}
func doTask(task *Task) {
url := fmt.Sprintf("http://%s:%s%s?flag=true", task.IP, task.Port, task.Uri)
resp, err := http.Get(url)
if err != nil {
fmt.Println(err)
}
defer func() {
_ = resp.Body.Close()
}()
body, _ := ioutil.ReadAll(resp.Body)
fmt.Println(string(body))
}
然后,我们只需要实现一下RegisterHandle 方法:
func (i *InfoLogicV1) RegisterHandle(c context.Context, q *cache.RegisterHandleRequest) (*cache.CommonResponse, error) {
r := &cache.CommonResponse{}
t := schedule.Task{
IP: q.Ip,
Port: q.Port,
Uri: q.Uri,
}
schedule.AddSchedule(t)
r.Code = 0
r.Msg = "ok"
return r, nil
}
最后,只需要在修改以后,执行一下 Do 方法即可。
func (i *InfoLogicV1) SetInfo(c context.Context, q *cache.SetInfoRequest) (r *cache.CommonResponse, err error) {
// 这里依然采用读更新,写删除的策略
//TODO: 增加一些参数校验
//先写数据库,然后过期掉缓存。
info := db.Info{}
if err = info.Save(int(q.Info.Id), q.Info.Name); err != nil {
r.Code = 20001
r.Msg = "更新失败"
return r, err
}
//将缓存过期
key := fmt.Sprintf("info_%d", q.Info.Id)
_ = db.Rdb.PExpire(context.TODO(), key, 1*time.Millisecond).Err()
//将保存的租约更新到最新版本
leaseKey := fmt.Sprintf("info_version_%d", q.Info.Id)
now := time.Now().UnixMilli()
ver, _ := db.Rdb.Get(c, key).Int64()
if ver <= now {
//更新版本号
_ = db.Rdb.Set(context.Background(), leaseKey, now, 0).Err()
}
//主动推消息
schedule.Do()
r.Code = 0
r.Msg = "ok"
return r, nil
}
警告
引申:如果后续订阅的 服务越来越多,需要加一个配置文件,让服务启动的时候就把信息加载进来。
3.4. 优势和缺点
租约最大的优势就是能够有效解决缓存不一致的,并且捎带手搞定了缓存击穿的问题。可是代价也是非常大的,需要写大量的代码和细节。
经常写项目的同学应该能够理解,代码细节太多,出问题的概率也比较大,而且开发和调试也是一件非常令人头疼的事情。但这些还不是最大的缺点,真正的缺点是:租约并不是万金油,它并不是一个通用的解决方案,它只能在特殊业务场景下发挥作用。
4. 适用场景
希望你还记得我们的大前提:读操作远远多于写操作。 跑开了这个大前提,一切都是白搭。
基于租约实现的缓存一致性,往往用在分布式缓存种,比如 ETCD 就使用了这种方式。它的优点是可以在一定程度上保证强一致性,而且性能上也相当强。
可是,就像我们反复强调过的:想要得到一些,就必定要失去一些。
租约模式,实现起来过于复杂,无论是主动心跳还是被动通知,都是非常麻烦的一件事,随着 Key 的数量增加会大幅增加整个系统的复杂性。正是这个问题,限制了租约模式的使用场景:小规模范围内的数据一致性。最典型的就是 ETCD 做的服务发现和配置中心的场景,它的 Key 基本只有个位数,多方通信也都是围绕少量数据。推而广之,就是公告啊,热搜排行榜、首页推荐,这类跟用户个性化关系不大的业务场景。
相关信息
引申:为什么只适合这些场景?