Kafka消息队列相关见解资料摘录

kafka的优点如下

  • 时间复杂度为O(1);性能与数据量多少无关;
  • 高吞吐,100k条/s;
  • 消息分区
  • 分布式消费。
  • Scale out。

基础概念

Brokers

消息管理; 存储/删除

  • 一共三个broker,存储了不同的topic;
  • 以topic1为例,topic1有多个partitions;图中黄色部分为partition1-4,存储在不同的brokers上,这些是partition的Leader;
  • 灰色的topic1:partition1-4,作为partition的replica,图中我们的的副本数为2,在server.properity配置文件中由参数offsets.topic.replication.factor控制;
  • topic1:partition3有一个segments,由一个offset.index和offset.log组成。
  • offset.log有多个message顺序写入,命名是起始message的offset。

brokers的存储内容归纳如下:

  • broker——>topics——>partitions——>segments(index and logs)
  • 一个broker上存储不同的topic;
  • topic是逻辑结构,相当于query,不同的topic由多个连续的partition组成,每个partition对应一个物理文件夹;
  • kafka实现replica的单元是partitions,由参数offsets.topic.replication.factor控制;默认是3,Leader提供读写,由zk配合进行Leader的选举,选举过程见;
  • 每个partition由多个segments组成(index和log),命名为起始message的offset;

需要了解

  • 信息存储,message的存储格式
  • Replica的数据同步
  • Leader选举
  • HA方案与故障恢复
  • 过期数据清理

message结构

ZK

zookeepers(负责选举,均衡,meta记录,消费记录)

zookeeper在集群中与broker和consumer进行交互,维护数据和集群高可用。

记录consumer消费message的位置信息;
partitions故障时进行Leader Election
kafka的meta信息在zookeeper如何存储
kafka在zookeeper的结构图如下:

三级目录是一些组件:

  • consumers consumers的信息,记录对partition访问偏移量,由consumer自己维护,目录/sohudba_kafka/consumers/[consumer]/offsets/[topic]/[partition]
  • broders 维护broders的信息,包含borders下的partition,每个partition的ISR,当前leader,目录结构比较复杂,我们后面细说;
  • producers producers的信息,当前zookeeper未记录任何信息
  • admin admin维护删掉的topic,partition的重新分配(过后删除),partition选举Leader(过后删除)
  • config
  • controlers 增删topic/重新分配replica/RPC通知其他broker
  • controler_epoch
  • isr_change_notifications
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
get /brokers/topics/shuaige/partitions/0/state
{"controller_epoch":5, ##表示kafka集群中的中央控制器选举次数
"leader":1, ##当前partition的leader所在的borker id
"version":1, ##版本编号默认为1
"leader_epoch":6, ##leader选举次数
"isr":[2,1,3] ##当前partition的In-sync replica,副本组的borker id列表
}
cZxid = 0x35
ctime = Wed Mar 20 11:40:59 CST 2019
mZxid = 0x35
mtime = Wed Mar 20 11:40:59 CST 2019
pZxid = 0x35
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 74
numChildren = 0

borker数据存储示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

{"version":1,
"partitions":{
"45":[2,1,3],
"34":[3,2,1],
"12":[2,3,1],
"8":[1,2,3],
"19":[3,1,2],
"23":[1,3,2],
"4":[3,2,1],
"40":[3,2,1],
"15":[2,1,3],
"11":[1,3,2],
"9":[2,1,3],
"44":[1,2,3],
"33":[2,1,3],
"22":[3,2,1],
"26":[1,2,3],
"37":[3,1,2],
"13":[3,1,2],
"46":[3,2,1],
"24":[2,3,1],
.......
}
}

数据操作

为避免broker挂后造成数据丢失,kafka实现了高可用方式。

  • 基于partition实现Replica。并与zookeeper配合实现Leader的选举。
  • 通过算法将partition的Leader与Fellowers分散于不同的broker。

replica实现

在brokers的物理结构中,replication有多个follewers,分散于不同的brokers。通过增量日志实现。

partition的log记录是顺序的,通过server.properties中log.retention.hours参数定义日志保留时长,过期则删除。新写入的message append记录在partition中。

为提升效率,

follewers会在message未写入log时,读到message则将ACK发送给Leader,因此只能保证存在Replica,不能保证数据一定持久化了。
批量复制
ISR是In-Sync Replicate 记录与Leader保持同步的列表。

同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。

不能够一个consumer group的多个consumer同时消费一个partition。

Leader Election

判断Replica活着

  • (1)与zk有心跳通讯;
  • (2)与Leader通讯及时。两者有一不满足,fellower都会从ISR中移除。

选举算法
一般的leader选举算法,有Majority Vote/Zab/Raft/PacificA。

kafka采用的即PacificA,kafka维护多个ISR,但不不像Majorty Vote算法,限制最少的2N+1节点和N+1以上投票。

即使只有1个follewer,也可完成Leader选举。


进阶内容

基本概念

Producer、Broker、Consumer

一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer,以及一个ZooKeeper集群

-(1)Producer:生产者,也就是发送消息的一方。
-(2)Consumer:消费者,也就是接收消息的一方。
-(3)Broker:服务代理节点。大多数情况下也可以将Broker看作一台Kafka服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个Broker组成了一个Kafka集群。

Topic、Partition

在Kafka中还有两个特别重要的概念—主题(Topic)与分区(Partition)。
Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

AR、ISR、OSR

Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步
分区中的所有副本统称为AR(Assigned Replicas)。
所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。
前面所说的“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。
与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas)由此可见AR=ISR+OSR。
正常情况所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR,OSR集合为空。

1
2
3
当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。
如果OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。
默认情况下当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会

HW、LEO

ISR与HW和LEO也有紧密的关系。
HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。
LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset

1
2
3
4
5
6
7
假设某个分区的ISR集合中有3个副本,即一个leader副本和2个follower副本,此时分区的LEO和HW都为3。

消息3和消息4从生产者发出之后会被先存入leader副本
在消息写入leader副本之后,follower副本会发送拉取请求来拉取消息3和消息4以进行消息同步。
在同步过程中,不同的 follower 副本的同步效率也不尽相同
在某一时刻follower1完全跟上了leader副本而follower2只同步了消息3
如此leader副本的LEO为5,follower1的LEO为5,follower2的LEO为4,那么当前分区的HW取最小值4,此时消费者可以消费到offset为0至3之间

事实上同步复制要求所有能工作的 follower 副本都复制完,这条消息才会被确认为已成功提交,这种复制方式极大地影响了性能。
而在异步复制方式下,follower副本异步地从leader副本中复制数据,数据只要被leader副本写入就被认为已经成功提交。
在这种情况下如果follower副本都还没有复制完而落后于leader副本,突然leader副本宕机,则会造成数据丢失。
Kafka使用的这种ISR的方式则有效地权衡了数据可靠性和性能之间的关系。

相关参数

服务端参数配置

  • zookeeper.connect
    broker要连接的ZooKeeper集群的服务地址(包含端口号)
    如果ZooKeeper集群中有多个节点,则可以用逗号将每个节点隔开,类似于 localhost1:2181,localhost2:2181,localhost3:2181这种格式
  • listeners
    该参数指明broker监听客户端连接的地址列表,即为客户端要连接broker的入口地址列表,配置格式为protocol1://hostname1:port1,protocol2://hostname2:port2
  • broker.id
    Kafka集群中broker的唯一标识,默认值为-1。
    如果没有设置,那么Kafka会自动生成一个。
    这个参数还和meta.properties文件及服务端参数broker.id.generation.enablereserved.broker.max.id有关
  • log.dirlog.dirs
    log.dir用来配置单个根目录,而og.dirs用来配置多个根目录(以逗号分隔)
    log.dirs的优先级比 log.dir 高,但是如果没有配置log.dirs,则会以 log.dir 配置为准。
    默认情况下只配置了log.dir 参数,其默认值为/tmp/kafka-logs。
  • message.max.bytes
    broker所能接收消息的最大值,默认值为1000012(B),约等于976.6KB。
    如果Producer 发送的消息大于这个参数所设置的值,那么(Producer)就会报出RecordTooLargeException的异常。
    如果需要修改这个参数,那么还要考虑max.request.size (客户端参数)、max.message.bytes(topic端参数)等参数的影响。




生产者

必要的参数配置

  • bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单
    具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开。
    注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。
    不过建议至少要设置两个以上的broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上。
  • key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。
    org.apache.kafka.common.serialization.StringSerializer
  • client.id:KafkaProducer对应的客户端id
    如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,内容形式如“producer-1”“producer-2”,即字符串“producer-”与数字的拼接。

由于参数众多可用org.apache.kafka.clients.producer.ProducerConfig做key的提取

重要的生产者参数

acks

这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的

  • acks=1。默认值即为1。
    生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。
  • acks=0。
    生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了
  • acks=-1或acks=all。
    生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
