今でもあなたは私の光丶

Kafka(5)分区

副本机制

Kafka在一定数量的服务器上对主题分区进行复制。
当集群中的一个broker宕机后系统可以自动故障转移到其他可用的副本上,不会造成数据丢失。
--replication-factor 3 1leader+2follower

  1. 将复制因子为1的未复制主题称为复制主题。
  2. 主题的分区是复制的最小单元。
  3. 在非故障情况下,Kafka中的每个分区都有一个Leader副本和零个或多个Follower副本。
  4. 包括Leader副本在内的副本总数构成复制因子。
  5. 所有读取和写入都由Leader副本负责。
  6. 通常,分区比broker多,并且Leader分区在broker之间平均分配。

Follower分区像普通的Kafka消费者一样,消费来自Leader分区的消息,并将其持久化到自己的 日志中。
允许Follower对日志条目拉取进行批处理。

同步节点定义:

  1. 节点必须能够维持与ZooKeeper的会话(通过ZooKeeper的心跳机制)
  2. 对于Follower副本分区,它复制在Leader分区上的写入,并且不要延迟太多

Kafka提供的保证是,只要有至少一个同步副本处于活动状态,提交的消息就不会丢失。

宕机如何恢复

(1)少部分副本宕机
当leader宕机了,会从follower选择一个作为leader。当宕机的重新恢复时,会把之前commit的数 据清空,重新从leader里pull数据。

(2)全部副本宕机
当全部副本宕机了有两种恢复方式

  1. 等待ISR中的一个恢复后,并选它作为leader。(等待时间较长,降低可用性)
  2. 选择第一个恢复的副本作为新的leader,无论是否在ISR中。(并未包含之前leader commit的 数据,因此造成数据丢失)

Leader选举

下图中
分区P1的Leader是0,ISR是0和1
分区P2的Leader是2,ISR是1和2
分区P3的Leader是1,ISR是0,1,2。

生产者和消费者的请求都由Leader副本来处理。Follower副本只负责消费Leader副本的数据和 Leader保持同步。
对于P1,如果0宕机会发生什么?

Leader副本和Follower副本之间的关系并不是固定不变的,在Leader所在的broker发生故障的时 候,就需要进行分区的Leader副本和Follower副本之间的切换,需要选举Leader副本。

如何选举?

如果某个分区所在的服务器除了问题,不可用,kafka会从该分区的其他的副本中选择一个作为新 的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的 Leader。
只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集 合,该集合中是一些分区的副本。
只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生 产者。
如果这个集合有增减,kafka会更新zookeeper上的记录。

如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。
显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。
假设某个topic有N+1个副本,kafka可以容忍N个服务器不可用。

为什么不用少数服从多数的方法

少数服从多数是一种比较常见的一致性算发和Leader选举法。
它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;
选择Leader时也是从超过半数的同步的副本中选择。
这种算法需要较高的冗余度,跟Kafka比起来,浪费资源。
譬如只允许一台机器失败,需要有三个副本;而如果只容忍两台机器失败,则需要五个副本。
而kafka的ISR集合方法,分别只需要两个和三个副本。

如果所有的ISR副本都失败了怎么办?

此时有两种方法可选,

  1. 等待ISR集合中的副本复活,
  2. 选择任何一个立即可用的副本,而这个副本不一定是在ISR集合中。
    • 需要设置 unclean.leader.election.enable=true

这两种方法各有利弊,实际生产中按需选择。 如果要等待ISR副本复活,虽然可以保证一致 性,但可能需要很长时间。而如果选择立即可用的副本,则很可能该副本并不一致。

总结:

Kafka中Leader分区选举,通过维护一个动态变化的ISR集合来实现,一旦Leader分区丢掉,则从 ISR中随机挑选一个副本做新的Leader分区。
如果ISR中的副本都丢失了,则:

  1. 可以等待ISR中的副本任何一个恢复,接着对外提供服务,需要时间等待。
  2. 从OSR中选出一个副本做Leader副本,此时会造成数据丢失

