本篇主要内容为分布式系统

分布式系统

Go语言号称是互联网时代的C语言。现在的互联网系统已经不是以前的一个主机搞定一切的时代,互联网时代的后台服务由大量的分布式系统构成,任何单一后台服务器节点的故障并不会导致整个系统的停机。同时以阿里云、腾讯云为代表的云厂商崛起标志着云时代的到来,在云时代分布式编程将成为一个基本技能。而基于Go语言构建的Docker、K8s等系统推动了云时代的提前到来。

对于已经比较完善的分布式系统,我们会简单讲讲怎么通过使用它们来提高我们的工作效率。对于没有现成解决方案的系统,我们会按照自己的业务需求提出解决方案。

分布式系统概念

我们考虑两个时间尺度:进程消息传递延迟和进程内事件间隔,如果前者相对后者不可忽略,则这组进程就是一个分布式系统。

理解这个定义,需要理解几个重要的概念(形式化的定义总是这样,摊手):进程(process)、消息(message)和事件(event)。

进程就是一个负责干活的劳工,其干的活可以分解为多个步骤,每个步骤就是一个事件,消息便是劳工交流的方式。

这里面涉及到了计算机系统中最重的几种资源:计算(computational),存储(memory),以及沟通他们的网络(network)。

总结下,我们可以从另一个角度来对分布式系统进行描述:

对外,分布式系统表现为一个整体,基于总体的存储和计算能力,提供特定功能。

对内,分布式系统表现为一组个体,基于网络消息进行通信,分工合作。

而分布式系统的设计目标是,最大化整体资源利用率的同时,处理局部错误、保持对外可用性。

分布式系统的特点

在构建分布式系统时,在逻辑上要注意以下这些方面:

  1. 可扩展性:可扩展性是对分布式系统最本质的要求,即系统设计允许我们只通过增加机器来应对不断增长的外部需求。
  2. 容错性 \ 可用性:这是可扩展性所带来的一个副作用,即在系统规模不断变大之后,单个机器故障便会成为常态。系统需要自动处理这些故障,对外保持可用性。
  3. 并发性:由于没有全局时钟进行协调,分散的机器天然处在 “平行宇宙” 中。系统需要引导这些并发变为协作,以拆解并执行集群任务。
  4. 异构性(对内):系统需要处理进群内部不同硬件、不同操作系统、不同中间件的差异性,并且能够容纳新的异构组件加入系统。
  5. 透明性(对外):对外屏蔽系统复杂性,提供逻辑上的单一性。

类型