1
2
3
4
5
Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。
对于某些应用来说,顺序性非常重要,比如MySQL的binlog传输,如果出现错误就会造成非常严重的后果。
如果将acks参数配置为非零值,并且max.in.flight.requests.per.connection参数配置为大于1的值,那么就会出现错序的现象
如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。
一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1,而不是把acks配置为0,不过这样也会影响整体的吞吐。
max.request.size

用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即 1MB。
这个参数还涉及一些其他参数的联动,比如broker端的message.max.bytes参数,如果配置错误可能会引起一些不必要的异常。
比如将broker端的message.max.bytes参数配置为10,而max.request.size参数配置为20,那么当我们发送一条15b的数据就会抛出其他异常

retries和retry.backoff.ms

retries参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。

1
2
3
消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的
生产者可以通过配置retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。
如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。

不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size参数配置的值时,这种方式就不可行了。

重试还和另一个参数retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
在配置 retries 和 retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。

compression.type

这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。
该参数还可以配置为“gzip”“snappy”和“lz4”。
对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。
消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。

connections.max.idle.ms

用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。

linger.ms

用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。
生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms值时发送出去。

增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
这个linger.ms参数与TCP协议中的Nagle算法有异曲同工之妙。

receive.buffer.bytes

这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。
如果设置为-1,则使用操作系统的默认值。

如果Producer与Kafka处于不同的机房,则可以适地调大这个参数值。

send.buffer.bytes

用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。
与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。

request.timeout.ms

配置Producer等待请求响应的最长时间,默认值为30000(ms)。
请求超时之后可以选择进行重试。
注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。



消息的发送

消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker

拦截器(下一章会详细介绍),而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。

ProducerRecord

  • topic和partition字段分别代表消息要发往的主题和分区号。
  • headers字段是消息的头部,Kafka0.11.x版本才引入这个属性,它大多用来设定一些与应用相关的信息,如无需要也可以不用设置。
  • key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。
    前面提及消息以主题为单位进行归类,而这个key可以让消息再进行二次归类,同一个key的消息会被划分到同一个分区中。有key的消息还可以支持日志压缩的功能。
  • value是指消息体,一般不为空,如果为空则表示特定的消息—墓碑消息。
  • timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间

发送模式

发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。

  • 同步

    1
    2
    3
    4
    5
    try{
    producer.send(record).get();
    } catch(ExecutionException | InterruptedException){
    e.printStakcTrace();
    }
  • 发后即忘

    1
    2
    3
    4
    5
    try{
    producer.send(record);
    } catch(ExecutionException | InterruptedException){
    e.printStakcTrace();
    }
  • 异步

    1
    2
    3
    4
    5
    6
    producer.send(record, (metadata, exception) -> {
    if (exception != null) {
    MessageDataDTO dataDTO = new MessageDataDTO(newNewTopic, message, tag, key);
    logger.info(MQConstant.PRODUCE_FAIL_NOTIFY, JSON.toJSONString(dataDTO));
    }
    });

RecordMetadata

RecordMetadata对象里包含了消息的一些元数据信息
比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。
如果在应用代码中需要这些信息,则可以使用这个方式。

close

一个KafkaProducer不会只负责发送单条消息,更多的是发送多条消息,在发送完这些消息之后,需要调用KafkaProducer的close()方法来回收资源。
close()方法会阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer。
与此同时KafkaProducer还提供了一个带超时时间的close()方法

序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。
而在对侧消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。

为了方便,消息的key和value都使用了字符串,对应程序中的序列化器也使用了客户端自带的org.apache.kafka.common.serialization.StringSerializer
除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口

分区器

Kafka中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口,这个接口中定义了2个方法,具体如下所示。

1
2
3
4
5
6
7
8
9
10
public interface Partitioner extends Configurable, Closeable {

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);


void close();

default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
  • partition()方法用来计算分区号,返回值为int类型。
    partition()方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。
  • close()方法在关闭分区器的时候用来回收一些资源。

DefaultPartitioner

DefaultPartitioner 的实现中,close()是空方法,而在 partition()方法中定义了主要的分区分配逻辑。

  • 如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。
  • 如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。

生产者拦截器

Kafka一共有两种拦截器:生产者拦截器和消费者拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如发送成功、失败统计类工作。

org.apache.kafka.clients.producer.ProducerInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
public interface ProducerInterceptor<K, V> extends Configurable {

ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);


void onAcknowledgement(RecordMetadata metadata, Exception exception);

/**
* This is called when interceptor is closed
*/
void close();
}

onSend()
KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。
一般来说最好不要修改消息 ProducerRecord 的 topic、key 和partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。
比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(Log Compaction)的功能。

onAcknowledgement()
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。
这个方法运行在Producer的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

原理分析

开头介绍了消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用
那么在此之后又会发生什么呢?

1
2
3
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。
在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。
Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator

RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB。
如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

ProducerBatch

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>。
消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。
注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑。
与此同时将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量

如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。

ProducerBatch#batch.size

在发送之前需要创建一块内存区域来保存对应的消息。
在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。
不过频繁的创建和释放是比较耗费资源的
在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。
不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中
这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。

ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。

在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小

  • 如果不超过,那么就以batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool 的管理来进行复用;
  • 如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

Sender线程

Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群的broker节点。

1
2
对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区
而对于 KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。

在转换成<Node,List<ProducerBatch>>的形式之后,Sender 还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了
这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest,更多与 Kafka 协议有关的内容

Sender线程#InFlightRequests

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。

与此同时InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。
这个配置参数为max.in.flight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。
通过比较Deque<Request>的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

元数据的更新

InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。
这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大

1
ProducerRecord<string, string> record = new ProducerRecord<>(topic, "Hello,Kafka !");

元数据信息的作用

我们只知道主题的名称,对于其他一些必要的信息却一无所知。
KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka,在这一过程中所需要的信息都属于元数据信息。

元数据信息的内容

元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

更新流程

当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms 时间没有更新元数据都会引起元数据的更新操作。

客户端参数metadata.max.age.ms的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。
当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。
这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。
元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。






消费者

与生产者对应的是消费者,应用程序可以通过KafkaConsumer来订阅主题,并从订阅的主题中拉取消息。
不过在使用KafkaConsumer消费消息之前需要先了解消费者和消费组的概念

其次一个正常的消费逻辑需要具备以下几个步骤

  • (1)配置消费者客户端参数及创建相应的消费者实例。
  • (2)订阅主题。
  • (3)拉取消息并消费。
  • (4)提交消费位移。
  • (5)关闭消费者实例。

主要讲述了消费者和消费组的概念,以及如何正确使用 KafkaConsumer。
具体的内容还包括参数配置的讲解、订阅、反序列化、位移提交、再均衡、消费者拦截器、多线程的使用。

消费者与消费组

基本概念

消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。
与其他一些消息中间件不同的是l在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。

当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

某个主题中共有4个分区(Partition):P0、P1、P2、P3。有两个消费组A和B都订阅了这个主题,消费组A中有4个消费者(C0、C1、C2和C3),消费组B中有2个消费者(C4和C5)。

1
2
按照Kafka默认的规则,最后的分配结果是消费组A中的每一个消费者分配到1个分区,消费组B中的每一个消费者分配到2个分区,两个消费组之间互不影响。
每个消费者只能消费所分配到的分区中的消息。换言之,每一个分区只能被一个消费组中的一个消费者所消费。

组内只有一个消费者

组内有两个消费者

组内有三个消费者

组内有过多消费者

一共有8个消费者,7个分区,那么最后的消费者C7由于分配不到任何分区而无法消费任何消息。

投递模式

对于消息中间件而言,一般有两种消息投递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。

点对点模式

点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。

发布订阅模式

发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。
主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递

发布/订阅模式在消息的一对多广播时采用。

Kafka 同时支持两种消息投递模式,而这正是得益于消费者与消费组模型的契合:·
如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

group.id

消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。
每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。

必要的参数设置

在Kafka消费者客户端KafkaConsumer中有4个参数是必填的。

  • bootstrap.servers:该参数的释义和生产者客户端 KafkaProducer 中的相同,用来指定连接Kafka 集群所需的broker 地址清单
    具体内容形式为host1:port1,host2:post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”。
    注意这里并非需要设置集群中全部的broker地址,消费者会从现有的配置中查找到全部的Kafka集群成员。
    这里设置两个以上的broker地址信息,当其中任意一个宕机时,消费者仍然可以连接到Kafka集群上。
  • group.id:消费者隶属的消费组的名称,默认值为“”。
    如果设置为空,则会报出异常:Exceptionin thread "main" org.apache.kafka.common.errors.InvalidGroupIdException:The configuredgroupId is invalid
  • key.deserializervalue.deserializer:与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。
    消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。
    这两个参数分别用来指定消息中key和value所需反序列化操作的反序列化器,这两个参数无默认值。
    注意这里必须填写反序列化器类的全限定名,比如org.apache.kafka.common.serialization.StringDeserializer
  • client.id: 这个参数用来设定KafkaConsumer对应的客户端id,默认值也为“”。
    如果客户端不设置,则KafkaConsumer会自动生成一个非空字符串,内容形式如“consumer-1”,“consumer-2”,即字符串“consumer-”与数字的拼接。

