manout's blog

Something about me

redis 的在启动后创建了一些循环事件来监听 TCP 端口和 Unix 的 Sockets, 从而使 redis 服务器可以接受新的连接,之后就可接受来自 client 的请求和命令。
中间的过程主要分为以下几步

处理新连接

redis 在 initServer() 函数中创建循环事件调用了 acceptTcpHandleracceptUnixHandler 函数来处理接受到的 TCP 连接和 Unix 的 Sockets 连接。这两个函数又调用了 acceptCommonHandler, 在这个函数中又调用了 createClient 函数创建了一个新的 client 对象,用来表示一个新的客户端连接

createClient 的工作主要如下

首先为变量 c 分配了内存,接着将 Socket 连接置为非阻塞状态,并且设置了 TCP 无延迟。然后创建了 File 循环事件(asCreateFileEvent) 来调用 readQueryFromClient. 新建的客户端默认连接的是服务器的第一个数据库(编码为 0), 最后设置好客户端的各种属性和状态.

读一个客户端的命令

readQueryFromClient 函数就是用来从客户端读取命令的,具体实现如下

Redis 会先将命令读入缓冲区,一次最多读取的大小是 PROTO_IOBUF_LEN(1024 x 16) bit. 然后调用 processInputBufferAndReplicate 函数, 来处理缓冲区中的数据,如果客户端时 master(主从同步过程), 那么 Redis 就会计算前后缓冲区的不同部分,以确定从节点接受了多少数据。
processInputBufferAndReplicate 函数会处理客户端向服务器发送命令和主节点向从节点发送命令这两种情况,不过最后都需要调用 processInputBuffer 函数.