在组织分布式系统时,在物理上可以有以下几种类型:

  1. 主从架构(master-workers:有一个负责指挥的机器,其他机器负责干活,如 Hadoop。好处是设计和实现相对容易,坏处是单点瓶颈和故障。
  2. 点对点架构(peer-to-peer):所有机器逻辑等价。如亚马逊 Dynamo,好处是没有单点故障,坏处是机器协调不好做、一致性也不好保证。不过,如果系统是无状态的,则这种架构很合适。
  3. 多层架构(multi-tier):这是一种复合架构,实际中也最常用,比如今年来常说存储计算分离。每一层可以根据不同特点(IO 密集型、计算密集型)进行设计,甚至可以复用现有组件(云原生)。

优缺点

优点:

高可用、高吞吐、高可扩展性

  1. 无限扩展:只要设计的好,可以通过线性的增加机器资源来应对不断增长的需求。
  2. 低延迟:多地部署,将用户请求按地理路由到最近机房处理。
  3. 高可用容错:一部分机器坏掉,仍可以正常对外提供服务。

缺点:

最大的问题是复杂性。

  1. 数据的一致性。考虑到大量的机器故障:宕机、重启、关机,数据可能丢失、陈旧、出错,如何让系统容纳这些问题,对外保证数据的正确性,需要相当复杂的设计。
  2. 网络和通信故障。网络的不可靠,消息可能丢失、早到、迟到、Hang 住,这给机器间的协调带来了极大的复杂度。像 TCP 等网络基础协议,能解决部分问题,但更多的需要系统层面自己处理。更不用说,开放式网络上可能存在的消息伪造。
  3. 管理复杂度。机器数量到达一定数量级时,如何对他们进行有效监控、收集日志、负载均衡,都是很大挑战。
  4. 延迟。网络通信延迟要比机器内通信高出几个数量级,而组件越多、网络跳数越多,延迟便会更高,这些最终都会作用于系统对外服务质量上。

分布式存储系统

单机存储系统

单机存储引擎就是哈希表、B树等数据结构在机械磁盘、SSD等持久化介质上的实 现。单机存储系统是单机存储引擎的一种封装,对外提供文件、键值、表格或者关 系模型。单机存储系统的理论来源于关系数据库。数据库将一个或多个操作组成一 组,称作事务,事务必须满足原子性(Atomicity)、一致性(Consistency)、 隔离性(Isolation)以及持久性(Durability),简称为ACID特性。多个事务并 发执行时,数据库的并发控制管理器必须能够保证多个事务的执行结果不能破坏某种约定,如不能出现事务执行到一半的情况,不能读取到未提交的事务,等等。为 了保证持久性,对于数据库的每一个变化都要在磁盘上记录日志,当数据库系统突 然发生故障,重启后能够恢复到之前一致的状态。

大规模分布式存储系统的定义如下: “分布式存储系统是大量普通PC服务器通过Internet互联,对外作为一个整体提供 存储服务。”

分布式存储系统具有如下几个特性:

● 可扩展。分布式存储系统可以扩展到几百台甚至几千台的集群规模,而且,随着集 群规模的增长,系统整体性能表现为线性增长。

● 低成本。分布式存储系统的自动容错、自动负载均衡机制使其可以构建在普通PC机 之上。另外,线性扩展能力也使得增加、减少机器非常方便,可以实现自动运维。

● 高性能。无论是针对整个集群还是单台服务器,都要求分布式存储系统具备高性 能。

● 易用。分布式存储系统需要能够提供易用的对外接口,另外,也要求具备完善的监 控、运维工具,并能够方便地与其他系统集成,例如,从Hadoop云计算系统导入数 据。

分布式存储系统的挑战主要在于数据、状态信息的持久化,要求在自动迁移、自动容错、并发读写的过程中保证数据的一致性。

分布式存储涉及的技术主要来自两个 领域:分布式系统以及数据库,如下所示:

●数据分布:如何将数据分布到多台服务器才能够保证数据分布均匀?数据分布到多 台服务器后如何实现跨服务器读写操作?

●一致性:如何将数据的多个副本复制到多台服务器,即使在异常情况下,也能够保 证不同副本之间的数据一致性?

●容错:如何检测到服务器故障?如何自动将出现故障的服务器上的数据和服务迁移 到集群中其他服务器?

●负载均衡:新增服务器和集群正常运行过程中如何实现自动负载均衡?数据迁移的 过程中如何保证不影响已有服务?

●事务与并发控制:如何实现分布式事务?如何实现多版本并发控制?

●易用性:如何设计对外接口使得系统容易使用?如何设计监控系统并将系统的内部 状态以方便的形式暴露给运维人员?

●压缩/解压缩:如何根据数据的特点设计合理的压缩/解压缩算法?如何平衡压缩算 法节省的存储空间和消耗的CPU计算资源?

分布式存储系统挑战大,研发周期长,涉及的知识面广。一般来讲,工程师如果能 够深入理解分布式存储系统,理解其他互联网后台架构不会再有任何困难。

分布式存储面临的数据需求比较复杂,大致可以分为三类:

●非结构化数据:包括所有格式的办公文档、文本、图片、图像、音频和视频信息 等。

●结构化数据:一般存储在关系数据库中,可以用二维关系表结构来表示。结构化数 据的模式(Schema,包括属性、数据类型以及数据之间的联系)和内容是分开的, 数据的模式需要预先定义。

●半结构化数据:介于非结构化数据和结构化数据之间,HTML文档就属于半结构化 数据。它一般是自描述的,与结构化数据最大的区别在于,半结构化数据的模式结 构和内容混在一起,没有明显的区分,也不需要预先定义数据的模式结构。 不同的分布式存储系统适合处理不同类型的数据,

分布式存储系统分为四 类:分布式文件系统、分布式键值(Key-Value)系统、分布式表格系统和分布式 数据库。

分布式文件系统

互联网应用需要存储大量的图片、照片、视频等非结构化数据对象,这类数据以对 象的形式组织,对象之间没有关联,这样的数据一般称为Blob(Binary Large Object,二进制大对象)数据。

分布式文件系统用于存储Blob对象,典型的系统有Facebook Haystack以及 Taobao File System(TFS)。另外,分布式文件系统也常作为分布式表格系统以 及分布式数据库的底层存储,如谷歌的GFS(Google File System,存储大文件) 可以作为分布式表格系统Google Bigtable的底层存储,Amazon的EBS(Elastic Block Store,弹性块存储)系统可以作为分布式数据库(Amazon RDS)的底层存 储。

总体上看,分布式文件系统存储三种类型的数据:Blob对象、定长块以及大文件。 在系统实现层面,分布式文件系统内部按照数据块(chunk)来组织数据,每个数据 块的大小大致相同,每个数据块可以包含多个Blob对象或者定长块,一个大文件也 可以拆分为多个数据块,如图1-1所示。分布式文件系统将这些数据块分散到存储集 群,处理数据复制、一致性、负载均衡、容错等分布式系统难题,并将用户对Blob 对象、定长块以及大文件的操作映射为对底层数据块的操作。

分布式键值系统

分布式键值系统用于存储关系简单的半结构化数据,它只提供基于主键的 CRUD(Create/Read/Update/Delete)功能,即根据主键创建、读取、更新或者 删除一条键值记录。

典型的系统有Amazon Dynamo以及Taobao Tair。从数据结构的角度看,分布式键 值系统与传统的哈希表比较类似,不同的是,分布式键值系统支持将数据分布到集 群中的多个存储节点。分布式键值系统是分布式表格系统的一种简化实现,一般用作缓存,比如淘宝Tair以及Memcache。一致性哈希是分布式键值系统中常用的数据 分布技术,因其被Amazon DynamoDB系统使用而变得相当有名。

分布式表格系统

分布式表格系统用于存储关系较为复杂的半结构化数据,与分布式键值系统相比, 分布式表格系统不仅仅支持简单的CRUD操作,而且支持扫描某个主键范围。分布式 表格系统以表格为单位组织数据,每个表格包括很多行,通过主键标识一行,支持 根据主键的CRUD功能以及范围查找功能。

分布式表格系统借鉴了很多关系数据库的技术,例如支持某种程度上的事务,比如 单行事务,某个实体组(Entity Group,一个用户下的所有数据往往构成一个实体 组)下的多行事务。典型的系统包括Google Bigtable以及Megastore,Microsoft Azure Table Storage,Amazon DynamoDB等。与分布式数据库相比,分布式表格 系统主要支持针对单张表格的操作,不支持一些特别复杂的操作,比如多表关联, 多表联接,嵌套子查询;另外,在分布式表格系统中,同一个表格的多个数据行也 不要求包含相同类型的列,适合半结构化数据。分布式表格系统是一种很好的权 衡,这类系统可以做到超大规模,而且支持较多的功能,但实现往往比较复杂,而 且有一定的使用门槛。

分布式数据库

分布式数据库一般是从单机关系数据库扩展而来,用于存储结构化数据。分布式数 据库采用二维表格组织数据,提供SQL关系查询语言,支持多表关联,嵌套子查询等 复杂操作,并提供数据库事务以及并发控制。

典型的系统包括MySQL数据库分片(MySQL Sharding)集群,Amazon RDS以及Microsoft SQL Azure。分布式数据库支持的功能最为丰富,符合用户使用习惯, 但可扩展性往往受到限制。当然,这一点并不是绝对的。Google Spanner系统是一 个支持多数据中心的分布式数据库,它不仅支持丰富的关系数据库功能,还能扩展 到多个数据中心的成千上万台机器。除此之外,阿里巴巴OceanBase系统也是一个 支持自动扩展的分布式关系数据库。

关系数据库是目前为止最为成熟的存储技术,它的功能极其丰富,产生了商业的关 系数据库软件(例如Oracle,Microsoft SQL Server,IBM DB2,MySQL)以及上 层的工具及应用软件生态链。然而,关系数据库在可扩展性上面临着巨大的挑战。 传统关系数据库的事务以及二维关系模型很难高效地扩展到多个存储节点上,另 外,关系数据库对于要求高并发的应用在性能上优化空间较大。为了解决关系数据 库面临的可扩展性、高并发以及性能方面的问题,各种各样的非关系数据库风起云 涌,这类系统成为NoSQL系统,可以理解为“Not Only SQL”系统。NoSQL系统多 得让人眼花缭乱,每个系统都有自己的独到之处,适合解决某种特定的问题。这些 系统变化很快,本书不会尝试去探寻某种NoSQL系统的实现,而是从分布式存储技术 的角度探寻大规模存储系统背后的原理。

Raft算法

过去, Paxos一直是分布式协议的标准,但是Paxos难于理解,更难以实现,Google的分布式锁系统Chubby作为Paxos实现曾经遭遇到很多坑。

来自Stanford的新的分布式协议研究称为Raft,它是一个为真实世界应用建立的协议,主要注重协议的落地性和可理解性。

在了解Raft之前,我们先了解Consensus一致性这个概念,它是指多个服务器在状态达成一致,但是在一个分布式系统中,因为各种意外可能,有的服务器可能会崩溃或变得不可靠,它就不能和其他服务器达成一致状态。这样就需要一种Consensus协议,一致性协议是为了确保容错性,也就是即使系统中有一两个服务器当机,也不会影响其处理过程。

为了以容错方式达成一致,我们不可能要求所有服务器100%都达成一致状态,只要超过半数的大多数服务器达成一致就可以了,假设有N台服务器,N/2 +1 就超过半数,代表大多数了。

Paxos和Raft都是为了实现Consensus一致性这个目标,这个过程如同选举一样,参选者需要说服大多数选民(服务器)投票给他,一旦选定后就跟随其操作。Paxos和Raft的区别在于选举的具体过程不同。

在Raft中,任何时候一个服务器可以扮演下面角色之一:

  1. Leader: 处理所有客户端交互,日志复制等,一般一次只有一个Leader.
  2. Follower: 类似选民,完全被动
  3. Candidate候选人: 类似Proposer律师,可以被选为一个新的领导人。

Raft阶段分为两个,首先是选举过程,然后在选举出来的领导人带领进行正常操作,比如日志复制等。下面用图示展示这个过程:

  1. 任何一个服务器都可以成为一个候选者Candidate,它向其他服务器Follower发出要求选举自己的请求
  2. 其他服务器同意了,发出OK。注意如果在这个过程中,有一个Follower当机,没有收到请求选举的要求,因此候选者可以自己选自己,只要达到N/2 + 1 的大多数票,候选人还是可以成为Leader的。
  3. 这样这个候选者就成为了Leader领导人,它可以向选民也就是Follower们发出指令,比如进行日志复制。

以后通过心跳进行日志复制的通知。如果一旦这个Leader当机崩溃了,那么Follower中有一个成为候选者,发出邀票选举。Follower同意后,其成为Leader,继续承担日志复制等指导工作。

值得注意的是,整个选举过程是有一个时间限制的。Splite Vote是因为如果同时有两个候选人向大家邀票,这时通过类似加时赛来解决,两个候选者在一段timeout比如300ms互相不服气的等待以后,因为双方得到的票数是一样的,一半对一半,那么在300ms以后,再由这两个候选者发出邀票,这时同时的概率大大降低,那么首先发出邀票的的候选者得到了大多数同意,成为领导者Leader,而另外一个候选者后来发出邀票时,那些Follower选民已经投票给第一个候选者,不能再投票给它,它就成为落选者了,最后这个落选者也成为普通Follower一员了。

日志复制

下面以日志复制为例子说明Raft算法,假设Leader领导人已经选出,这时客户端发出增加一个日志的要求,比如日志是"sally"

Leader要求Followe遵从他的指令,都将这个新的日志内容追加到他们各自日志中

大多数follower服务器将日志写入磁盘文件后,确认追加成功,发出Commited Ok:

在下一个心跳heartbeat中,Leader会通知所有Follwer更新commited 项目。

对于每个新的日志记录,重复上述过程。

如果在这一过程中,发生了网络分区或者网络通信故障,使得Leader不能访问大多数Follwers了,那么Leader只能正常更新它能访问的那些Follower服务器,而大多数的服务器Follower因为没有了Leader,他们重新选举一个候选者作为Leader,然后这个Leader作为代表于外界打交道,如果外界要求其添加新的日志,这个新的Leader就按上述步骤通知大多数Followers,如果这时网络故障修复了,那么原先的Leader就变成Follower,在失联阶段这个老Leader的任何更新都不能算commit,都回滚,接受新的Leader的新的更新。

分布式id生成器

有时我们需要能够生成类似MySQL自增ID这样不断增大,同时又不会重复的id。以支持业务中的高并发场景。比较典型的,电商促销时,短时间内会有大量的订单涌入到系统,比如每秒10w+。明星出轨时,会有大量热情的粉丝发微博以表心意,同样会在短时间内产生大量的消息。

在插入数据库之前,我们需要给这些消息、订单先打上一个ID,然后再插入到我们的数据库。对这个id的要求是希望其中能带有一些时间信息,这样即使我们后端的系统对消息进行了分库分表,也能够以时间顺序对这些消息进行排序。

Twitter的snowflake算法是这种场景下的一个典型解法。

首先确定我们的数值是64位,int64类型,被划分为四部分,不含开头的第一个bit,因为这个bit是符号位。用41位来表示收到请求时的时间戳,单位为毫秒,然后五位来表示数据中心的id,然后再五位来表示机器的实例id,最后是12位的循环自增id(到达1111,1111,1111后会归0)。

这样的机制可以支持我们在同一台机器上,同一毫秒内产生2 ^ 12 = 4096条消息。一秒共409.6万条消息。从值域上来讲完全够用了。

数据中心加上实例id共有10位,可以支持我们每数据中心部署32台机器,所有数据中心共1024台实例。

表示timestamp的41位,可以支持我们使用69年。当然,我们的时间毫秒计数不会真的从1970年开始记,那样我们的系统跑到2039/9/7 23:47:35就不能用了,所以这里的timestamp只是相对于某个时间的增量,比如我们的系统上线是2018-08-01,那么我们可以把这个timestamp当作是从2018-08-01 00:00:00.000的偏移量。

worker_id分配

timestampdatacenter_idworker_idsequence_id这四个字段中,timestampsequence_id是由程序在运行期生成的。但datacenter_idworker_id需要我们在部署阶段就能够获取得到,并且一旦程序启动之后,就是不可更改的了(想想,如果可以随意更改,可能被不慎修改,造成最终生成的id有冲突)。

一般不同数据中心的机器,会提供对应的获取数据中心id的API,所以datacenter_id我们可以在部署阶段轻松地获取到。而worker_id是我们逻辑上给机器分配的一个id,这个要怎么办呢?比较简单的想法是由能够提供这种自增id功能的工具来支持,比如MySQL

分布式锁

在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。

在分布式场景下,我们也需要这种“抢占”的逻辑,我们可以使用Redis提供的setnx命令

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/go-redis/redis"
)

func incr() {
    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "", // no password set
        DB:       0,  // use default DB
    })

    var lockKey = "counter_lock"
    var counterKey = "counter"

    // lock
    resp := client.SetNX(lockKey, 1, time.Second*5)
    lockSuccess, err := resp.Result()

    if err != nil || !lockSuccess {
        fmt.Println(err, "lock result: ", lockSuccess)
        return
    }

    // counter ++
    getResp := client.Get(counterKey)
    cntValue, err := getResp.Int64()
    if err == nil || err == redis.Nil {
        cntValue++
        resp := client.Set(counterKey, cntValue, 0)
        _, err := resp.Result()
        if err != nil {
            // log err
            println("set value error!")
        }
    }
    println("current counter is ", cntValue)

    delResp := client.Del(lockKey)
    unlockSuccess, err := delResp.Result()
    if err == nil && unlockSuccess > 0 {
        println("unlock success!")
    } else {
        println("unlock failed", err)
    }
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
}

通过代码和执行结果可以看到,我们远程调用setnx运行流程上和单机的trylock非常相似,如果获取锁失败,那么相关的任务逻辑就不应该继续向前执行。

setnx很适合在高并发场景下,用来争抢一些“唯一”的资源。比如交易撮合系统中卖家发起订单,而多个买家会对其进行并发争抢。这种场景我们没有办法依赖具体的时间来判断先后,因为不管是用户设备的时间,还是分布式场景下的各台机器的时间,都是没有办法在合并后保证正确的时序的。哪怕是我们同一个机房的集群,不同的机器的系统时间可能也会有细微的差别。

ZooKeeper

ZooKeeper是Hadoop Ecosystem中非常重要的组件,它的主要功能是为分布式系统提供一致性协调(Coordination)服务,与之对应的Google的类似服务叫Chubby。 分布式环境中大多数服务是允许部分失败,也允许数据不一致,但有些最基础的服务是需要高可靠性,高一致性的,这些服务是其他分布式服务运转的基础,比如naming service、分布式lock等,这些分布式的基础服务有以下要求:

  1. 高可用性
  2. 高一致性
  3. 高性能 对于这种有些挑战CAP原则 的服务该如何设计,是一个挑战,也是一个不错的研究课题,Apache的ZooKeeper也许给了我们一个不错的答案。ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务, 它暴露了一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。
package main

import (
    "time"
    "github.com/samuel/go-zookeeper/zk"
)

func main() {
    c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second) //*10)
    if err != nil {
        panic(err)
    }
    l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll))
    err = l.Lock()
    if err != nil {
        panic(err)
    }
    println("lock succ, do your business logic")

    time.Sleep(time.Second * 10)

    // do some thing
    l.Unlock()
    println("unlock succ, finish business logic")
}

基于ZooKeeper的锁与基于Redis的锁的不同之处在于Lock成功之前会一直阻塞,这与我们单机场景中的mutex.Lock很相似。