由于参数众多可用org.apache.kafka.clients.consumer.ConsumerConfig做key的提取

重要的消费者参数

  • fetch.min.bytes
    该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1(B)。
    Kafka在收到Consumer的拉取请求时,如果返回给Consumer的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小
  • fetch.max.bytes
    该参数与fetch.max.bytes参数对应,它用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为 52428800(B),也就是 50MB。
    如果这个参数设置的值比任何一条写入Kafka中的消息要小,那么会不会造成无法消费呢?
    如果在第一个非空分区中拉取的第一条消息大于该值,那么该消息将仍然返回,以确保消费者继续工作。
    也就是说上面问题的答案是可以正常消费。
    与此相关的Kafka中所能接收的最大消息的大小通过服务端参数message.max.bytes(对应于主题端参数max.message.bytes)来设置。
  • fetch.max.wait.ms
    这个参数也和fetch.min.bytes参数有关,如果Kafka仅仅参考fetch.min.bytes参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。
    fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500(ms)。
    如果Kafka中没有足够多的消息而满足不了fetch.min.bytes参数的要求,那么最终会等待500ms。
  • max.partition.fetch.bytes
    这个参数用来配置从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。
    这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。
  • max.poll.records
    这个参数用来配置Consumer在一次拉取请求中拉取的最大消息数,默认值为500(条)。
    如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。
  • connections.max.idle.ms
    这个参数用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。
  • exclude.internal.topics
    Kafka中有两个内部的主题:__consumer_offsets__transaction_state
    exclude.internal.topics用来指定Kafka中的内部主题是否可以向消费者公开,默认值为true。
    如果设置为true,那么只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false则没有这个限制。
  • receive.buffer.bytes
    这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。
    如果设置为-1,则使用操作系统的默认值。
    如果Consumer与Kafka处于不同的机房,则可以适当调大这个参数值。
  • send.buffer.bytes
    这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。
    与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
  • request.timeout.ms
    这个参数用来配置Consumer等待请求响应的最长时间,默认值为30000(ms)。
  • metadata.max.age.ms
    这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。
    如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker加入。
  • reconnect.backoff.ms
    这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。
  • retry.backoff.ms
    这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100(ms)。
  • isolation.level
    这个参数用来配置消费者的事务隔离级别。
    字符串类型,有效值为“read_uncommitted”和“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到 LSO(LastStableOffset)的位置,默认情况下为“read_uncommitted”,即可以消费到HW(High Watermark)处的位置


订阅主题与分区

在创建好消费者之后,我们就需要为该消费者订阅相关的主题了。
一个消费者可以订阅一个或多个主题
使用subscribe()方法订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Consumer<K, V> extends Closeable {
Set<String> subscription();

void subscribe(Collection<String> var1);

void subscribe(Collection<String> var1, ConsumerRebalanceListener var2);

void subscribe(Pattern var1, ConsumerRebalanceListener var2);

void subscribe(Pattern var1);

void unsubscribe();

void assign(Collection<TopicPartition> var1);

}

subscribe(Collection);

1
2
consumer.subscribe(Lists.newArrayList(topic1));
consumer.subscribe(Lists.newArrayList(topic1));

费者使用集合的方式(subscribe(Collection))来订阅主题而言,比较容易理解,订阅了什么主题就消费什么主题中的消息。
如果前后两次订阅了不同的主题,那么消费者以最后一次的为准

1
如上示例最终消费者订阅的是topic2,而不是topic1,也不是topic1和topic2的并集。

subscribe(Pattern);

如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅
在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。

assign(Collection);

消费者不仅可以通过KafkaConsumer.subscribe()方法订阅主题,还可以直接订阅某些主题的特定分区
在KafkaConsumer中还提供了一个assign()方法来实现这些功能

1
consumer.assign(Arrays.asList(new TopicPartition("topic-demo", 0)));

这里只订阅topic-demo主题中分区编号为0的分区

通过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。
当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。
而通过assign()方法订阅分区时,是不具备消费者自动均衡的功能的

1
2
其实这一点从assign()方法的参数中就可以看出端倪
两种类型的subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

partitionsFor(String);

List<PartitionInfo> partitionsFor(String topic);

如果我们事先并不知道主题中有多少个分区怎么办?
KafkaConsumer 中的partitionsFor()方法可以用来查询指定主题的元数据信息

PartitionInfo

1
2
3
4
5
6
7
8
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;
}

PartitionInfo类中的属性

  • topic表示主题名称
  • partition代表分区编号
  • leader代表分区的leader副本所在的位置
  • replicas代表分区的AR集合
  • inSyncReplicas代表分区的ISR集合
  • offlineReplicas代表分区的OSR集合。

unsubscribe

既然有订阅,那么就有取消订阅,可以使用 KafkaConsumer 中的 unsubscribe()方法来取消主题的订阅。
如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法


反序列化

之前讲述了KafkaProducer对应的序列化器,那么与此对应的KafkaConsumer就会有反序列化器。
Kafka所提供的反序列化器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,它们分别用于ByteBuffer、ByteArray、Bytes、Double、Float、Integer、Long、Short 及String类型的反序列化
这些序列化器也都实现了 Deserializer 接口,与KafkaProducer中提及的Serializer接口一样


消息消费

Kafka中的消费是基于拉模式的。
消息的消费一般有两种模式:推模式和拉模式。

  • 推模式是服务端主动将消息推送给消费者
  • 拉模式是消费者主动向服务端发起请求来拉取消息。

Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法
而poll()方法返回的是所订阅的主题(分区)上的一组消息。
如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空

poll()方法里还有一个超时时间参数timeout,用来控制poll()方法的阻塞时间
在消费者的缓冲区里没有可用数据时会发生阻塞。

简单地认为poll()方法只是拉取一下消息而已,但就其内部逻辑而言并不简单
它涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容

ConsumerRecord

消费者消费到的每条消息的类型为ConsumerRecord(注意与ConsumerRecords的区别)
这个和生产者发送的消息类型ProducerRecord相对应,不过ConsumerRecord中的内容更加丰富

具体的结构参考如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = -1L;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private final Optional<Integer> leaderEpoch;
private volatile Long checksum;
}
  • topic 和 partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。
  • offset 表示消息在所属分区的偏移量。
  • timestamp 表示时间戳,与此对应的timestampType 表示时间戳的类型。
  • timestampType 有两种类型:CreateTime 和LogAppendTime
    分别代表消息创建的时间戳和消息追加到日志的时间戳。
  • headers表示消息的头部内容。
  • key 和 value 分别表示消息的键和消息的值,一般业务应用要读取的就是value,
    比如使用2.1.3节中的CompanySerializer序列化了一个Company对象,然后将其存入Kafka,那么消费到的消息中的 value 就是经过CompanyDeserializer 反序列化后的 Company对象。
  • serializedKeySize和serializedValueSize分别表示key和value经过序列化之后的大小
    如果key为空,则serializedKeySize值为-1。
    同样如果value为空,则serializedValueSize的值也会为-1。
  • checksum是CRC32的校验值。
    如需更加深入了解消息中的各个属性

ConsumerRecords

poll()方法的返回值类型是 ConsumerRecords
它用来表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord

  • ConsumerRecords类提供了一个records(TopicPartition)方法来获取消息集中指定分区的消息
  • ConsumerRecords.partitions()方法用来获取消息集中所有分区


位移提交

Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。

对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。

笔者对offset做了一些区分:对于消息在分区中的位置,我们将offset称为“偏移量”;对于消费者消费到的位置,将 offset 称为“位移”

当然对于一条消息而言,它的偏移量和消费者消费它时的消费位移是相等的

在每次调用poll()方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息已经存储在Kafka 中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。

1
2
3
4
并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。
再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作
对于同一分区而言,它可能在再均衡动作之后分配给新的消费者
如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

在旧消费者客户端中,消费位移是存储在ZooKeeper中的。
而在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。

这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

图中x表示某一次拉取操作中此分区消息的最大偏移量
那么我们就可以说消费者的消费位移为 x,lastConsumedOffset这个单词来标识它。
当前消费者需要提交的消费位移并不是 x,而是 x+1,

消费位移演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class CheckOffsetAndPosition {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
private static AtomicBoolean running = new AtomicBoolean(true);

public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}

public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


TopicPartition tp = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(tp));
long lastConsumedOffset = -1;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
if (records.isEmpty()) {
break;
}
List<ConsumerRecord<String, String>> partitionRecords
= records.records(tp);
lastConsumedOffset = partitionRecords
.get(partitionRecords.size() - 1).offset();
consumer.commitSync();//同步提交消费位移
}
System.out.println("comsumed offset is " + lastConsumedOffset);
OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
System.out.println("commited offset is " + offsetAndMetadata.offset());
long posititon = consumer.position(tp);
System.out.println("the offset of the next record is " + posititon);
}
}
  • assign()方法订阅了编号为0的分区,然后消费分区中的消息。
  • 通过调用ConsumerRecords.isEmpty()方法来判断是否已经消费完分区中的消息,以此来退出while(true)的循环

结果如下

1
2
3
consumed offset is 377
conmmited offset is 378
the offset of the next record is 378

可以看出,消费者消费到此分区消息的最大偏移量为377,对应的消费位移lastConsumedOffset也就是377。
在消费完之后就执行同步提交,但是最终结果显示所提交的位移committed offset为378,并且下一次所要拉取的消息的起始偏移量 position 也为 378。

消息丢失现象

position=committed offset=lastConsumedOffset+1,当然position和committed offset并不会一直相同,这一点会在下面的示例中有所体现。
对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象。

上图当前一次poll()操作所拉取的消息集为[x+2,x+7],x+2代表上一次提交的消费位移
说明已经完成了x+1之前(包括x+1在内)的所有消息的消费,x+5表示当前正在处理的位置。
如果拉取到消息之后就进行了位移提交,即提交了x+8
那么当前消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+8开始的。
也就是说,x+5至x+7之间的消息并未能被消费,如此便发生了消息丢失的现象。

消息重复现象

再考虑另外一种情形,位移提交的动作是在消费完所有拉取到的消息之后才执行的
那么当消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+2开始的。
也就是说,x+2至x+4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。

更复杂的现象

而实际情况还会有比这两种更加复杂的情形,比如第一次的位移提交的位置为 x+8,而下一次的位移提交的位置为x+4,后面会做进一步的分析。

位移提交方式

自动提交

在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit配置,默认值为 true。

当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是enable.auto.commit参数为true。

在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。
自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

1
2
3
4
5
在Kafka消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。
但随之而来的是重复消费和消息丢失的问题。
假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了
那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。
我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送

自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?

下图拉取线程A不断地拉取消息并存入本地缓存,比如在BlockingQueue中,另一个处理线程B从缓存中读取消息并进行相应的逻辑处理。
假设目前进行到了第y+1次拉取,以及第m次位移提交的时候,也就是x+6之前的位移已经确认提交了,处理线程B却还正在消费x+3的消息。
此时如果处理线程B发生了异常,待其恢复之后会从第m此位移提交处,也就是x+6的位置开始拉取消息
那么x+3至x+6之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。


手动提交

自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免,与此同时,自动位移提交也无法做到精确的位移管理。
在Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。
很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。
在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。
开启手动提交功能的前提是消费者客户端参数enable.auto.commit配置为false

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。我们这里先讲述同步提交的方式,commitSync()方法的定义如下:

commitSync同步提交
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class OffsetCommitSync {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
private static AtomicBoolean running = new AtomicBoolean(true);

public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}

public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}

先对拉取到的每一条消息做相应的逻辑处理,然后对整个消息集做同步提交。
针对上面的示例还可以修改为批量处理+批量提交的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

final int minBatchSize = 200;
List<ConsumerRecord> buffer = new ArrayList<>();
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//do some logical processing with buffer.
consumer.commitSync();
buffer.clear();
}
}
}

将拉取到的消息存入缓存 buffer,等到积累到足够多的时候,也就是示例中大于等于200个的时候,再做相应的批量处理,之后再做批量提交。
这两个示例都有重复消费的问题,如果在业务逻辑处理完之后,并且在同步位移提交前,程序出现了崩溃,那么待恢复之后又只能从上一次位移提交的地方拉取消息,由此在两次位移提交的窗口中出现了重复消费的现象。


commitSync有参同步提交

对于采用 commitSync()的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的
如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个含参方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

int i = 0;
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
long offset = record.offset();
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)));
}
}
} finally {
consumer.close();
}
}

在实际应用中很少会有这种每消费一条消息就提交一次消费位移的必要场景。commitSync()方法本身是同步执行的,会耗费一定的性能
而示例中的这种提交方式会将性能拉到一个相当低的点。更多时候是按照分区的粒度划分提交位移的界限

所以又优化了一个版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
//do some logical processing.
}
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumedOffset + 1)));
}
}
} finally {
consumer.close();
}
}


commitAsync异步提交

与commitSync()方法相反,异步提交的方式(commitAsync()在执行的时候消费者线程不会被阻塞
可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。

异步提交可以使消费者的性能得到一定的增强。commitAsync方法有三个不同的重载方法,具体定义如下:

1
2
3
4
5
void commitAsync();

void commitAsync(OffsetCommitCallback var1);

void commitAsync(Map<TopicPartition, OffsetAndMetadata> var1, OffsetCommitCallback var2);

第一个无参的方法和第三个方法中的offsets都很好理解,对照commitSync()方法即可。

关键的是这里的第二个方法和第三个方法中的callback参数,它提供了一个异步提交的回调方法
当位移提交完成后会回调 OffsetCommitCallback 中的 onComplete()方法。
这里采用第二个方法来演示回调函数的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception == null) {
System.out.println(offsets);
} else {
log.error("fail to commit offsets {}", offsets, exception);
}
}
});
}
} finally {
consumer.close();
}
}

commitAsync()提交的时候同样会有失败的情况发生,那么我们应该怎么处理呢?

如果某一次异步提交的消费位移为 x,但是提交失败了,然后下一次又异步提交了消费位移为 x+y,这次成功了。
如果这里引入了重试机制,前一次的异步提交的消费位移在重试的时候提交成功了,那么此时的消费位移又变为了 x。

如果此时发生异常(或者再均衡),那么恢复之后的消费者(或者新的消费者)就会从x处开始消费消息,这样就发生了重复消费的问题。
为此我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。
在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小,如果前者小于后者,则说明有更大的位移已经提交了,不需要再进行本次重试;
如果两者相同,则说明可以进行重试提交。除非程序编码错误,否则不会出现前者大于后者的情况。

1
2
3
4
5
如果位移提交失败的情况经常发生,那么说明系统肯定出现了故障
在一般情况下,位移提交失败的情况很少发生,不重试也没有关系,后面的提交也会有成功的。
重试会增加代码逻辑的复杂度,不重试会增加重复消费的概率。
如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移
如果消费者正常退出或发生再均衡的情况,那么可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception == null) {
System.out.println(offsets);
} else {
log.error("fail to commit offsets {}", offsets, exception);
}
}
});
}
} finally {
consumer.close();
}

try {
while (running.get()) {
//poll records and do some logical processing.
consumer.commitAsync();
}
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
}




控制或关闭消费

  • pause/resume: 暂停/继续消费
  • isRuning.get()判断是否while
  • wakeup进行跳出while
  • close关闭

KafkaConsumer 提供了对消费速度进行控制的方法,在有些应用场景下我们可能需要暂停某些分区的消费而先消费其他分区
当达到一定条件时再恢复这些分区的消费。

pause/resume

KafkaConsumer中使用pause()和resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。

这两个方法的具体定义如下:

1
2
3
void pause(Collection<TopicPartition>);
void resume(Collection<TopicPartition>);
Set<TopicPartition> paused();

KafkaConsumer还提供了一个无参的paused()方法来返回被暂停的分区集合

isRuning

示例展示的都是使用一个while循环来包裹住poll()方法及相应的消费逻辑,如何优雅地退出这个循环也很有考究。
细心的读者可能注意到有些示例代码并不是以 while(true)的形式做简单的包裹,而是使用 while(isRunning.get()的方式,这样可以通过在其他地方设定isRunning.set(false)来退出while循环。

wakeup

还有一种方式是调用KafkaConsumer的wakeup()方法,wakeup()方法是 KafkaConsumer 中唯一可以从其他线程里安全调用的方法(KafkaConsumer 是非线程安全的),调用wakeup()方法后可以退出poll()的逻辑,并抛出 WakeupException 的异常,我们也不需要处理WakeupException 的异常,它只是一种跳出循环的方式。

close

跳出循环以后一定要显式地执行关闭动作以释放运行过程中占用的各种系统资源,包括内存资源、Socket连接等。
KafkaConsumer提供了close()方法来实现关闭,close()方法有三种重载方法,分别如下:

1
2
3
4
void close();
void close(Duration var1);
@Deprecated
void close(long var1, TimeUnit var3);
  • 第一种方法没有timeout 参数,这并不意味着会无限制地等待,它内部设定了最长等待时间(30秒
  • 第二种方法是通过 timeout 参数来设定关闭方法的最长执行时间,有些内部的关闭逻辑会耗费一定的时间
    比如设置了自动提交消费位移,这里还会做一次位移提交的动作
  • 第三种方法已被标记为@Deprecated,可以不考虑。




指定位移消费

auto.offset.reset

进行消费位移的提交,正是有了消费位移的持久化,才使消费者在关闭、崩溃或者在遇到再均衡的时候,可以让接替的消费者能够根据存储的消费位移继续进行消费。

试想一下,当一个新的消费组建立的时候,它根本没有可以查找的消费位移。
或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。
__consumer_offsets主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。

在 Kafka 中每当消费者查找不到所记录的消费位移时
就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”表示从分区末尾开始消费消息。
按照默认的配置,消费者会从9开始进行消费(9是下一条要写入消息的位置),更加确切地说是从9开始拉取消息。
如果将auto.offset.reset参数配置为“earliest”,那么消费者会从起始处,也就是0开始消费。

除了查找不到消费位移,位移越界也会触发auto.offset.reset参数的执行,这个在下面要讲述的seek系列的方法中会有相关的介绍。

seek

提供的auto.offset.reset参数也只能在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。

有些时候我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而KafkaConsumer 中的 seek()方法正好提供了这个功能,让我们得以追前消费或回溯消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
consumer.poll(Duration.ofMillis(2000));
Set<TopicPartition> assignment = consumer.assignment();
System.out.println(assignment);
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}
}