processInputBufferAndReplicate 函数会先判断客户端是否正常,如果出现连接中断或者客户端阻塞等情况,就会立即停止命令。然后根据读取的请求生成 Redis 可以执行党的命令(包括参数). 不同的请求类型分别调用 processInlineBufferprocessMultbulkBuffer 函数,生成好命令之后,交给 procssCommand 执行,如果返回 C_OK 则重置客户端,等待下一个命令。如果返回的是 C_ERR, 客户端就会销毁。(比如执行 QUIT 命令

processCommand 函数会从 Redis 启动时加载的命令表中查找命令,然后检查命令的执行权限。

如果是 cluster, 这是会判断 key 是否属于当前的 master, 不属于返回重定向信息。

如果内存不够用,这里也需要判断一下是够有可以释放的内存,如果没有,就不能执行命令,返回错误信息。

接下来判断一些不能接受写命令的情况

  • 服务器不能进行持久化
  • 作为 master, 没有足够的可用的 slave
  • 此服务器为只读的 slave, 只有它的 master 可以接受命令

在订阅模式中,只能接受指定的命令

  • (P) SUBSCRIBE
  • (P) UNSUBSCRIBE
  • PING
  • QUIT

当 slave 和 master 失联时,只能接受有 flag t 的命令,例如 INFO, SLAVEOF

如果命令没有 CMD_LOADING 标志,并且当前服务器正在加载数据,则不能接受此命令

对 lua 脚本长度进行限制

当进行完上述的各种条件判断后,才真正开始调用 call 函数执行命令

执行命令并返回

call 函数的参数是 client 类型,取出 cmd 成员进行执行.

1
2
3
4
5
6
dirty = server.dirty;
start = ustime();
c->cmd->proc(c);
duration = ustime() - start;
dirty = server.dirty - dirty;
if (dirty < 0) dirty = 0;

如果是写命令,就是在服务器上产生脏数据,服务器需要标记下内存中的某些有了改变。这对于 Redis 的持久化来说非常重要,它可以知道这个命令影响了多少个 key. 命令执行完毕后并没有结束,call 函数还会进行一些其他操作。例如记录日志,写 AOF 文件,向从节点同步命令等。

摘要

从架构层面和具体实现层面分析了 kafka 如何实现高性能

架构层面

partition 实现并行处理

kafka 是基于订阅-发布的消息系统,无论是发布还是订阅,都需要指定 topic, topic 只是一个逻辑上的概念。每个 topic 都包含一个或者或者多个 partition, 不同 partition 可位于不同节点。同时 partition 在物理上对应一个本地文件夹,每个 Partition 包含一个或多个 Segment, 每个 Segment 包含一个数据文件和一个与之对应的索引文件。
Partition 可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于 Partition 在物理上对应一个文件夹,即使多个 Partition 位于同一个节点,也可以通过配置让同一节点上的不同 Partition 置于不同 disk drive 上,从而实现磁盘的并行处理,充分发挥多磁盘的优势。

注: 虽然一个 Partition 可以分为多个 Segment, 但是 Segment 并不能提供并行处理。

常用数据复制及一致性方案

master-slave

  • RDBMS 的读写分离即为典型的 Master-Slave 方案
  • 同步复制可保证强一致性但会影响可用性
  • 异步复制可提供高可用性但是会降低一致性

WNR

  • 主要用于去中心化的分布式系统中。
  • N 代表总副本数,W 代表每次写操作要保证的最少写成功的副本数,R 代表每次读至少要读取的副本数
  • 当 W + R > N 时,可保证每次读取的数据至少有一个副本拥有最新的数据
  • 多个写皂搓的顺序难以保证,可能导致多副本间的写操作顺序不一致。

Paxos 及其变种

  • Google 的 Chubby, Zookeeper 的原子广播协议

基于 ISR 的数据复制方案

kafka 的数据复制是以 Partition 为单位的。而多个备份间的数据复制,通过 Follower 向 Leader 拉取数据完成。类似于 Master-Slave 方案,但是 Kafka 既不是完全的同步复制,也不完全的异步复制,而是基于 ISR 的动态复制方案。

ISR 由 Leader 动态维护。如果 Follower 不能跟上 Leader, 他会被 Leader 从 ISR 中移除,待它又重新跟上 Leader 后,会被 Leader 再次加入 ISR 中。每次改变 ISR 后,Leader 都会将最新的 ISR 持久化到 Zookeeper 中。

0.8.* 版本,如果 Follower 在 replica.lag.time.max.ms 时间内未向 Leader 发送 Fetch 请求(也即数据复制请求), 则 Leader 会将其从 ISR 中移除。如果某 Follower 持续向 Leader 发送 Fetch 请求,但是与 Leader 的数据差距在 replica.lag.max.messages 以上,也会被 Leader 从 ISR 中移除.

Leader 并不是等到前一条消息被 Commit 才接受后一条消息。Leader 按顺序接受大量消息,最新的一条消息的 offset 被记为 High Watermark. 只有被 ISR 中所有 follower 都复制过去的消息才会 commit, Consumer 只能消费被 Commit 的消息。由于 Follower 的复制时严格按照书序,所以被 commit 的消息之前的消息肯定也已经被 Commit 过。换句话,High Watermark 标记的是 Leader 所保存的最新消息的 offet, commit offset 标记的是最新的可被消费(已同步到 ISR 中的 Follower)消息。而 Leader 对数据的接受与 Follower 对数据的复制是异步进行的,因此会出现 Commit Offset 与 High Watermark 存在一定的情况。

具体实现层面

kafka 在读写上的优化

磁盘顺序写

kafka 在将消息写入磁盘时全是顺序写操作,目前大部分磁盘还是机械结构,顺序写要比随机写的效率高很多,避免了大量缓慢的机械运动。
过于频繁的小 I/O 操作会拖慢速度,所以 kafka 会将一批次消息打包到一起批量写回磁盘。

充分利用 page cache

通过 MMAP, 利用 page cache, 这是 os 级别的缓存而不是应用级别的,所以 kafka 重启后仍然可用。
读操作可直接在 page cache 内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过 page cache) 交换数据.

支持多 Disk Drive

Broker 的 log.dirs 配置项,允许配置多个文件夹。如果机器上有多个 Disk Drive, 可将不同的 Disk 挂载到不同的目录,然后将这些目录都配置到 log.dirs 里,kafka 会尽可能将不同的 Partition 分配到不同的目录,也即不同的 Disk 上面,充分利用多 Disk 的优势.