其原理也是基于临时Sequence节点和watch API,例如我们这里使用的是/lock节点。Lock会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有watch该节点的程序。这时候程序会检查当前节点下最小的子节点的id是否与自己的一致。如果一致,说明加锁成功了。

这种分布式的阻塞锁比较适合分布式任务调度场景,但不适合高频次持锁时间短的抢锁场景。按照Google的Chubby论文里的阐述,基于强一致协议的锁适用于粗粒度的加锁操作。这里的粗粒度指锁占用时间较长。我们在使用时也应思考在自己的业务场景中使用是否合适。

Zookeeper原理

Zookeeper的保证

l 顺序性,client的updates请求都会根据它发出的顺序被顺序的处理;

l 原子性, 一个update操作要么成功要么失败,没有其他可能的结果;

l 一致的镜像,client不论连接到哪个server,展示给它都是同一个视图;

l 可靠性,一旦一个update被应用就被持久化了,除非另一个update请求更新了当前值

l 实时性,对于每个client它的系统视图都是最新的

Zookeeper中的角色

领导者(Leader) : 领导者不接受client的请求,负责进行投票的发起和决议,最终更新状态。

跟随者(Follower): Follower用于接收客户请求并返回客户结果。参与Leader发起的投票。

观察者(observer): Oberserver可以接收客户端连接,将写请求转发给leader节点。但是Observer不参加投票过程,只是同步leader的状态。Observer为系统扩展提供了一种方法。