如果consumer.poll(Duration.ofMillis(0));会发现seek()方法并未有任何作用。

1
2
因为当poll()方法中的参数为0时,此方法立刻返回
那么poll()方法内部进行分区分配的逻辑就会来不及实施。也就是说消费者此时并未分配到任何分区

参数设置为多少合适呢?

太短会使分配分区的动作失败,太长又有可能造成一些不必要的等待。
我们可以通过KafkaConsumer的assignment()方法来判定是否分配到了相应的分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
long start = System.currentTimeMillis();
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {// 如果不为0,则说明已经成功分配到了分区
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(assignment);
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10);
}
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
//consume the record.
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}
}

seekToBeginning和seekToEnd

endOffsets&beginningOffsets

endOffsets()方法用来获取指定分区的末尾的消息位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment) {
// consumer.seek(tp, offsets.get(tp));
consumer.seek(tp, offsets.get(tp) + 1);
}
System.out.println(assignment);
System.out.println(offsets);

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
//consume the record.
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}

}

其中partitions参数表示分区集合,而timeout参数用来设置等待获取的超时时间。
如果没有指定timeout 参数的值,那么 endOffsets()方法的等待时间由客户端参数request.timeout.ms来设置,默认值为 30000。
与 endOffsets 对应的是 beginningOffsets()方法,一个分区的起始位置起初是0,但并不代表每时每刻都为0,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加,

beginningOffsets()方法中的参数内容和含义都与 endOffsets()方法中的一样,配合这两个方法我们就可以从分区的开头或末尾开始消费。
其实KafkaConsumer中直接提供了seekToBeginning()方法和seekToEnd()方法来实现这两个功能

offsetsForTimes

有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天8点之后的消息,这个需求更符合正常的思维逻辑。
此时我们无法直接使用seek()方法来追溯到相应的位置。
KafkaConsumer同样考虑到了这种情况,它提供了一个offsetsForTimes()方法,通过timestamp来查询与此对应的分区位置。

storeOffsetToDB

消费位移可以保存在任意的存储介质中,例如数据库、文件系统等。以数据库为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
consumer.subscribe(Arrays.asList(topic));
//省略poll()方法以及assignment的逻辑
for(TopicPartition tp: assignment){
long offset = getOffsetFromDB(tp);//从DB中读取消费位移
consumer.seek(tp, offset);
}
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
//process the record.
}
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//将消费位移存储在DB中
storeOffsetToDB(partition, lastConsumedOffset+1);
}
}




再均衡

再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。

不过在再均衡发生期间,消费组内的消费者是无法读取消息的。
也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。
另外当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。
比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。

一般情况下应尽量避免不必要的再均衡的发生。
那如何真有这样的情况发生,采取什么手段可以避免再均衡发生

subscribe()方法时提及再均衡监听器ConsumerRebalanceListener
subscribe(Collection<String> topics,ConsumerRebalanceListener listener)subscribe(Pattern pattern,ConsumerRebalanceListener listener)方法中都有它的身影。

再均衡监听器用来设定发生再均衡动作前后的一些准备或收尾的动作

1
2
3
4
5
6
7
8
9
public interface ConsumerRebalanceListener {
void onPartitionsRevoked(Collection<TopicPartition> var1);

void onPartitionsAssigned(Collection<TopicPartition> var1);

default void onPartitionsLost(Collection<TopicPartition> partitions) {
this.onPartitionsRevoked(partitions);
}
}

onPartitionsRevoked

这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。
可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。
参数partitions表示再均衡前所分配到的分区。

onPartitionsAssigned

这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。
参数partitions表示再均衡后所分配到的分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//do nothing.
}
});

try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//process the record.
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitAsync(currentOffsets, null);
}
} finally {
consumer.close();
}

}


消费者拦截器

之前讲述了生产者拦截器的使用,对应的消费者也有相应的拦截器的概念。
消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作

与生产者拦截器对应的,消费者拦截器需要自定义实现org.apache.kafka.clients.consumer.ConsumerInterceptor

1
2
3
4
5
6
7
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);

void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);

void close();
}

onConsume

KafkaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作
比如修改返回的消息内容、按照某种规则过滤消息(可能会减少poll()方法返回的消息的个数)。如果 onConsume()方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递。

onCommit

KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit()方法,可以使用这个方法来记录跟踪所提交的位移信息,比如当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的具体细节,而使用拦截器的onCommit()方法却可以做到这一点。

close

close()方法和ConsumerInterceptor的父接口中的configure()方法与生产者的ProducerInterceptor接口中的用途一样

案例

在某些业务场景中会对消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那么就会被视为无效,它也就不需要再被继续处理了。
下面使用消费者拦截器来实现一个简单的消息TTL(Time to Live,即过期时间)的功能。

自定义的消费者拦截器ConsumerInterceptorTTL使用消息的 timestamp 字段来判定是否过期
如果消息的时间戳与当前的时间戳相差超过10秒则判定为过期,那么这条消息也就被过滤而不投递给具体的消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ConsumerInterceptorTTL implements
ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10 * 1000;

@Override
public ConsumerRecords<String, String> onConsume(
ConsumerRecords<String, String> records) {
System.out.println("before:" + records);
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords
= new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
if (!newTpRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) ->
System.out.println(tp + ":" + offset.offset()));
}

@Override
public void close() {
}

@Override
public void configure(Map<String, ?> configs) {
}
}

实现自定义的ConsumerInterceptorTTL之后,需要在KafkaConsumer中配置指定这个拦截器
这个指定的配置和KafkaProducer中的一样,也是通过interceptor.classes参数实现的


多线程实现

KafkaProducer是线程安全的,然而KafkaConsumer却是非线程安全的。
KafkaConsumer中定义了一个 acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentModifcationException
KafkaConsumer中的每个公用方法在执行所要执行的动作之前都会调用这个acquire()方法,只有wakeup()方法是个例外

acquire()方法和我们通常所说的锁(synchronized、Lock等)不同,它不会造成阻塞等待
我们可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。
acquire()方法和release()方法成对出现,表示相应的加锁和解锁操作。

KafkaConsumer 非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。
如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。
除此之外,由于Kafka 中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。
我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。

线程封闭

多线程的实现方式有多种,第一种也是最常见的方式:线程封闭,即为每个线程实例化一个KafkaConsumer对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void main(String[] args) {
Properties props = initConfig();
int consumerThreadNum = 4;
for (int i = 0; i < consumerThreadNum; i++) {
new KafkaConsumerThread(props, topic).start();
}
}

public static class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;

public KafkaConsumerThread(Properties props, String topic) {
this.kafkaConsumer = new KafkaConsumer<>(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
}

@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records =
kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//process record.
System.out.println(record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}

可以将consumerThreadNum设置成不大于分区数的值,如果不知道主题的分区数,那么也可以通过KafkaConsumer类的partitionsFor()方法来间接获取,进而再设置合理的consumerThreadNum值。

assign、seek

第二种方式是多个消费线程同时消费同一个分区,这个通过 assign()、seek()等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。
不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少,笔者也并不推荐。

一般而言,分区是消费线程的最小划分单位。下面我们通过实际编码来演示第一种多线程消费实现的方式

处理多线程消费实现

如果这里对消息的处理非常迅速,那么 poll()拉取的频次也会更高,进而整体消费的性能也会提升;
相反如果在这里对消息的处理缓慢

如果我们通过一定的方式来改进这一部分,那么我们就能带动整体消费性能的提升。
于是将处理消息模块改成多线程的实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, topic,
Runtime.getRuntime().availableProcessors());
consumerThread.start();
}

public static class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;
private ExecutorService executorService;
private int threadNumber;

public KafkaConsumerThread(Properties props, String topic, int threadNumber) {
kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Collections.singletonList(topic));
this.threadNumber = threadNumber;
executorService = new ThreadPoolExecutor(threadNumber, threadNumber,
0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
}

@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executorService.submit(new RecordsHandler(records));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}

}

public static class RecordsHandler extends Thread {
public final ConsumerRecords<String, String> records;

public RecordsHandler(ConsumerRecords<String, String> records) {
this.records = records;
}

@Override
public void run() {
//处理records.
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}

第三种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少TCP连接对系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难了
这里引入一个共享变量offsets来参与提交

每一个处理消息的 RecordHandler 类在处理完消息之后都将对应的消费位移保存到共享变量offsets中
KafkaConsumerThread在每一次poll()方法之后都读取offsets中的内容并对其进行位移提交。
注意在实现的过程中对offsets读写需要加锁处理,防止出现并发问题。
并且在写入offsets的时候需要注意位移覆盖的问题,针对这个问题,可以将RecordHandler类中的run()方法实现改为如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for(TopicPartition tp:records.partitions()){
List<ConsumerRecord<String,String>> tpRecords = records.records(tp);
//处理tpRecords
long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
synchronized(offsets){
if(!offsets.containsKey(tp)){
offsets.put(tp, new OffsetAndMetadata(tpRecords.size() + 1));
} else {
long position = offsets.get(tp).offset();
if(position < lastConsumerdOffset + 1){
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
}
}
}
}

位移提交

1
2
3
4
5
6
synchronized(offsets){
if(!offsets.isEmpty()){
kafkaConsumer.commitSync(offsets);
offsets.clear();
}
}

其实这种位移提交的方式会有数据丢失的风险。
对于同一个分区中的消息,假设一个处理线程RecordHandler1正在处理offset为0~99的消息,而另一个处理线程RecordHandler2已经处理完了offset为100~199的消息并进行了位移提交
此时如果RecordHandler1发生异常,则之后的消费只能从200开始而无法再次消费0~99的消息,从而造成了消息丢失的现象。

这里虽然针对位移覆盖做了一定的处理,但还没有解决异常情况下的位移覆盖问题。
对此就要引入更加复杂的处理机制,这里再提供一种解决思路,总体结构上是基于滑动窗口实现的。


优化意见

对于第三种实现方式而言,它所呈现的结构是通过消费者拉取分批次的消息,然后提交给多线程进行处理
而这里的滑动窗口式的实现方式是将拉取到的消息暂存起来,多个消费线程可以拉取暂存的消息,这个用于暂存消息的缓存大小即为滑动窗口的大小
总体上而言没有太多的变化,不同的是对于消费位移的把控。

每一个方格代表一个批次的消息,一个滑动窗口包含若干方格,startOffset标注的是当前滑动窗口的起始位置,endOffset标注的是末尾位置。
每当startOffset指向的方格中的消息被消费完成,就可以提交这部分的位移,与此同时,窗口向前滑动一格,删除原来startOffset所指方格中对应的消息,并且拉取新的消息进入窗口。
滑动窗口的大小固定,所对应的用来暂存消息的缓存大小也就固定了,这部分内存开销可控。

方格大小和滑动窗口的大小同时决定了消费线程的并发数:一个方格对应一个消费线程,对于窗口大小固定的情况,方格越小并行度越高;
对于方格大小固定的情况,窗口越大并行度越高。不过,若窗口设置得过大,不仅会增大内存的开销,而且在发生异常(比如Crash)的情况下也会引起大量的重复消费
同时还考虑线程切换的开销,建议根据实际情况设置一个合理的值,不管是对于方格还是窗口而言,过大或过小都不合适。

1
2
3
4
5
6
7
8
如果一个方格内的消息无法被标记为消费完成,那么就会造成 startOffset 的悬停。
为了使窗口能够继续向前滑动,那么就需要设定一个阈值,当 startOffset 悬停一定的时间后就对这部分消息进行本地重试消费,如果重试失败就转入重试队列,如果还不奏效就转入死信队列

真实应用中无法消费的情况极少,一般是由业务代码的处理逻辑引起的
比如消息中的内容格式与业务处理的内容格式不符,无法对这条消息进行决断,这种情况可以通过优化代码逻辑或采取丢弃策略来避免。

如果需要消息高度可靠,也可以将无法进行业务逻辑的消息(这类消息可以称为死信)存入磁盘、数据库或Kafka
然后继续消费下一条消息以保证整体消费进度合理推进,之后可以通过一个额外的处理任务来分析死信进而找出异常的原因。




主题与分区

主题和分区是Kafka 的两个核心概念,前面章节中讲述的生产者和消费者的设计理念所针对的都是主题和分区层面的操作。

主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。
分区的划分不仅为Kafka提供了可伸缩性、水平扩展的功能,还通过多副本机制来为Kafka提供数据冗余以提高数据可靠性。
从Kafka的底层实现来说,主题和分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment)
每个日志分段还可以细分为索引文件、日志存储文件和快照文件等。

  • 主题的管理
  • 初识KafkaAdminClient
  • 分区的管理
  • 分区数的选择

主题的管理

主题的管理包括创建主题、查看主题信息、修改主题和删除主题等操作。
可以通过 Kafka提供的kafka-topics.sh 脚本来执行这些操作,这个脚本位于$KAFKA_HOME/bin/目录下
可以看到其实质上是调用了kafka.admin.TopicCommand类来执行主题管理的操作。

创建主题

如果broker端配置参数auto.create.topics.enable设置为true(默认值就是true)
那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。

除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数num.partitionsdefault.replication.factor的值来创建一个相应的主题。
很多时候,这种自动创建主题的行为都是非预期的。
除非有特殊应用需求,否则不建议将auto.create.topics.enable参数设置为true,这个参数会增加主题的管理与维护的难度。
更加推荐也更加通用的方式是通过kafka-topics.sh脚本来创建主题。

上面的示例中创建了一个分区数为 4、副本因子为 2

一共有3个broke

1
2
3
node1 broke0
node2 broke1
node3 broke2

Kafka会在log.dir或log.dirs参数所配置的目录下创建相应的主题分区,默认情况下这个目录为/tmp/kafka-logs/

node1 节点中创建了 2 个文件夹 topic-create-0 和 topic-create-1

这里也只是看到了2个分区,而我们创建的是4个分区,其余2个分区被分配到了node2和node3节点中

查看主题

kafka-topics.sh脚本有5种指令类型:createlistdescribealterdelete
其中listdescribe指令可以用来方便地查看主题信息

describe

describe指令来查看单个主题信息的,如果不使用--topic指定主题,则会展示出所有主题的详细信息。
--topic还支持指定多个主题:--topic a,b,c

在使用 describe 指令查看主题信息时还可以额外指定topics-with-overridesunder-replicated-partitionsunavailable-partitions这三个参数来增加一些附加功能。

增加topics-with-overrides参数可以找出所有包含覆盖配置的主题,它只会列出包含了与集群不一样配置的主题。注意使用topics-with-overrides参数时只显示原本只使用describe指令的第一行信息
under-replicated-partitionsunavailable-partitions参数都可以找出有问题的分区。
通过under-replicated-partitions参数可以找出所有包含失效副本的分区。


修改主题

alter

当一个主题被创建之后,依然允许我们对其做一定的修改,比如修改分区个数、修改配置等,这个修改的功能就是由kafka-topics.sh脚本中的alter指令提供的。

1
2
目前Kafka只支持增加分区数而不支持减少分区数。
比如我们再将主题topic-config的分区数修改为1,就会报出InvalidPartitionException的异常

为什么不支持减少分区?

实现此功能需要考虑的因素很多,比如删除的分区中的消息该如何处理?
如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。
直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;
如果分散插入现有的分区,那么在消息量很大的时候,内部的数据复制会占用很大的资源

而且在复制期间,此主题的可用性又如何得到保障?
与此同时,顺序性问题、事务性问题,以及分区和副本的状态机切换问题都是不得不面对的。
反观这个功能的收益点却是很低的,如果真的需要实现此类功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。

在创建主题时有一个if-not-exists参数来忽略一些异常,在这里也有对应的参数
如果所要修改的主题不存在,可以通过 if-exists 参数来忽略异常

除了修改分区数,我们还可以使用kafka-topics.sh脚本的alter指令来变更主题的配置。
在创建完主题之后,我们还可以通过alter指令配合config参数增加或修改一些配置以覆盖它们配置原有的值。



删除主题

如果确定不再使用一个主题,那么最好的方式是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。
kafka-topics.sh脚本中的delete指令就可以用来删除主题,比如删除一个主题topic-delete

可以看到在执行完删除命令之后会有相关的提示信息,这个提示信息和broker端配置参数delete.topic.enable 有关。
必须将delete.topic.enable参数配置为true才能够删除主题,这个参数的默认值就是true,如果配置为false,那么删除主题的操作将会被忽略。


配置管理

kafka-configs.sh 脚本是专门用来对配置进行操作的,这里的操作是指在运行状态下修改原有的配置,如此可以达到动态变更的目的。
kafka-configs.sh脚本包含变更配置alter和查看配置describe这两种指令类型。

同使用kafka-topics.sh脚本变更配置的原则一样,增、删、改的行为都可以看作变更操作,不过kafka-configs.sh脚本不仅可以支持操作主题相关的配置,还可以支持操作broker、用户和客户端这3个类型的配置。
kafka-configs.sh脚本使用entity-type参数来指定操作配置的类型,并且使用entity-name参数来指定操作配置的名称。

比如查看主题topic-config的配置可以按如下方式执行

add-config演示

delete-config演示

Zookeeper上的一些变化

1
2
3
4
5
使用kafka-configs.sh脚本来变更(alter)配置时
会在ZooKeeper中创建一个命名形式为/config/<entity-type>/<entity-name>