分区重新分配

向已经部署好的Kafka集群里面添加机器,我们需要从已经部署好的Kafka节点中复制相应的配置文 件,然后把里面的broker id修改成全局唯一的,最后启动这个节点即可将它加入到现有Kafka集群中。

问题:新添加的Kafka节点并不会自动地分配数据,无法分担集群的负载,除非我们新建一个 topic。
需要手动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的工具来重新分布某个 topic的分区。

在重新分布topic分区之前,我们先来看看现在topic的各个分区的分布位置:

1. 创建主题:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --
create --topic tp_re_01 --partitions 5 --replication-factor 1

2. 查看主题信息:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --
describe --topic tp_re_01
Topic:tp_re_01 PartitionCount:5 ReplicationFactor:1 Configs:
Topic: tp_re_01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_01 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_01 Partition: 4 Leader: 0 Replicas: 0 Isr: 0

3. 在node11搭建Kafka:

  • 拷贝JDK并安装
[root@node1 opt]# scp jdk-8u261-linux-x64.rpm node11:~

此处不需要zookeeper,切记!!!

让配置生效:

. /etc/profile
  • 拷贝node1上安装的Kafka
[root@node1 opt]# scp -r kafka_2.12-1.0.2/ node11:/opt
  • 修改node11上Kafka的配置:
  • 启动Kafka:
[root@node11 ~]# kafka-server-start.sh /opt/kafka_2.12-
1.0.2/config/server.properties

注意观察node11上节点启动的时候的ClusterId,看和zookeeper节点上的ClusterId是否一 致,如果是,证明node11和node1在同一个集群中。 node11启动的Cluster ID:

zookeeper节点上的Cluster ID:

在node1上查看zookeeper的节点信息:

4. 现在我们在现有集群的基础上再添加一个Kafka节点,然后使用Kafka自带的 kafkareassign-partitions.sh 工具来重新分布分区。该工具有三种使用模式:

  1. generate模式,给定需要重新分配的Topic,自动生成reassign plan(并不执行)
  2. execute模式,根据指定的reassign plan重新分配Partition
  3. verify模式,验证重新分配 Partition是否成功

5. 我们将分区3和4重新分布到broker1上,借助kafka-reassign-partitions.sh工具生成reassign plan,不过我们先得按照要求定义一个文件,里面说明哪些topic需要重新分区,文件内容如 下

[root@node1 ~]# cat topics-to-move.json
{
"topics": [
{
"topic":"tp_re_01"
}
],
"version":1
}

然后使用 kafka-reassign-partitions.sh 工具生成reassign plan

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka -
-topics-to-move-json-file topics-to-move.json --broker-list "0,1" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":
[0],"log_dirs":["any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":
[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":
[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":
[0],"log_dirs":["any"]}]}

Proposed partition reassignment configuration下面生成的就是将分区重新分布到broker 1上的 结果。我们将这些内容保存到名为result.json文件里面(文件名不重要,文件格式也不一定要以json为 结尾,只要保证内容是json即可),然后执行这些reassign plan:

执行计划:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka -
-reassignment-json-file topics-to-execute.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":
[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
[root@node1 ~]#

这样Kafka就在执行reassign plan,我们可以校验reassign plan是否执行完成:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka -
-reassignment-json-file topics-to-execute.json --verify
Status of partition reassignment:
Reassignment of partition tp_re_01-1 completed successfully
Reassignment of partition tp_re_01-4 completed successfully
Reassignment of partition tp_re_01-2 completed successfully
Reassignment of partition tp_re_01-3 completed successfully
Reassignment of partition tp_re_01-0 completed successfully
[root@node1 ~]#

查看主题的细节:

分区的分布的确和操作之前不一样了,broker 1上已经有分区分布上去了。使用 kafka-reassignpartitions.sh 工具生成的reassign plan只是一个建议,方便大家而已。其实我们自己完全可以编辑 一个reassign plan,然后执行它,如下:

{
"version": 1,
"partitions": [{
"topic": "tp_re_01",
"partition": 4,
"replicas": [1],
"log_dirs": ["any"]
}, {
"topic": "tp_re_01",
"partition": 1,
"replicas": [0],
"log_dirs": ["any"]
}, {
"topic": "tp_re_01",
"partition": 2,
"replicas": [0],
"log_dirs": ["any"]
}, {
"topic": "tp_re_01",
"partition": 3,
"replicas": [1],
"log_dirs": ["any"]
}, {
"topic": "tp_re_01",
"partition": 0,
"replicas": [0],
"log_dirs": ["any"]
}]
}

将上面的json数据文件保存到my-topics-to-execute.json文件中,然后也是执行它:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka
--reassignment-json-file my-topics-to-execute.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":
[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":
[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":
[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka
--reassignment-json-file my-topics-to-execute.json --verify
Status of partition reassignment:
Reassignment of partition tp_re_01-1 completed successfully
Reassignment of partition tp_re_01-4 completed successfully
Reassignment of partition tp_re_01-2 completed successfully
Reassignment of partition tp_re_01-3 completed successfully
Reassignment of partition tp_re_01-0 completed successfully

等这个reassign plan执行完,我们再来看看分区的分布:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --
topic tp_re_01
Topic:tp_re_01 PartitionCount:5 ReplicationFactor:1 Configs:
Topic: tp_re_01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_01 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_01 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: tp_re_01 Partition: 4 Leader: 1 Replicas: 1 Isr: 1
[root@node1 ~]#

自动再均衡

我们可以在新建主题的时候,手动指定主题各个Leader分区以及Follower分区的分配情况,即什么 分区副本在哪个broker节点上。
随着系统的运行,broker的宕机重启,会引发Leader分区和Follower分区的角色转换,最后可能 Leader大部分都集中在少数几台broker上,由于Leader负责客户端的读写操作,此时集中Leader分区 的少数几台服务器的网络I/O,CPU,以及内存都会很紧张。

Leader和Follower的角色转换会引起Leader副本在集群中分布的不均衡,此时我们需要一种手 段,让Leader的分布重新恢复到一个均衡的状态。

执行脚本:

[root@node11 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --
topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"

上述脚本执行的结果是:创建了主题tp_demo_03,有三个分区,每个分区两个副本,Leader副本 在列表中第一个指定的brokerId上,Follower副本在随后指定的brokerId上。

然后模拟broker0宕机的情况:

# 通过jps找到Kafka进程PID
[root@node1 ~]# jps
54912 Jps
1699 QuorumPeerMain
1965 Kafka
# 直接杀死进程
[root@node1 ~]# kill -9 1965
[root@node1 ~]# jps
1699 QuorumPeerMain
54936 Jps
[root@node1 ~]#
# 查看主题分区信息:
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe -
-topic tp_demo_03
Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: tp_demo_03 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1
Topic: tp_demo_03 Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1
[root@node1 ~]#
# 重新启动node1上的Kafka
[root@node1 ~]# kafka-server-start.sh -daemon /opt/kafka_2.12-
1.0.2/config/server.properties
[root@node1 ~]# jps
1699 QuorumPeerMain
55525 Kafka
55557 Jps
[root@node1 ~]#
# 查看主题的分区信息:
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe -
-topic tp_demo_03
Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: tp_demo_03 Partition: 0 Leader: 1 Replicas: 0,1 Isr:
1,0
Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr:
1,0
Topic: tp_demo_03 Partition: 2 Leader: 1 Replicas: 0,1 Isr:
1,0
[root@node1 ~]#
# broker恢复了,但是Leader的分配并没有变化,还是处于Leader切换后的分配情况。

是否有一种方式,可以让Kafka自动帮我们进行修改?改为初始的副本分配?
此时,用到了Kafka提供的自动再均衡脚本: kafka-preferred-replica-election.sh

先看介绍:

该工具会让每个分区的Leader副本分配在合适的位置,让Leader分区和Follower分区在服务器之 间均衡分配。
如果该脚本仅指定zookeeper地址,则会对集群中所有的主题进行操作,自动再平衡。

具体操作:

1. 创建preferred-replica.json,内容如下:

{
"partitions": [
{
"topic":"tp_demo_03",
"partition":0
},
{
"topic":"tp_demo_03",
"partition":1
},
{
"topic":"tp_demo_03",
"partition":2
}
]
}

2. 执行操作:

[root@node1 ~]# kafka-preferred-replica-election.sh --zookeeper
node1:2181/myKafka --path-to-json-file preferred-replicas.json
Created preferred replica election path with
{"version":1,"partitions":[{"topic":"tp_demo_03","partition":0},
{"topic":"tp_demo_03","partition":1},
{"topic":"tp_demo_03","partition":2}]}
Successfully started preferred replica election for partitions
Set(tp_demo_03-0, tp_demo_03-1, tp_demo_03-2)
[root@node1 ~]#

3. 查看操作的结果:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --
describe --topic tp_demo_03
Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: tp_demo_03 Partition: 0 Leader: 0 Replicas: 0,1 Isr:
1,0
Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0
Isr: 1,0
Topic: tp_demo_03 Partition: 2 Leader: 0 Replicas: 0,1
Isr: 1,0
[root@node1 ~]#

恢复到最初的分配情况。
之所以是这样的分配,是因为我们在创建主题的时候:

--replica-assignment "0:1,1:0,0:1"

在逗号分割的每个数值对中排在前面的是Leader分区,后面的是副本分区。那么所谓的preferred replica,就是排在前面的数字就是Leader副本应该在的brokerId。

修改分区副本

实际项目中,我们可能由于主题的副本因子设置的问题,需要重新设置副本因子
或者由于集群的扩展,需要重新设置副本因子。
topic一旦使用又不能轻易删除重建,因此动态增加副本因子就成为最终的选择。

说明:kafka 1.0版本配置文件默认没有default.replication.factor=x, 因此如果创建topic时,不 指定–replication-factor 想, 默认副本因子为1. 我们可以在自己的server.properties中配置上常用的副 本因子,省去手动调整。例如设置default.replication.factor=3, 详细内容可参考官方文档

原因分析:

假设我们有2个kafka broker分别broker0,broker1。

  1. 当我们创建的topic有2个分区partition时并且replication-factor为1,基本上一个broker上一 个分区。当一个broker宕机了,该topic就无法使用了,因为两个个分区只有一个能用。
  2. 当我们创建的topic有3个分区partition时并且replication-factor为2时,可能分区数据分布情 况是 broker0, partiton0,partiton1,partiton2, broker1, partiton1,partiton0, partiton2, 每个分区有一个副本,当其中一个broker宕机了,kafka集群还能完整凑出该topic的两个分 区,例如当broker0宕机了,可以通过broker1组合出topic的两个分区。

1. 创建主题:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --
create --topic tp_re_02 --partitions 3 --replication-factor 1

2. 查看主题细节:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --
describe --topic tp_re_02
Topic:tp_re_02 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
[root@node1 ~]#

3. 修改副本因子:错误

4. 使用 kafka-reassign-partitions.sh 修改副本因子:

创建increment-replication-factor.json

{
"version":1,
"partitions":[
{"topic":"tp_re_02","partition":0,"replicas":[0,1]},
{"topic":"tp_re_02","partition":1,"replicas":[0,1]},
{"topic":"tp_re_02","partition":2,"replicas":[1,0]}
]
}

5. 执行分配:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper
node1:2181/myKafka --reassignment-json-file increase-replicationfactor.json --execute
Current partition replica assignment
{"version":1,"partitions":
[{"topic":"tp_re_02","partition":2,"replicas":[1,0],"log_dirs":
["any","any"]},{"topic":"tp_re_02","partition":1,"replicas":
[0,1],"log_dirs":["any","any"]},
{"topic":"tp_re_02","partition":0,"replicas":[0,1],"log_dirs":
["any","any"]}]}
Save this to use as the --reassignment-json-file option during
rollback
Successfully started reassignment of partitions.

6. 查看主题细节:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --
describe --topic tp_re_02
Topic:tp_re_02 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 0,1 Isr:
1,0
Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0,1 Isr:
0,1
Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1,0 Isr:
1,0
[root@node1 ~]#

分区分配策略

在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只能被一个消费组下面的一个消费者 消费,这里就产生了分区分配的问题。Kafka中提供了多重分区分配算法(PartitionAssignor)的实 现:RangeAssignor、RoundRobinAssignor、StickyAssignor。

RangeAssignor

PartitionAssignor接口用于用户定义实现分区分配算法,以实现Consumer之间的分区分配。
消费组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调 者选择其中的一个消费者来执行这个消费组的分区分配并将分配结果转发给消费组内所有的消费者。 Kafka默认采用RangeAssignor的分配算法。

RangeAssignor对每个Topic进行独立的分区分配。对于每一个Topic,首先对分区按照分区ID进行 数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费 者。这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一 些分区。

大致算法如下:

assign(topic, consumers) {
// 对分区和Consumer进行排序
List<Partition> partitions = topic.getPartitions();
sort(partitions);
sort(consumers);
// 计算每个Consumer分配的分区数
int numPartitionsPerConsumer = partition.size() / consumers.size();
// 额外有一些Consumer会多分配到分区
int consumersWithExtraPartition = partition.size() % consumers.size();
// 计算分配结果
for (int i = 0, n = consumers.size(); i < n; i++) {
// 第i个Consumer分配到的分区的index
int start = numPartitionsPerConsumer * i + Math.min(i,
consumersWithExtraPartition);
// 第i个Consumer分配到的分区数
int length = numPartitionsPerConsumer + (i + 1 >
consumersWithExtraPartition ? 0 : 1);
// 分装分配结果
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start,
start + length));
}
}

RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分 区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个Topic, RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消 费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严 重,比如上图中4个分区3个消费者的场景,C0会多分配一个分区。如果此时再订阅一个分区数为4的 Topic,那么C0又会比C1、C2多分配一个分区,这样C0总共就比C1、C2多分配两个分区了,而且随着 Topic的增加,这个情况会越来越严重。

字典序靠前的消费组中的消费者比较“贪婪”。

RoundRobinAssignor

RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽 量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。如果消费组内,消费者订 阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之 间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量 均衡”的,因为某些消费者不参与一些Topic的分配。

相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能消费者之间尽 量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅 的Topic越来越多,差值越来越大)。

对于消费组内消费者订阅Topic不一致的情况:假设有两个个消费者分别为C0和C1,有2个Topic T1、T2,分别拥有3和2个分区,并且C0订阅T1和T2,C1订阅T2,那么RoundRobinAssignor的分配结 果如下:

看上去分配已经尽量的保证均衡了,不过可以发现C0承担了4个分区的消费而C1订阅了T2一个分 区,是不是把T2P0交给C1消费能更加的均衡呢?

StickyAssignor

动机

尽管RoundRobinAssignor已经在RangeAssignor上做了一些优化来更均衡的分配分区,但是在一 些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。

更核心的问题是无论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有 考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次分配的结果,尽量少的 调整分区分配的变动,显然是能节省很多开销的。

目标

从字面意义上看,Sticky是“粘性的”,可以理解为分配结果是带“粘性的”:

  1. 分区的分配尽量的均衡
  2. 每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成 的,而第二个目标才真正体现出StickyAssignor特性的。
我们先来看预期分配的结构,后续再具体分析StickyAssignor的算法实现。

例如:

  • 有3个Consumer:C0、C1、C2
  • 有4个Topic:T0、T1、T2、T3,每个Topic有2个分区
  • 所有Consumer都订阅了这4个分区

StickyAssignor的分配结果如下图所示(增加RoundRobinAssignor分配作为对比):

如果消费者1宕机,则按照RoundRobin的方式分配结果如下:
打乱从新来过,轮询分配:

按照Sticky的方式:

仅对消费者1分配的分区进行重分配,红线部分。最终达到均衡的目的。

再举一个例子:

  • 有3个Consumer:C0、C1、C2
  • 3个Topic:T0、T1、T2,它们分别有1、2、3个分区
  • C0订阅T0;C1订阅T0、T1;C2订阅T0、T1、T2

分配结果如下图所示:

消费者0下线,则按照轮询的方式分配:

按照Sticky方式分配分区,仅仅需要动的就是红线部分,其他部分不动。

StickyAssignor分配方式的实现稍微复杂点儿,我们可以先理解图示部分即可。感兴趣的同学可以 研究一下。

自定义分配策略

自定义的分配策略必须要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接 口。PartitionAssignor接口的定义如下:

Subscription subscription(Set<String> topics);
String name();
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription>
subscriptions);
void onAssignment(Assignment assignment);
class Subscription {
private final List<String> topics;
private final ByteBuffer userData;
}
class Assignment {
private final List<TopicPartition> partitions;
private final ByteBuffer userData;
}

PartitionAssignor接口中定义了两个内部类:Subscription和Assignment。

Subscription类用来表示消费者的订阅信息,类中有两个属性:topics和userData,分别表示消费 者所订阅topic列表和用户自定义信息。PartitionAssignor接口通过subscription()方法来设置消费者自 身相关的Subscription信息,注意到此方法中只有一个参数topics,与Subscription类中的topics的相互 呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在subscription() 方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、ip地址、host或者机架 (rack)等等。

再来说一下Assignment类,它是用来表示分配结果信息的,类中也有两个属性:partitions和 userData,分别表示所分配到的分区集合和用户自定义的数据。可以通过PartitionAssignor接口中的 onAssignment()方法是在每个消费者收到消费组leader分配结果时的回调函数,例如在StickyAssignor 策略中就是通过这个方法保存当前的分配方案,以备在下次消费组再平衡(rebalance)时可以提供分 配参考依据。

接口中的name()方法用来提供分配策略的名称,对于Kafka提供的3种分配策略而言, RangeAssignor对应的protocol_name为“range”,RoundRobinAssignor对应的protocol_name为 “roundrobin”,StickyAssignor对应的protocol_name为“sticky”,所以自定义的分配策略中要注意命名 的时候不要与已存在的分配策略发生冲突。这个命名用来标识分配策略的名称,在后面所描述的加入消 费组以及选举消费组leader的时候会有涉及。

真正的分区分配方案的实现是在assign()方法中,方法中的参数metadata表示集群的元数据信息, 而subscriptions表示消费组内各个消费者成员的订阅信息,最终方法返回各个消费者的分配信息。

Kafka中还提供了一个抽象类 org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor接口的实现,对assign()方法进行了实现,其中会将Subscription中的userData信息 去掉后,在进行分配。Kafka提供的3种分配策略都是继承自这个抽象类。如果开发人员在自定义分区分 配策略时需要使用userData信息来控制分区分配的结果,那么就不能直接继承 AbstractPartitionAssignor这个抽象类,而需要直接实现PartitionAssignor接口。

package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class MyAssignor extends AbstractPartitionAssignor {
}

在使用时,消费者客户端需要添加相应的Properties参数,示例如下:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
MyAssignor.class.getName());