学习者 ( Learner ) : 和leader进行状态同步的server统称Learner,上述Follower和Observer都是Learner。

ZooKeeper集群

通常Zookeeper由2n+1台servers组成,每个server都知道彼此的存在。每个server都维护的内存状态镜像以及持久化存储的事务日志和快照。对于2n+1台server,只要有n+1台(大多数)server可用,整个系统保持可用。

系统启动时,集群中的server会选举出一台server为Leader,其它的就作为follower(这里先不考虑 observer角色)。接着由follower来服务client的请求,对于不改变系统一致性状态的读操作,由follower的本地内存数据库直接 给client返回结果;对于会改变系统状态的更新操作,则交由Leader进行提议投票,超过半数通过后返回结果给client。

ZooKeeper工作原理

Zookeeper的核心是原子广播,这个机制保证了各个server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两 种模式,它们分别是恢复模式和广播模式。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数server的完成了和 leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和server具有相同的系统状态。

一旦leader已经和多数的follower进行了状态同步后,他就可以开始广播消息了,即进入广播状态。这时候当一个server 加入zookeeper服务中,它会在恢复模式下启动,发现leader,并和leader进行状态同步。待到同步结束,它也参与消息广播。 Zookeeper服务一直维持在Broadcast状态,直到leader崩溃了或者leader失去了大部分的followers支持。

Broadcast模式极其类似于分布式事务中的2pc(two-phrase commit 两阶段提交):即leader提起一个决议,由followers进行投票,leader对投票结果进行计算决定是否通过该决议,如果通过执行该决议(事务),否则什么也不做。

广播模式需要保证proposal被按顺序处理,因此zk采用了递增的事务id号(zxid)来保证。所有的提议(proposal) 都在被提出的时候加上了zxid。实现中zxid是一个64为的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被 选出来,它都会有一个新的epoch。低32位是个递增计数。

当leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的server都恢复到一个正确的状态。

首先看一下选举的过程,zk的实现中用了基于paxos算法(主要是fastpaxos)的实现。具体如下:

1.每个Server启动以后都询问其它的Server它要投票给谁。

2.对于其他server的询问,server每次根据自己的状态都回复自己推荐的leader的id和上一次处理事务的zxid(系统启动时每个server都会推荐自己)

3.收到所有Server回复以后,就计算出zxid最大的哪个Server,并将这个Server相关信息设置成下一次要投票的Server。

4.计算这过程中获得票数最多的的sever为获胜者,如果获胜者的票数超过半数,则改server被选为leader。否则,继续这个过程,直到leader被选举出来。

此外恢复模式下,如果是重新刚从崩溃状态恢复的或者刚启动的的server还会从磁盘快照中恢复数据和会话信息。(zk会记录事务日志并定期进行快照,方便在恢复时进行状态恢复)

选完leader以后,zk就进入状态同步过程。

1.leader就会开始等待server连接

2.Follower连接leader,将最大的zxid发送给leader

3.Leader根据follower的zxid确定同步点

4.完成同步后通知follower 已经成为uptodate状态

5.Follower收到uptodate消息后,又可以重新接受client的请求进行服务了。

Zookeeper工作流程

主线程的工作:

  1. 刚开始时各个Server处于一个平等的状态peer
  2. 主线程加载配置后启动。
  3. 主线程启动QuorumPeer线程,该线程负责管理多数协议(Quorum),并根据表决结果进行角色的状态转换。
  4. 然后主线程等待QuorumPeer线程。

QuorumPeer线程

  1. 首先会从磁盘恢复zkdatabase(内存数据库),并进行快照回复。
  2. 然后启动server的通信线程,准备接收client的请求。
  3. 紧接着该线程进行选举leader准备,选择选举算法,启动response线程(根据自身状态)向其他server回复推荐的leaer。
  4. 刚开始的时候server都处于looking状态,进行选举根据选举结果设置自己的状态和角色。

QuorumPeer有几种状态

  1. Looking: 寻找状态,这个状态不知道谁是leader,会发起leader选举
  2. Observing: 观察状态,这时候observer会观察leader是否有改变,然后同步leader的状态
  3. Following: 跟随状态,接收leader的proposal ,进行投票。并和leader进行状态同步
  4. Leading: 领导状态,对Follower的投票进行决议,将状态和follower进行同步

当一个Server发现选举的结果自己是Leader把自己的状态改成Leading,如果Server推荐了其他人为Server它 将自己的状态改成Following。做Leader的server如果发现拥有的follower少于半数时,它重新进入looking状态,重新进行 leader选举过程。(Observing状态是根据配置设置的)

Leader主线程

1.首先leader开始恢复数据和清除session

启动zk实例,建立请求处理链(Leader的请求处理 链):PrepRequestProcessor->ProposalRequestProcessor->CommitProcessor->Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor

2.得到一个新的epoch,标识一个新的leader , 并获得最大zxid(方便进行数据同步)

3.建立一个学习者接受线程(来接受新的followers的连接,follower连接后确定followers的zxvid号,来确定是需要对follower进行什么同步措施,比如是差异同步(diff),还是截断(truncate)同步,还是快照同步)

4.向follower建立一个握手过程leader->follower NEWLEADER消息,并等待直到多数server发送了ack

5.Leader不断的查看已经同步了的follower数量,如果同步数量少于半数,则回到looking状态重新进行leaderElection过程,否则继续step5.

Follower的工作流程

1.启动zk实例,建立请求处理链:FollowerRequestProcessor->CommitProcessor->FinalProcessor

2.follower首先会连接leader,并将zxid和id发给leader

3.接收NEWLEADER消息,完成握手过程。

4.同leader进行状态同步

5.完成同步后,follower可以接收client的连接

6.接收到client的请求,根据请求类型

l 对于写操作, FollowerRequestProcessor会将该操作作为LEADER.REQEST发给LEADER由LEADER发起投票。

l 对于读操作,则通过请求处理链的最后一环FinalProcessor将结果返回给客户端

对于observer的流程不再赘述,observer流程和Follower的唯一不同的地方就是observer不会参加leader发起的投票。

learnerCnxAcceptor线程

1.该线程监听Learner的连接

2.接受Learner请求,并为每个Learner创建一个LearnerHandler来服务

LearnerHandler线程的服务流程

1.检查server来的第一个包是否为follower.info或者observer.info,如果不是则无法建立握手。

2.得到Learner的zxvid,对比自身的zxvid,确定同步点

3.和Learner建立第二次握手,向Learner发送NEWLEADER消息

4.与server进行数据同步。

5.同步结束,知会server同步已经ok,可以接收client的请求。

6.不断读取follower消息判断消息类型

i. 如果是LEADER.ACK,记录follower的ack消息,超过半数ack,将proposal提交(Commit)

ii. 如果是LEADER.PING,则维持session(延长session失效时间)

iii. 如果是LEADER.REQEST,则将request放入请求链进行处理–Leader写请求发起proposal,然后根据follower回复的结 果来确定是否commit的。最后由FinallRequestProcessor来实际进行持久化,并回复信息给相应的response给server

Zookeeper的拓展

为了提高吞吐量通常我们只要增加服务器到Zookeeper集群中。但是当服务器增加到一定程度,会导致投票的压力增大从而使得吞吐量降低。因此我们引出了一个角色:Observer。

Observers 的需求源于 ZooKeeper follower服务器在上述工作流程中实际扮演了两个角色。它们从客户端接受连接与操作请求,之后对操作结果进行投票。这两个职能在 ZooKeeper集群扩展的时候彼此制约。如果我们希望增加 ZooKeeper 集群服务的客户数量(我们经常考虑到有上万个客户端的情况),那么我们必须增加服务器的数量,来支持这么多的客户端。然而,从一致性协议的描述可以看到, 增加服务器的数量增加了对协议的投票部分的压力。领导节点必须等待集群中过半数的服务器响应投票。于是,节点的增加使得部分计算机运行较慢,从而拖慢整个 投票过程的可能性也随之提高,投票操作的会随之下降。这正是我们在实际操作中看到的问题——随着 ZooKeeper 集群变大,投票操作的吞吐量会下降。