变更配置时还会在ZooKeeper中的/config/changes/节点下创建一个以“config_change_”为前缀的持久顺序节点(PERSISTENT_SEQUENTIAL)
节点命名形式可以归纳为/config/changes/config_change_<seqNo>。


主题端参数

主题端参数 释义 对应的broker端参数
cleanup.policy 日志压缩策略。默认值为delete,还可以配置为compact log.cleanup.policy
compression.type 消息的压缩类型。默认值为producer,表示保留生产者中所使用的原始压缩类型。
还可以配置为uncompressed、snappy、lz4、gzip
compression.typc
delete.retention.ms 被标识为删除的数据能够保留多久。默认值为86400000,即1天 log.cleaner.delete.retention.ms
file.delete.delay.ms 清理文件之前可以等待多长时间,默认值为60000,即1分钟 log.segment.delete.delay.ms
flush.messages 需要收集多少消息才会将它们强制刷新到磁盘,默认值为Long.MAX_VALUE
即让操作系统来决定。建议不要修改此参数的默认值
log.flush.interval.messages
flush.ms 需要等待多久才会将消息强制刷新到磁盘,默认值为Long.MAX_VALUE
即让操作系统来决定。建议不要修改此参数的默认值
log.flush.interval.ms
follower.replication.throttled.replicas 用来配置被限制速率的主题所对应的follower副本列表 follower.replication.throttled.replicas
index.interval.bytes 用来控制添加索引项的须率。每超过这个参数所设置的消息字节数时就可以添加一个新的索引项,默认值为4096 log.index.interval.bytes
leader.replication.throttled.replicas 用来配置被限制速率的主题所对应的leader副本列表 leader.replication.throttled.replicas
max.message.bytes 消息的最大字节数,默认值为1000012 message.max.bytes
message.format.version 消息格式的版本,默认值为2.0-IV1 log.message.format.version
message.timestamp.diffcrence.max.ms 消息中自带的时间戳与broker收到消息时的时间戳之间最大的差值,默认值为Long.MAX_VALUE
此参数只有在meesage.timestamp.type参数设置为CreateTime时才有效
log.message.timestamp.difference.max.ms
message.timestamp.type 消息的时间戳类型。默认值为CreateTime,还可以设置为LogAppendTime log.message.timestamp.type
min.cleanable.dirty.ratio 日志清理时的最小污浊率,默认值为0.5 log.cleaner.min.cleanable.ratio
min.compaction.lag.ms 日志再被清理前的最小保留时间,默认值为0 log.cleaner.min.compaction.lag.ms
min.insync.replicas 分区ISR集合中至少要有多少个副本,默认值为l min.insync.replicas
prcallocate 在创建日志分段的时候是否要预分配空间,默认值为false log.prcallocatc
retention.bytes 分区中所能保留的消息总量,默认值为-1, 即没有限制 log.retention.bytes
retention.ms 使用deletc的日志清理策略时消息能够保留多长时间,默认值为604800000,即7天。
如果设置为-1,则表示没有限制
log.retention.ms
segment.bytes 日志分段的最大值,默认值为1073741824,即1GB log.segment.bytes
segment.index.bytes 日志分段索引的最大值,默认值为10485760,即10MB log.index.size.max.bytes
segmcnt.jitter.ms 滚动日志分段时,在 segment.ms 的基础之上增加的随机数,默认为0 log.roll.jitter.ms
segment.ms 最长多久滚动一次日志分段,默认值为604800000,即7天 log.roll.ms
unclean.leader.election.enable 是否可以从非ISR集合中选举leader副本,默认值为falsc,如果设置为true,则可能造成数据丢失 unclean.leader.election.enable




kafka-topics脚本

zookeeper、patitions、replication-factory、topic
  • kafka-topics.sh脚本中的zookeeperpartitionsreplication-factortopic这4个参数分别代表ZooKeeper连接地址、分区数、副本因子和主题名称。
1
2
3
4
5
6
7
8
创建主题时对于主题名称的命名方式也很有讲究。首先是不能与已经存在的主题同名,如果创建了同名的主题就会报错

创建主题时还会检测是否包含“.”或“_”字符。
为什么要检测这两个字符呢?
因为在Kafka的内部做埋点时会根据主题的名称来命名metrics的名称,并且会将点号“.”改成下画线“_”。
假设遇到一个名称为“topic.1_2”的主题,还有一个名称为“topic_1.2”的主题,那么最后的metrics的名称都会为“topic_1_2”,这样就发生了名称冲突。

以双下画线开头的主题一般看作Kafka的内部主题,比如__consumer_offsets和__transaction_state
list、describe、alter、delete
  • kafka-topics.sh脚本中create参数表示的是创建主题的指令类型, 对应的还有listdescribealterdelete这4个同级别的指令类型,每个类型所需要的参数也不尽相同。

接下来的示例

replica-assignment
  • kafka-topics.sh脚本中还提供了一个replica-assignment参数来手动指定分区副本的分配方案

1
2
3
注意同一个分区内的副本不能有重复,比如指定了0:0,1:1这种,就会报出AdminCommand-FailedException异常
如果分区之间所指定的副本数不同,比如0:1,0,1:0这种,就会报出AdminOperationException异常
类似0:1,0:1,1:0这种企图跳过一个分区的行为也是不被允许的
config

创建主题时我们还可以通过config参数来设置所要创建主题的相关参数,通过这个参数可以覆盖原本的默认配置。
在创建主题时可以同时设置多个参数

机架

Kafka从0.10.x版本开始支持指定broker的机架信息
指定机架信息是通过broker端参数broker.rack来配置的,比如配置当前broker所在的机架为“RACK1”:

1
broke.rack = RACK1

如果一个集群中有部分broker指定了机架信息,并且其余的broker没有指定机架信息
那么在执行kafka-topics.sh脚本创建主题时会报出的AdminOperationException

此时若要成功创建主题,要么将集群中的所有broker都加上机架信息或都去掉机架信息,要么使用disable-rack-aware参数来忽略机架信息


日志和副本的关系

主题、分区、副本和 Log(日志)的关系如上图,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是Log层面才有实际物理上的存在。
同一个分区中的多个副本必须分布在不同的broker中,这样才能提供有效的数据冗余。

1
2
对于示例中的分区数为4、副本因子为2、broker数为3的情况下,按照2、3、3的分区副本个数分配给各个broker是最优的选择。
再比如在分区数为3、副本因子为3,并且broker数同样为3的情况下,分配3、3、3的分区副本个数给各个broker是最优的选择,也就是每个broker中都拥有所有分区的一个副本。

在ZooKeeper的/brokers/topics/目录下创建一个同名的实节点,该节点中记录了该主题的分区副本分配方案




分区副本的分配

生产者的分区分配是指为每条消息指定其所要发往的分区,消费者中的分区分配是指为消费者指定其可以消费消息的分区
而这里的分区分配是指为集群制定创建主题时的分区副本分配方案,即在哪个broker中创建哪些分区的副本。

在创建主题时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;
如果没有使用replica-assignment参数,那么就需要按照内部的逻辑来计算分配方案了。

创建主题时的内部分配逻辑按照机架信息划分成两种策略

  • 未指定机架信息和指定机架信息。如果集群中所有的 broker 节点都没有配置broker.rack参数,或者使用disable-rack-aware参数来创建主题,那么采用的就是未指定机架信息的分配策略,
  • 否则采用的就是指定机架信息的分配策略。
未指定机架

未指定机架信息的分配策略比较容易理解,这里通过源码来逐一进行分析。
所对应的具体实现为kafka.admin.AdminUtils.scala文件中的assignReplicasToBrokersRackUnaware

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,        // 分区数
replicationFactor: Int, // 副本因子
brokerList: Iterable[Int], // 集群中的broke列表
fixedStartIndex: Int, // 起始索引,即第一个副本分配的位置
startPartitionId: Int): Map[Int, Seq[Int]] = {
val ret = mutable.Map[Int, Seq[Int]]() // 保存分配的结果
val brokerArray = brokerList.toArray // broke的列表

//如果startIndex小于0, 根据broke列表长度随机生成一个, 以此保证是有效的brikeId
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)

// 确保起始分区号不小于0
var currentPartitionId = math.max(0, startPartitionId)

// 指定了副本的间隔, 目的是为了均匀的得到副本分配到不同的broke上
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)

// 轮询所有分区,将每个分区的副本分配到不同的broke上
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1

val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))

// 保存该分区所有副本分配的broke集合
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))

// 保存该分区所有副本的分配信息
ret.put(currentPartitionId, replicaBuffer)

// 继续位下一个分区分配副本
currentPartitionId += 1
}
ret
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
assignReplicasToBrokersRackUnaware()方法的核心是遍历每个分区 partition
然后从 brokerArray (brokerId 的列表)中选取replicationFactor个brokerId分配给这个partition。

该方法首先创建一个可变的Map用来存放该方法将要返回的结果,即分区partition和分配副本的映射关系。
由于fixedStartIndex为-1,所以startIndex是一个随机数,用来计算一个起始分配的brokerId,同时又因为startPartitionId为-1
所以currentPartitionId的值为0,可见默认情况下创建主题时总是从编号为0的分区依次轮询进行分配。