零拷贝

当需要网络传输日志时,比如传输持久性日志块,常规的接口从传输需要四次拷贝。
在使用 MMAP 时,可以使用 Linux 提供的传输接口 sendfile 系统调用,直接从页缓存复制到 socket 中进行网络传输。
传统的套接字发送接口中间需要发生四次数据拷贝。

减少网络开销

通过以下几种手段减少网络开销

  • 批处理
  • 数据压缩降低网络负载
  • 高效的序列化方式

转自知乎

zookeeper 在分布式集群中的作用

数据发布与订阅(配置中心)

发布与订阅模型,即配置中心,也就是讲发布者将数据发布到 zookeeper 节点上,供订阅者动态获取数据,实现配置的集中式管理和动态更新。
例如全局的配置信息,服务框架的地址列表。

负载均衡

软件负载均衡,最典型的是消息中间件的生产,消费者负载均衡

命名服务(Naming Service)

常见的是发布者将自己的地址列表写到 zookeeper 的节点,然后订阅者可以从固定名称的节点获取地址列表,链接到发布者进行相关通信

分布式通知/协调

这个利用的是 zookeeper 的 watch 注册和异步通知机制,能够很好的实现分布式环境中不同系统间的通知与协调,实现对数据变更的实时处理

集群管理与 Master 选举

集群管理与 Master 选举

分布式锁

分布式锁,这个主要得益于 zookeeper 数据的强一致性,利用的是临时节点。锁服务分为两类,一个是独占锁,另一个是控制时序

独占,是指所有的客户端都来获取这把锁,最终只有一个能获取到,用的是临时节点

控制时序,所有来获取锁的客户端,都会被安排得到锁,只不过要有个顺序,实际上是某个节点下的临时顺序子节点来实现的

分布式队列

一种是 FIFO, 这个就是使用临时顺序节点实现的,和分布式锁服务控制时序一样。

第二种是等待队列的成员聚齐之后的才能同意按序进行。实际上,是在队列的节点里首先创建一个 /queue/num 节点,并且赋值队列的大小。这样我们可以通过监控队列节点子节点的变动来感知队列是否已满或者条件已经满足执行的需要。这种,应用场景是有条件执行的任务,条件齐备了之后任务才能执行。

kafka 使用 zookeeper 实现的服务类型

配置管理

Topic 的配置之所以能动态更新就是基于 zookeeper 做了一个动态全局配置管理

负载均衡

基于 zookeeper 的消费者,实现了该特性,动态的感知分区变动,将负载使用既定策略分不到消费者身上

命名服务

Broker 将 advertised.portadvertised.host.name, 这两个配置发布到 zookeeper 上的 zookeeper 的节点上 /brokers/ids/BrokerId(broker.id), 这个是供生产者,消费者,其它 Broker 跟其建立连接用户的。

分布式通知

比如分区增加,topic 变动,Broker 上线下线等均是基于 zookeeper 来实现的分布式通知

集群管理和 master 选举

可以通过命令行,对 kafka 集群上的 topic partition 分布,进行迁移管理,也可以对 partition leader 选举进行干预

Master 选举,要说有也是违反常规,常规的 master 选举,是基于临时顺序节点来实现的,序列号最小的作为 master. 而 kafka 的 Controller 的选举是基于临时节点来实现的,临时节点创建成功的成为 Controller, 更像一个独占锁服务.

分布式锁

独占锁,用于 Controller 的选举.

kafka 事务

kafka 事务机制的实现主要是为了支持

  • Exactly once 刚好一次语义
  • 操作的原子性
  • 有状态操作的原子性

在 kafka 0.11.0.0 之前的版本中只支持 At least OnceAt Most Once 语义,尚不支持 Exactly Once 语义

操作的原子性

