Spring集成kafka
大约 3 分钟开源框架消息队列kafka
示例
引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
生产者配置
server: port: 8081 spring: application: name: kafka-producer kafka: ## 多个使用逗号分割 bootstrap-servers: 192.168.1.110:9092 producer: # 消息重发的次数。配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0 retries: 1 #一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all #acks: all #事务id #transaction-id-prefix: xw-tran
消费者配置
server: port: 8080 spring: application: name: kafka-consumer kafka: ## 多个使用逗号分割 bootstrap-servers: 192.168.1.110:9092 consumer: # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: auto-offset-reset: earliest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: #手工ack,调用ack后立刻提交offset ack-mode: manual_immediate #容器运行的线程数 concurrency: 4
生产者发送
@Test public void sendWithCallBack() throws ExecutionException, InterruptedException { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "111111111111111111"); future.addCallback(success -> { // 消息发送到的topic String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset); }, failure -> { System.out.println("发送消息失败:" + failure.getMessage()); }); SendResult<String, String> result = future.get(); }
消费端接收
@Component public class ConsumeListener { @KafkaListener(topics = {"topic"},groupId = "test1") public void onMessage1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){ // 打印出消息内容 System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); ack.acknowledge(); } }
启动consumer服务,发送消息测试,结果如下:
事务消息
在上述示例上修改配置,添加事务id 如下:
server:
port: 8081
spring:
application:
name: kafka-producer
kafka:
## 多个使用逗号分割
bootstrap-servers: 192.168.1.110:9092
producer:
# 消息重发的次数。配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
retries: 1
#一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
#acks: all
#事务id
transaction-id-prefix: xw-tran
注解式事务消息:
@GetMapping("/kafka/transaction1")
@Transactional(rollbackFor = RuntimeException.class)
public void sendMessage1(int i) {
kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:1 i="+i);
if (i == 0) {
throw new RuntimeException("fail");
}
kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:2 i="+i);
}
声明式:
@Test
public void sendInTransaction() throws ExecutionException, InterruptedException {
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
int i=0;
@Override
public Object doInOperations(KafkaOperations kafkaOperations) {
kafkaOperations.send("topic","这个是事务里面的消息:1 i="+i);
if(i==0)
{
throw new RuntimeException("input is error");
}
kafkaOperations.send("topic","这个是事务里面的消息:2 i="+i);
return true;
}
});
}