所以需要增加客户节点数量的期望和我们希望保持较好吞吐性能的期望间进行权衡。要打破这一耦合关系,引入了不参与投票的服务器,称为 Observers。 Observers 可以接受客户端的连接,将写请求转发给领导节点。但是,领导节点不会要求 Observers 参加投票。相反,Observers 不参与投票过程,仅仅和其他服务节点一起得到投票结果。

这个简单的扩展给 ZooKeeper 的可伸缩性带来了全新的镜像。我们现在可以加入很多 Observers 节点,而无须担心严重影响写吞吐量。规模伸缩并非无懈可击——协议中的一歩(通知阶段)仍然与服务器的数量呈线性关系。但是,这里的穿行开销非常低。因此 可以认为在通知服务器阶段的开销无法成为主要瓶颈。

此外Observer还可以成为特定场景下,广域网部署的一种方案。原因有三点:1.为了获得更好的读性能,需要让客户端足够近,但如 果将投票服务器分布在两个数据中心,投票的延迟太大会大幅降低吞吐,是不可取的。因此希望能够不影响投票过程,将投票服务器放在同一个IDC进行部 署,Observer可以跨IDC部署。2. 投票过程中,Observer和leader之间的消息、要远小于投票服务器和server的消息,这样远程部署对带宽要求就较小。3.由于 Observers即使失效也不会影响到投票集群,这样如果数据中心间链路发生故障,不会影响到服务本身的可用性。这种故障的发生概率要远高于一个数据中 心中机架间的连接的故障概率,所以不依赖于这种链路是个优点。

etcd

etcd是分布式系统中,功能上与ZooKeeper类似的组件,这两年越来越火了。上面基于ZooKeeper我们实现了分布式阻塞锁,基于etcd,也可以实现类似的功能

package main

import (
    "log"

    "github.com/zieckey/etcdsync"
)

func main() {
    m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"})
    if m == nil || err != nil {
        log.Printf("etcdsync.New failed")
        return
    }
    err = m.Lock()
    if err != nil {
        log.Printf("etcdsync.Lock failed")
        return
    }

    log.Printf("etcdsync.Lock OK")
    log.Printf("Get the lock. Do something here.")

    err = m.Unlock()
    if err != nil {
        log.Printf("etcdsync.Unlock failed")
    } else {
        log.Printf("etcdsync.Unlock OK")
    }
}

etcd中没有像ZooKeeper那样的Sequence节点。所以其锁实现和基于ZooKeeper实现的有所不同。在上述示例代码中使用的etcdsync的Lock流程是:

  1. 先检查/lock路径下是否有值,如果有值,说明锁已经被别人抢了
  2. 如果没有值,那么写入自己的值。写入成功返回,说明加锁成功。写入时如果节点被其它节点写入过了,那么会导致加锁失败,这时候到 3
  3. watch /lock下的事件,此时陷入阻塞
  4. /lock路径下发生事件时,当前进程被唤醒。检查发生的事件是否是删除事件(说明锁被持有者主动unlock),或者过期事件(说明锁过期失效)。如果是的话,那么回到 1,走抢锁流程。

值得一提的是,在etcdv3的API中官方已经提供了可以直接使用的锁API

选择合适的锁

业务还在单机就可以搞定的量级时,那么按照需求使用任意的单机锁方案就可以。

如果发展到了分布式服务阶段,但业务规模不大,qps很小的情况下,使用哪种锁方案都差不多。如果公司内已有可以使用的ZooKeeper、etcd或者Redis集群,那么就尽量在不引入新的技术栈的情况下满足业务需求。

业务发展到一定量级的话,就需要从多方面来考虑了。首先是你的锁是否在任何恶劣的条件下都不允许数据丢失,如果不允许,那么就不要使用Redis的setnx的简单锁。

对锁数据的可靠性要求极高的话,那只能使用etcd或者ZooKeeper这种通过一致性协议保证数据可靠性的锁方案。但可靠的背面往往都是较低的吞吐量和较高的延迟。需要根据业务的量级对其进行压力测试,以确保分布式锁所使用的etcd或ZooKeeper集群可以承受得住实际的业务请求压力。需要注意的是,etcd和Zookeeper集群是没有办法通过增加节点来提高其性能的。要对其进行横向扩展,只能增加搭建多个集群来支持更多的请求。这会进一步提高对运维和监控的要求。多个集群可能需要引入proxy,没有proxy那就需要业务去根据某个业务id来做分片。如果业务已经上线的情况下做扩展,还要考虑数据的动态迁移。这些都不是容易的事情。

在选择具体的方案时,还是需要多加思考,对风险早做预估。

Hystrix

熔断器

https://github.com/Netflix/Hystrix

如果你觉得我的文章对你有帮助的话,希望可以推荐和交流一下。欢迎關注和 Star 本博客或者关注我的 Github