消息发送
数据生产流程解析
- Producer创建时,会创建一个Sender线程并设置为守护线程。
- 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消 息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
- 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算 哪个
- 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且 失败原因允许重试,那么客户端内部会对该消息进行重试。
- 落盘到broker成功,返回生产元数据给生产者。
- 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。
必要参数配置
broker配置
1. 配置条目的使用方式:
2. 配置参数:
序列化器
由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。
序列化器的作用就是用于序列化要发送的消息的。
Kafka使用 org.apache.kafka.common.serialization.Serializer
接口用于定义序列化器,将 泛型指定类型的数据转换为字节数组。
package org.apache.kafka.common.serialization;
import java.io.Closeable;
import java.util.Map;
/**
* 将对象转换为byte数组的接口
*
* 该接口的实现类需要提供无参构造器
* @param <T> 从哪个类型转换
*/
public interface Serializer<T> extends Closeable {
/**
* 类的配置信息
* @param configs key/value pairs
* @param isKey key的序列化还是value的序列化
*/
void configure(Map<String, ?> configs, boolean isKey);
/**
* 将对象转换为字节数组
*
* @param topic 主题名称
* @param data 需要转换的对象
* @return 序列化的字节数组
*/
byte[] serialize(String topic, T data);
/**
* 关闭序列化器
* 该方法需要提供幂等性,因为可能调用多次。
*/
@Override
void close();
}
系统提供了该接口的子接口以及实现类: org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.ShortSerializer
自定义序列化器
数据的序列化一般生产中使用avro。
自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接口,并实现其 中的 serialize
方法。
案例:
实体类:
package com.lagou.kafka.demo.entity;
public class User {
private Integer userId;
private String username;
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
}
序列化类:
package com.lagou.kafka.demo.serializer;
import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Map;
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
}
@Override
public byte[] serialize(String topic, User data) {
try {
// 如果数据是null,则返回null
if (data == null) return null;
Integer userId = data.getUserId();
String username = data.getUsername();
int length = 0;
byte[] bytes = null;
if (null != username) {
bytes = username.getBytes("utf-8");
length = bytes.length;
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
buffer.putInt(userId);
buffer.putInt(length);
buffer.put(bytes);
return buffer.array();
} catch (UnsupportedEncodingException e) {
throw new SerializationException("序列化数据异常");
}
}
@Override
public void close() {
// do nothing
}
}
生产者:
package com.lagou.kafka.demo.producer;
import com.lagou.kafka.demo.entity.User;
import com.lagou.kafka.demo.serializer.UserSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
// 设置自定义的序列化类
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
UserSerializer.class);
KafkaProducer<String, User> producer = new KafkaProducer<String,
User>(configs);
User user = new User();
user.setUserId(1001);
user.setUsername("张三");
ProducerRecord<String, User> record = new ProducerRecord<>(
"tp_user_01",
0,
user.getUsername(),
user
);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("消息发送成功:"
+ metadata.topic() + "\t"
+ metadata.partition() + "\t"
+ metadata.offset());
} else {
System.out.println("消息发送异常");
}
});
// 关闭生产者
producer.close();
}
}
分区器
默认(DefaultPartitioner)分区计算:
- 如果record提供了分区号,则使用record提供的分区号
- 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模
- 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。
- 会首先在可用的分区中分配分区号
- 如果没有可用的分区,则在该主题所有分区中分配分区号。
如果要自定义分区器,则需要
- 首先开发Partitioner接口的实现类
- 在KafkaProducer中进行设置:configs.put("partitioner.class", "xxx.xx.Xxx.class")
位于 org.apache.kafka.clients.producer
中的分区器接口:
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Cluster;
import java.io.Closeable;
/**
* 分区器接口
*/
public interface Partitioner extends Configurable, Closeable {
/**
* 为指定的消息记录计算分区值
*
* @param topic 主题名称
* @param key 根据该key的值进行分区计算,如果没有则为null。
* @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为
null
* @param value 根据value值进行分区计算,如果没有,则为null
* @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为
null
* @param cluster 当前集群的元数据
*/
public int partition(String topic, Object key, byte[] keyBytes, Object
value, byte[] valueBytes, Cluster cluster);
/**
* 关闭分区器的时候调用该方法
*/
public void close();
}
包 org.apache.kafka.clients.producer.internals
中分区器的默认实现:
package org.apache.kafka.clients.producer.internals;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
/**
* 默认的分区策略:
*
* 如果在记录中指定了分区,则使用指定的分区
* 如果没有指定分区,但是有key的值,则使用key值的散列值计算分区
* 如果没有指定分区也没有key的值,则使用轮询的方式选择一个分区
*/
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap =
new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
/**
* 为指定的消息记录计算分区值
*
* @param topic 主题名称
* @param key 根据该key的值进行分区计算,如果没有则为null。
* @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为
null
* @param value 根据value值进行分区计算,如果没有,则为null
* @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为
null
* @param cluster 当前集群的元数据
*/
public int partition(String topic, Object key, byte[] keyBytes, Object
value, byte[] valueBytes, Cluster cluster) {
// 获取指定主题的所有分区信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 分区的数量
int numPartitions = partitions.size();
// 如果没有提供key
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions =
cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) %
availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available
partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
// 如果有,就计算keyBytes的哈希值,然后对当前主题的个数取模
return Utils.toPositive(Utils.murmur2(keyBytes)) %
numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new
AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter =
topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
可以实现Partitioner接口自定义分区器:
然后在生产者中配置:
拦截器
Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要 用于实现Client端的定制化控制逻辑。
对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条 消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
- onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线 程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息 做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
- onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发 送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在 Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发 送效率。
- close:关闭Interceptor,主要用于执行一些资源清理工作。
如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。 另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个 Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
自定义拦截器:
- 实现ProducerInterceptor接口
- 在KafkaProducer的设置中设置自定义的拦截器
案例:
1. 消息实体类:
package com.lagou.kafka.demo.entity;
public class User {
private Integer userId;
private String username;
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
}
2. 自定义序列化器
package com.lagou.kafka.demo.serializer;
import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Map;
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
}
@Override
public byte[] serialize(String topic, User data) {
try {
// 如果数据是null,则返回null
if (data == null) return null;
Integer userId = data.getUserId();
String username = data.getUsername();
int length = 0;
byte[] bytes = null;
if (null != username) {
bytes = username.getBytes("utf-8");
length = bytes.length;
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
buffer.putInt(userId);
buffer.putInt(length);
buffer.put(bytes);
return buffer.array();
} catch (UnsupportedEncodingException e) {
throw new SerializationException("序列化数据异常");
}
}
@Override
public void close() {
// do nothing
}
}
3. 自定义分区器
package com.lagou.kafka.demo.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object
value, byte[] valueBytes, Cluster cluster) {
return 2;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
4. 自定义拦截器1
package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorOne<KEY, VALUE> implements ProducerInterceptor<KEY,
VALUE> {
private static final Logger LOGGER =
LoggerFactory.getLogger(InterceptorOne.class);
@Override
public ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE>
record) {
System.out.println("拦截器1---go");
// 此处根据业务需要对相关的数据作修改
String topic = record.topic();
Integer partition = record.partition();
Long timestamp = record.timestamp();
KEY key = record.key();
VALUE value = record.value();
Headers headers = record.headers();
// 添加消息头
headers.add("interceptor", "interceptorOne".getBytes());
ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,
VALUE>(
topic,
partition,
timestamp,
key
value,
headers
);
return newRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception
exception) {
System.out.println("拦截器1---back");
if (exception != null) {
// 如果发生异常,记录日志中
LOGGER.error(exception.getMessage());
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
5. 自定义拦截器2
package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorTwo<KEY, VALUE> implements ProducerInterceptor<KEY,
VALUE> {
private static final Logger LOGGER =
LoggerFactory.getLogger(InterceptorTwo.class);
@Override
public ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE>
record) {
System.out.println("拦截器2---go");
// 此处根据业务需要对相关的数据作修改
String topic = record.topic();
Integer partition = record.partition();
Long timestamp = record.timestamp();
KEY key = record.key();
VALUE value = record.value();
Headers headers = record.headers();
// 添加消息头
headers.add("interceptor", "interceptorTwo".getBytes());
ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,
VALUE>(
topic,
partition,
timestamp,
key,
value,
headers
);
return newRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception
exception) {
System.out.println("拦截器2---back");
if (exception != null) {
// 如果发生异常,记录日志中
LOGGER.error(exception.getMessage());
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
6. 自定义拦截器3
package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorThree<KEY, VALUE> implements
ProducerInterceptor<KEY, VALUE> {
private static final Logger LOGGER =
LoggerFactory.getLogger(InterceptorThree.class);
@Override
public ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE>
record) {
System.out.println("拦截器3---go");
// 此处根据业务需要对相关的数据作修改
String topic = record.topic();
Integer partition = record.partition();
Long timestamp = record.timestamp();
KEY key = record.key();
VALUE value = record.value();
Headers headers = record.headers();
// 添加消息头
headers.add("interceptor", "interceptorThree".getBytes());
ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,
VALUE>(
topic,
partition,
timestamp,
key,
value,
headers
);
return newRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception
exception) {
System.out.println("拦截器3---back");
if (exception != null) {
// 如果发生异常,记录日志中
LOGGER.error(exception.getMessage());
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
7. 生产者
package com.lagou.kafka.demo.producer;
import com.lagou.kafka.demo.entity.User;
import com.lagou.kafka.demo.serializer.UserSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
// 设置自定义分区器
// configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
MyPartitioner.class);
configs.put("partitioner.class",
"com.lagou.kafka.demo.partitioner.MyPartitioner");
// 设置拦截器
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.lagou.kafka.demo.interceptor.InterceptorOne," +
"com.lagou.kafka.demo.interceptor.InterceptorTwo,"
+
"com.lagou.kafka.demo.interceptor.InterceptorThree"
);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
// 设置自定义的序列化类
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
UserSerializer.class);
KafkaProducer<String, User> producer = new KafkaProducer<String,
User>(configs);
User user = new User();
user.setUserId(1001);
user.setUsername("张三");
ProducerRecord<String, User> record = new ProducerRecord<>(
"tp_user_01",
0,
user.getUsername(),
user
);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("消息发送成功:"
+ metadata.topic() + "\t"
+ metadata.partition() + "\t"
+ metadata.offset());
} else {
System.out.println("消息发送异常");
}
});
// 关闭生产者
producer.close();
}
}
8. 运行结果:
原理剖析
由上图可以看出:KafkaProducer有两个基本线程:
- 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器 RecoderAccumulator中;
- 消息收集器RecoderAccumulator为每个分区都维护了一个 Deque 类型的双端队列。
- ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐 量,降低网络影响;
- 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护 了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size 指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利 用。
- 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取 一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则 写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户 端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch, 这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。
- Sender线程:
- 该线程从消息收集器获取缓存的消息,将其处理为 的形式, Node 表示集群的broker节点。
- 进一步将转化为形式,此时才可以 向服务端发送数据。
- 在发送之前,Sender线程将消息以 Map> 的形式保存 到 InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前 Node中负载压力最小的一个,以实现消息的尽快发出。
生产者参数配置补充
1. 参数设置方式:
2. 补充参数: