kafka生产消费
生产者
发送流程
生产者客户端由主线程和Sender线程运行,在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。RecordAccumulator缓存消息可以进行批量发送,进而减少网络传输的资源消耗提升性能。
参数
bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2。
key.serializer:序列化操作的序列化器,如
org.apache.kafka.common.serialization.StringSerializer
value.serializer:序列化操作的序列化器,如
org.apache.kafka.common.serialization.StringSerializer
acks
acks=1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应
acks=0。生产者发送消息之后不需要等待任何服务端的响应。
acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
::: warn
注意acks参数配置的值是一个字符串类型,而不是整数类型。
:::
max.request.size:这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即1MB
retries:用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。
retry.backoff.ms: 参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
compression.type:默认值为
none
,即默认情况下,消息不会被压缩。该参数还可以配置为gzip,snappy
和lz4
。connections.max.idle.ms:用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。
linger.ms:生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
receive.buffer.bytes:这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统的默认值。如果Producer与Kafka处于不同的机房,则可以适地调大这个参数值。
send.buffer.bytes:用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
request.timeout.ms:Producer等待请求响应的最长时间,默认值为30000(ms)。
消费者
消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。每一个分区只能被一个消费组中的一个消费者所消费。
消费策略
kafka通过partition.assignment.strategy
参数来设置消费者与主题之间的分区分配策略,默认情况下使用RangeAssignor分配策略。
RangeAssignor分配策略
RangeAssignor 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。核心代码如下所示:
RoundRobinAssignor分配策略
RoundRobinAssignor分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。RoundRobinAssignor分配策略对应的 partition.assignment.strategy 参数值为 org.apache.kafka.clients.consumer.RoundRobinAssignor
。分配核心代码如下:
StickyAssignor分配策略
StickyAssignor分配策略,“sticky”这个单词可以翻译为“黏性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
- 分区的分配要尽可能均匀。
- 分区的分配尽可能与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor分配策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂得多。
自定义分区分配策略
实现org.apache.kafka.clients.consumer.internals.PartitionAssignor
接口
PartitionAssignor接口中定义了两个内部类:Subscription和Assignment。Subscription类用来表示消费者的订阅信息,类中有两个属性:topics和userData,分别表示消费者的订阅主题列表和用户自定义信息。PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription 信息,注意到此方法中只有一个参数 topics,与Subscription类中的topics的相呼应,但并没有体现有关userData的参数。为了增强用户对分配结果的控制,可以在 subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如权重、IP地址、host或机架(rack)等。
Assignment类,它用来表示分配结果信息,类中也有两个属性:partitions和userData,分别表示所分配到的分区集合和用户自定义的数据。PartitionAssignor接口中的onAssignment()方法是在每个消费者收到消费组leader 分配结果时的回调函数。