操作的原子性是指,多个操作要么全部成功要么全部失败,不存在部分成功部分失败的可能。
实现原子性操作的意义在于

  • 操作结果更可控,有助于提升数据一致性
  • 便于故障恢复。因为操作时原子的,从故障中恢复时只需要重试该操作(原操作失败) 或者直接跳过该操作(该操作成功), 而不需要记录中间状态,更不需要针对中间状态作特殊处理

实现事务机制的几个阶段

幂等性发送

producer 有其特有的 Producer ID(PID) 和 Sequence Number, PID 唯一且透明。
对于每个 PID, 该 producer 发送数据的每个 <Topic, Partition> 都对应一个从 0 开始单调递增的 Sequence Number.

类似地, Broker 端也会为每个 <PID, Topic, Partition> 维护一个序号,并且每次 commit 一条消息时将对应序号递增。对于接受的每条消息,如果其序号比 Broker 维护的序号(即最后一次 commit 时的消息的序号) 大一, 则 Broker 会接受它,否则将其丢弃

  • 如果消息序号比 Broker 维护的序号大 1 以上,说明中间有数据尚未写入,即乱序,此时 Broker 拒绝该消息,producer 抛出 InvalidSequenceNumber
  • 如果消息序号小于等于 Broker 维护的序号,说明该消息已经被保存,即重复消息,Broker 丢弃该消息,Producer 抛出 DuplicateSequenceNumber

事务性保证

事务保证可以使得应用程序将生产数据和消费数据当做一个原子单元来处理,要么全部成功,要么全部失败,即使该生产或者消费跨多个 <Topic, Partition>.
另外有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。
为了实现这种效果,应用程序必须提供一个稳定的(重启后不变的)唯一的 ID, 也即 Transaction ID. Transaction ID 与 PID 可能一一对应。区别在于 Transaction ID 由用户提供,而 PID 对用户透明。
另外,为了保证新的 Producer 启动后,旧的具有相同 Transaction ID 的 Producer 即失效,每次 Producer 通过 Transaction ID 拿到 PID 的同时,还会获取一个单调递增的 epoch. 由于旧的 Producer 的 epoch 比新的 Producer 的 epoch 小,kafka 可以很容易识别出该 Producer 是老的 Producer 并拒绝其请求。

有了 Transaction ID 后,Kafka 可保证

  • 跨 Session 的数据幂等发送。当具有相同的 Transaction ID 的新的 Producer 实例被创建且工作时,旧的且拥有相同 Transaction ID 的 Producer 将不再工作.
  • 跨 Session 的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么 Commit 要么 Abort, 使得新实例从一个正常状态开始恢复

完整事务过程

  • 找到 Transaction Coordinator
  • 获取 PID
  • 开启事务
  • Consume-Transform-Produce
  • Commit 或者 Abort 事情

本文转自技术世界, 原文链接

摘要

kafka 在 0.8 以前的版本中,并不提供 High Availablity 机制,一旦一个或多个 Broker 宕机,则宕机期间其上所有 partitions 都无法继续提供服务。若该 Broker 不能恢复,或者磁盘故障,则其数据将丢失。

kafka 从 0.8 版本开始提供的 High Availability 主要包含两方面

  • Data Replication
  • Leader Election

Data Repliction

kafka 的 data repliction 需要解决如下问题

  • 怎样 propagate 消息
  • 在想 producer 发送 ACK 前需要保证有多少个 Replica 已经收到该消息
  • 怎样处理某个 Replica 不工作的情况
  • 怎样处理 Failed Replica 恢复回来的情况

replica

引入 data repliction 后,一个 partitions 可以多个 replica。
如果这些 replica 在一个 broker 上,那么当这个 broker 宕机时,这个 partitions 仍然不可用,所以这些 replica 会分布在多个 broker 上.

propagate 信息

leader

这些 replica 通过 leader election 选举 leader, 其余为 follower. leader 承载 producer 的 push 请求,并将消息写入本地 log, 其他 follower 从其 leader 中 pull 数据.
leader 会维持一个与其基本保持同步的 Replica 列表,该列表成为 ISR(In-Sync Replica), 每个 Partitions 都会有一个 ISR, 并由 leader 动态维护

follower

follower 在 pull 到消息并写入 log 后,向 leader 发送 ACK.

producer commit

在 ISR 列表中的 replica 都已经向 leader 发送 ACK 后,leader 则认为该 message 成功 commit, leader 将增加 HW(high watermark, 该 offset 前 record 认为已经备份) 并向 producer 发送 ACK.
为了提高性能,其他 follower 是在收到消息后立即发送 ACK, 而不是等到写入 log 后,因此已经 commit, leader 只能保证目前存于其他 follower 的内存中,而不是持久化到磁盘中,所以也就不能保证该消息一定能被消费,但是这种场景属于极端场景,比较少见。
Consumer 读消息也是从 leader 读取,只有被 commit 的消息(offset 低于 HW 的消息), 才会暴露给 Consumer.

ACK 前需要保证有多少备份

kafka 处理失败需要明确定义一个 Broker 是否活着,对于 Kafka 而言,Kafka 存活包含两个条件,一是它必须维护与 Zookeeper 的 Session(通过 Zookeeper 的 HeartBeat 机制来实现). 二是 Follower 必须能够及时拉取 Leader 的消息,不能落后太多。

Leader 会维护一个 ISR 列表. 如果一个 follower 宕机,或者落后太多,Leader 将把他从 ISR 中移除。这里的落后太多指 Follower 复制的消息落后于 Leader 后的条数超过预定值(replica.lag.max.messages, 默认为 4000), 或者 Follower 超过一定时间(replica.lag.time.max.ms, 默认为 1000) 未向 Leader 发送 fetch 请求。

kafka 的复制机制既不是完全的同步复制,也不是单纯额异步复制。事实上,同步复制要求所有能工作的 Follower 都要复制完,这条消息才会被认为 commit, 这种复制方式会极大地影响吞吐量。而异步复制方式下,Follower 异步的从 Leader 复制数据,数据只要被 Leader 写入 log 就认为已经 commit, 这种情况下如果 follower 都复制完都落后于 leader, 而如果 Leader 突然宕机,则会丢失数据。而 Kafka 这种使用 ISR 的方式则很好的均衡了数据不丢失以及吞吐量。Follower 可以批量的从 Leader 复制数据,这样极大地提高复制性能(批量写磁盘), 极大减少了 Follower 与 Leader 的差距.

Leader Election

当 leader 宕机后,如何在 follower 中选举出新的 leader.
一个基本的原则就是,如果 leader 不再了,新的 leader 必须有原来的 leader commit 过的所有消息。

kafka 在 ZooKeeper 中动态维护了一个 ISR(In-sync replics), 这个 ISR 里的所有 Replica 都跟上了 leader, 只有 ISR 里的成员才有被选为 Leader 的可能。在这种模式下,对于 f + 1 个 Replica, 一个 Partitions 能在保证不丢失已经 commit 的消息的前提下容忍 f 个 Replica 的失败。
为了容忍 f 个 Replica 的失败,Majority Vote 和 ISR 在 commit 前需要等待的 Replica 数量是一样的,但是 ISR 需要的总的 Replica 的个数几乎是 Majority Vote 的一半。

ISR 中所有 Replica 都宕机

当 ISR 中至少有一个 Replica 时,kafka 保证 commit 的数据不丢失,但是如果某个 partition 的所有 Replica 都宕机了,就无法保证数据不丢失了。这种情况下有两种可行方案

  • 等待 ISR 中的任意一个 Replica 恢复,并将其选为 Leader
  • 选择第一个恢复的 Replica(不一定在 ISR 中) 作为 Leader

这就需要在可用性和一致性当中做出一个简单的折衷。如果一定要等待 ISR 中的 Replica 恢复,那不可用的时间可能会比较长,而且如果这个 Partitions 的 ISR 中的所有 Replica 都无法活过来了,或者数据都丢失了,这个 Partition 将永远不可用。

kafka 选择第二种方式处理这种情况,在未来的版本中,将支持通过配置选择两种方式中的一种。

如何选举 leader

所有的 follower 在 zookeeper 上设置一个 watch, 一旦 leader 宕机,其对应的 ephemeral znode 自动删除,此时所有 follower 都尝试创建该节点,创建成功者(zookeeper 保证只有一个能创建成功) 即是新的 Leader, 其它 Replica 即为 Follower.
但是该方法会有 3 个问题

  • split-brain
    由 zookeeper 的特性引起,虽然 Zookeeper 能保证 Watch 按顺序触发,但并不能保证同一时刻所有 Replica 看到状态的是一样的,这就可能造成不同的 Replica 的响应不一致
  • herd effect
    如果宕机的那个 broker 上的 partition 较多,会造成多个 watch 被触发,造成集群内大量的调整
  • Zookeeper
    Zookeeper 负载过重,每个 Replica 都要为此在 Zookeeper 上注册一个 Watch, 当集群规模增加到几千个 Partition 时 Zookeeper 负载会过重

在 kafka 0.8.* 的 leader Election 方案解决上述问题,他在所有 broker 中选出一个 controller, 所有 Partition 的 Leader 都由 Controller 决定。Controller 会将 Leader 的改变直接通过 RPC 的方式(比 Zookeeper queue 更高效) 通知需为此做出相应的 broker, 同时 controller 也负责增删 Topic 以及 Replica 的重新分配。

使用 kafka consumer 要手动设置 offset 就需要使用 kafka low level consumer API, 从而更好的控制消费, 比如如下场景

  • 同一条消息读多次
  • 只读取某个 topic 的部分 partition
  • 管理事务,从而确保每条消息被处理一次,且仅被处理一次

与 consumer Group 相比,Low Level Consumer 要求用户做大量的额外工作

  • 必须在应用程序中跟踪 offset, 从而确定下一条应该消费哪条消息
  • 应用程序需要通过程序获知每个 Partition 的 Leader 是谁
  • 必须处理 Leader 的变化

使用 Low Level Consumer 的一般流程如下

  • 查找到一个活着的 Broker, 并且找出每个 Partition 的 Leader
  • 找出每个 Partition 的 Follower
  • 定义好请求,该请求应该能描述应用程序需要哪些数据
  • Fetch 数据
  • 识别 Leader 的变化,并对其作出必要的响应

消费端手动设置 offset 的确认

kafka 如果要自己手动设置 offset, 需要手动 commit 通知 topic 该信息已经消费。

比如接受到了 [1, 2, 3, 4, 5] 消息,这里处理时 [1, 2, 3, 4,] 消息消费失败,没有 commit, 但是消息 5 消费成功,成功 commit, 此时 offset 会重置到 5, 但其实之前的 1, 2, 3, 4 没有丢失,仍然保存在 kafka 中,下一次消费时会直接从 offset 取一个 batchSize 数量的信息,做如上的处理。

commitSync

该方法是 kafka consumer 手动设置 offset 后,应当调用的 commit 方法,以便 kafka 重置 offset.

官方文档说该方法提交的 offset 会成为 kafka rebalance 和 startup 之后的 first fetch, 也就是说使用该方法提交 offset 之后,offset 会被设置为该值,下次读取会从该 offset fetch. 因此,此时手动提交 offset 需要在消费端自己维护 offset.

offset 和 consumer position

kafka offset 是一条记录的唯一标识,同样也是 consumer 在这个 partition 的下次应当消费的位置。
关于 consumer 在 partition 实际上有两个概念.
consumer position 保存在一个特殊 topic, _consumer_offset 中。

  • offset
  • committed position

position

position 为下一条应当交付给 consumer 的记录。将会等于 consumer 已经在这个 partition 中拉取的 offset 的 + 1, 这个值每当 consumer 调用 poll(java 接口,其他语言类似) 会自动更新。

committed position

committed position 是最后一次确认消费的 offset.
默认行为是周期性的自动 commit, 也可以手动调用 consumer 的 commitSynccommitAsync 手动提交 offset.
committed position 是 consumer 宕机恢复时重新读取的 offset, 也是发生 rebalance 时, 原来未消费过该 partitions 的 consumer 分配到该 partitions 时读取的 offset.