nextReplicaShift表示下一次副本分配相对于前一次分配的位移量,从字面上理解有点绕口。

举个例子
假设集群中有3个broker节点,对应于代码中的brokerArray
创建的某个主题中有3个副本和6个分区,那么首先从partitionId(partition的编号)为0的分区开始进行分配

假设第一次计算(由rand.nextInt(brokerArray.length)随机产生)得到的nextReplicaShift值为1,第一次随机产生的startIndex 值为 2
那么 partitionId 为 0 的第一个副本的位置(这里指的是brokerArray的数组下标)firstReplicaIndex=(currentPartitionId+startIndex)%brokerArray.length=(0+2)%3=2,第二个副本的位置为replicaIndex(firstReplicaIndex,nextReplicaShift,j,brokerArray.length)=replicaIndex(2,nextReplicaShift+1,0,3)=?,
这里引入了一个新的方法replicaIndex()

继续计算 replicaIndex(2,nextReplicaShift+1,0,3)=replicaIndex(2,2,0,3)=(2+(1+(2+0)%(3-1)%3=0。
继续计算下一个副本的位置replicaIndex(2,2,1,3)=(2+(1+(2+1)%(3-1)%3=1。所以partitionId为0的副本分配位置列表为[2,0,1]
如果brokerArray正好是从0开始编号的,也正好是顺序不间断的,即brokerArray为[0,1,2],那么当前partitionId为0的副本分配策略为[2,0,1]。
如果brokerId不是从0开始的,也不是顺序的(有可能之前集群的其中几个broker下线了)

最终的brokerArray为[2,5,8],那么partitionId为0的分区的副本分配策略为[8,2,5]。为了便于说明问题,可以简单假设brokerArray就是[0,1,2]。

同样计算下一个分区,即partitionId为1的副本分配策略。
此时nextReplicaShift的值还是2,没有满足自增的条件。这个分区的 firstReplicaIndex=(1+2)%3=0。
第二个副本的位置replicaIndex(0,2,0,3)=(0+(1+(2+0)%(3-1)%3=1
第三个副本的位置replicaIndex(0,2,1,3)=2,最终partitionId为2的分区分配策略为[0,1,2]。
指定机架

相比较而言,指定机架信息的分配策略比未指定机架信息的分配策略要稍微复杂一些
但主体思想并没相差很多,只是将机架信息作为附加的参考项。

假设目前有3个机架rack1、rack2和rack3,Kafka集群中的9个broker点都部署在这3个机架之上,机架与broker节点的对照关系如下:

1
2
3
4
5
6
7
指定基架信息的assignReplicasToBrokersRackAware()方法里的brokerArray的值在这里就会被转换为[0,3,6,1,4,7,2,5,8]
显而易见,这是轮询各个机架而产生的结果,如此新的brokerArray(确切地说是arrangedBrokerList)中包含了简单的机架分配信息。
之后的步骤也和assignReplicasToBrokersRackUnaware()方法类似,同样包含startIndex、currentPartiionId、nextReplicaShift 的概念,循环为每一个分区分配副本。
分配副本时除了处理第一个副本,其余的也调用 replicaIndex()方法来获得一个 broker

但这里和assignReplicasToBrokersRackUnaware()不同的是,这里不是简单地将这个broker添加到当前分区的副本列表之中
还要经过一层筛选,满足以下任意一个条件的broker不能被添加到当前分区的




分区的管理

主要介绍与分区相关的知识和操作,包括优先副本的选举、分区重分配、复制限流、修改副本因子等内容。

优先副本的选举

分区使用多副本机制来提升可靠性,但只有leader副本对外提供读写服务,而follower副本只负责在内部进行消息的同步。
如果一个分区的leader副本不可用,那么就意味着整个分区变得不可用,此时就需要Kafka从剩余的follower副本中挑选一个新的leader副本来继续对外提供服务。
虽然不够严谨,但从某种程度上说,broker 节点中 leader 副本个数的多少决定了这个节点负载的高低。

我们可以将leader副本所在的broker节点叫作分区的leader节点,而follower副本所在的broker节点叫作分区的follower节点。

1
2
3
比如我们将brokerId为2的节点重启
可以看到原本分区1的leader节点为2,现在变成了0,如此一来原本均衡的负载变成了失衡
节点0的负载最高,而节点1的负载最低。

为了能够有效地治理负载失衡的情况,Kafka引入了优先副本(preferredreplica)的概念。

1
2
3
4
5
所谓的优先副本是指在 AR 集合列表中的第一个副本。
比如上面主题 topic-partitions 中分区 0的AR集合列表(Replicas)为[1,2,0],那么分区0的优先副本即为1。
理想情况下优先副本就是该分区的leader副本,所以也可以称之为preferred leader。
Kafka要确保所有主题的优先副本在Kafka集群中均匀分布,这样就保证了所有分区的leader均衡分布。
如果leader分布过于集中,就会造成集群负载不均衡。

所谓的优先副本的选举是指通过一定的方式促使优先副本选举为leader副本,以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。

在 Kafka 中可以提供分区自动平衡的功能,与此对应的 broker 端参数是auto.leader.rebalance.enable,此参数的默认值为true

1
2
3
4
5
即默认情况下此功能是开启的。如果开启分区自动平衡的功能,则 Kafka 的控制器会启动一个定时任务
这个定时任务会轮询所有的 broker节点
计算每个broker节点的分区不平衡率(broker中的不平衡率=非优先副本的leader个数/分区总数)
是否超过leader.imbalance.per.broker.percentage参数配置的比值,默认值为 10%
如果超过设定的比值则会自动执行优先副本的选举动作以求分区平衡。

执行周期由参数leader.imbalance.check.interval.seconds控制,默认值为300秒,即5分钟。

1
2
3
不过在生产环境中不建议将auto.leader.rebalance.enable设置为默认的true
因为这可能引起负面的性能问题,也有可能引起客户端一定时间的阻塞。因为执行的时间无法自主掌控
如果在关键时期(比如电商大促波峰期)执行关键任务的关卡上执行优先副本的自动选举操作,势必会有业务阻塞、频繁超时之类的风险

Kafka中kafka-perferred-replica-election.sh脚本提供了对分区leader副本进行重新平衡的功能。

优先副本的选举过程是一个安全的过程,Kafka客户端可以自动感知分区leader副本的变更。

分区重分配

1
2
3
4
5
当集群中的一个节点突然宕机下线时,如果节点上的分区是单副本的,那么这些分区就变得不可用了,在节点恢复前,相应的数据也就处于丢失状态

如果节点上的分区是多副本的,那么位于这个节点上的leader副本的角色会转交到集群的其他follower副本中。

这个节点上的分区副本都已经处于功能失效的状态,Kafka 并不会将这些失效的分区副本自动地迁移到集群中剩余的可用broker节点上
  • 下线节点

当要对集群中的一个节点进行有计划的下线操作时,为了保证分区及副本的合理分配,我们也希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上。

  • 新增节点

当集群中新增broker节点时,只有新创建的主题分区才有可能被分配到这个节点上,而之前的主题分区并不会自动分配到新加入的节点中,因为在它们被创建时还没有这个新节点,这样新节点的负载和原先节点的负载之间严重不均衡。

为了解决上述问题,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配。
Kafka提供了kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,它可以在集群扩容、broker节点失效的场景下对分区进行迁移。

kafka-reassign-partitions脚本

kafka-reassign-partitions.sh脚本的使用分为 3 个步骤:

  • 首先创建需要一个包含主题清单的JSON 文件
  • 其次根据主题清单和 broker 节点清单生成一份重分配方案
  • 最后根据这份方案执行具体的重分配动作。
1
首先在一个由3个节点(broker 0、broker 1、broker 2)组成的集群中创建一个主题topic-reassign,主题中包含4个分区和2个副本

1
2
由于某种原因,我们想要下线brokerId为1的broker节点
在此之前我们要做的就是将其上的分区副本迁移出去。

使用kafka-reassign-partitions.sh

第一步就是要创建一个JSON文件,文件内容为要进行分区重分配的主题清单

第二步就是根据这个JSON文件和指定所要分配的broker节点列表来生成一份候选的重分配方案

第三步执行具体的重分配动作

最后的结果

分区重分配的基本原理

1
2
3
4
先通过控制器为每个分区添加新副本(增加副本因子),新的副本将从分区的leader副本那里复制所有的数据。
根据分区的大小不同,复制过程可能需要花一些时间,因为数据是通过网络复制到新副本上的。
在复制完成之后,控制器将旧副本从副本清单里移除(恢复为原先的副本因子数)。
注意在重分配的过程中要确保有足够的空间。

细心的可能观察到主题topic-reassign中有3个leader副本在broker 0上,而只有1个leader副本在broker 2上,这样负载就不均衡了。

不过我们可以借助kafka-perferred-replica-election.sh 脚本来执行一次优先副本的选举动作,之后可以看到主题 topic-reassign 的具体信息已经趋于完美:

复制限流

修改副本因子