consumer 探活

当 consumer 订阅一个 topic 后,consumer 调用 poll 时会自动加入 consumer group.
poll API 有保活机制。在底层实现中,会定期发送心跳给 server. 如果在 session.timeout.ms 时间间隔后 consumer 仍没有发送心跳,该 consumer 就会被为宕机,触发 rebalance.

consumer 可能会处于 livelock 情况,也就是 consumer 仍在定时发送心跳,但是却没有进行消费,就是从其对应的 partitions poll 消息,这种情况下 consumer 处于保活,但是不再活跃,占用资源。同样也有相关机制来防止,如果 consumer 在 max.poll.interval.ms 时间间隔后没有调用 poll 从 partitions 拉取数据,那么就会被移出其所在 consumer group, 方便其他 consumer 能够消费该 partitions.
当以上情况发生时,该 consumer 会发生一次 commit failure, 由 commitSync, 抛出一个 CommitFailException

redis 节点相关

每个 redis cluster 中的节点都有集群当前的配置信息,给出以下信息

  • 当前已知的节点
  • 与其他节点的连接的信息
  • 标志
  • 属性
  • 赋予的 slots

cluster nodes 命令可以得到与当前终端连接的节点所属的集群上面的信息。格式与 redis 在硬盘上保存信息的格式相同。

序列化格式

示例

1
2
3
4
5
6
07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002 master - 0 1426238316232 2 connected 5461-10922
292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 127.0.0.1:30003 master - 0 1426238318243 3 connected 10923-16383
6ec23923021cf3ffec47632106199cb7f496ce01 127.0.0.1:30005 slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 0 1426238316232 5 connected
824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 connected
e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001 myself,master - 0 0 1 connected 0-5460

格式

输出格式为如下的 CSV 格式

1
<id> <ip:port> <flags> <master> <ping-sent> <pong-recv> <config-epoch> <link-state> <slot> <slot> ... <slot>
  • id
    长度为 40 的随机字符串,在一个创建一个节点生成并不再改变
  • ip:port
    该节点的 ip 和 port
  • flags
    , 分割的 myself, master, slave, fail?, fail, handshake, noaddr, noflags
  • master 如果该节点非 master 节点且 master 节点已知,则这里应为 master id, 否则为 -
  • ping-sent unix 当前发送活跃 ping 的毫秒时间戳
  • pong-recv 最后一次收到 pong 的时间戳
  • config-epoch 节点的目前的配置版本
  • link-state 集群中节点间的连接总线状态,可以为 connected 或者 disconnected
  • slot 一个哈希 slot 数字或者区间,从 9 开始,最大为 16384. 这是每个节点各自维护的哈希槽.

flags

  • myself 当前连接的节点
  • master 这个节点为 master 节点
  • slave slave 节点
  • fail? 节点处于 PFAIL 状态,对于当前连接的节点不可达,但是逻辑上可达
  • fail 节点处于 FAIL 状态,对于多个节点都不可达,从 PFAIL 状态转来
  • handshake 不可信任的节点,当前正在握手
  • noaddr 这个节点的地址不可知
  • noflags

摘要

主要介绍 kafka high level consumer, consumer group, consumer rebalance, low level consumer 实现的语义,以及适用场景。

High Level Consumer

对于 kafka 使用者而言,不希望知道消息 offset 等的处理,但是有希望提供一些消息队列都提供的语义,比如同一消息只能被一个 consumer 消费,或者被所有 consumer 消费。因此 kafka High Level Consumer 提供了从 kafka 消费数据的搞高层抽象,从而屏蔽其中细节并提供丰富的语义。

Consumer Group

High level Consumer 将从某个 partition 读取的最后一条消息的 offset 存于 zookeeper 中。这个 offset 基于客户程序提供给 kafka 的名字来保存,这个名字被称为 Consumer Group. Consumer 是整个 kafka 集群全局的,而非某个 topic 的。每一个 High level Consumer 实例都属于一个 Consumer Group, 若不指定则属于默认的 Group.

传统的 Message queue 都会在消息被消费完后将消息删除,一方面是避免重复消费,另一方面可以保证 queue 的长度比较短,提高效率。而如上文所述,kafka 并不删除已消费的消息,为了实现传统 message queue 纸杯消费一次的语义,kafka 保证每条消息在同一个 Consumer group 里只会被某一个 Consumer 消费。与传统 Message queue 不同的是,kafka 还允许不同的 Consumer group 同时消费同一条信息,这一特性可以为消息的多元化处理提供处理。

High Level Consumer Rebalance

kafka 同一条信息在一个 consumer group 中只会被一个 consumer 消费,实际上,kafka 保证的是稳定状态下每一个 Consumer 实例只会消费一个或者多个 partition 数据,而某个 partition 的数据只会被某一个特定的 consumer 实例所消费。

kafka 对消息的分配是以 partition 为单位分配的,而非每一条消息作为分配单元。这样设计可能导致一个 consumer group 里面的 consumer 均匀消费数据,优势是每个 consumer 不同都跟大量的 broker 通信,减少通信开销,也降低了分配难度,实现也简单。
由于同一个 partition 里的数据是有序的,这种设计可以保证每个 partition 里面的数据可以被有序消费。

如果每个 consumer group 中的 consumer 数量少于 partition 数量,则至少有一个 consumer 会消费多个 partition 的数据,如果相同,则刚好一个 consumer 对应一个 partition. 如果 consumer 数量多于 partition 数量,会有部分 consumer 无法消费该 topic 中的任何一条消息。

consumer rebalance 算法

  • 将目标 topic 下的所有 partition 排序,存于
  • 对于 consumer group 下的所有 consumer 排序,存于 , 第 i 个 consumer 记为
  • , 向上取整
  • 解除 对原来分配的 partition 的消费权
  • 个 partition 分配给

目前版本的的 kafka(0.8.2.1) 的 consumer 的 consumer rebalance 的控制策略是由每一个 consumer 通过在 zookeeper 上注册完成 watch 的完成的。每个 consumer 被创建时触发时会触发 consumer group 的 rebalance, 具体启动流程如下

  • high level consumer 启动时将其 ID 注册到其 consumer group 下,在 zookeeper 上的路径为 /consumer/[consumer group]/ids/[consumer id]
  • /consumers/[consumer group]/ids 上注册 watch
  • /brokers/ids 上注册 watch
  • 如果 consumer 通过 topic filter 创建信息流,则他会同时在 /brokers/topics 上也创建 watch
  • 强制自己在其 consumer group 内启动 rebalance 流程。

在这种策略下,每一个 consumer 或者 broker 的增加或者减少都会触发 consumer rebalance. 因为每个 consumer 只负责调整自己所消费的 partition, 为了保证整个 consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance.

Low Level Consumer

使用 kafka 的 Low Level Consumer 通常是希望更好的控制数据的消费,比如

  • 信息重复消费
  • 只读取某个 topic 的部分 patition
  • 管理实务,从而确保每条信息被处理一次,且仅被处理一次

与 Consumer group 相比,Low level consumer 要求用户做大量的额外工作。

  • 必须在应用程序中跟踪 offset, 从而确定下一条应该消费哪条信息
  • 应用程序需要通过程序获知每个 partition 的 leader 是谁
  • 必须处理 leader 的变化

使用 low level consumer 的一般流程如下

  • 查找到一个活着的 broker, 并且找出每个 partition 的 leader
  • 找出每个 partition 的 follower
  • 定义好请求,该请求应该能描述应用程序需要哪些数据
  • fetch 数据
  • 识别 leader 的变化,并对其做出必要的响应

golang 缺失 golang/x/sys 包,可是这个属于核心组件,应该是不会缺失的,只能手动下载了

进入 $GOPATH/src/golang.org/x/, 手动 clone 相关代码

1
2
cd $GOPATH/src/golang.org/x/
git clone https://github.com/golang/sys.git

问题解决,类似问题可以通过相同方法

0%