跳至主要內容

Spring集成kafka

xw大约 3 分钟开源框架消息队列kafka

示例

  1. 引入依赖

            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
  2. 生产者配置

    server:
        port: 8081
    spring:
        application:
            name: kafka-producer
        kafka:
            ## 多个使用逗号分割
            bootstrap-servers: 192.168.1.110:9092
            producer:
                # 消息重发的次数。配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
                retries: 1
                #一个批次可以使用的内存大小
                batch-size: 16384
                # 设置生产者内存缓冲区的大小。
                buffer-memory: 33554432
                # 键的序列化方式
                key-serializer: org.apache.kafka.common.serialization.StringSerializer
                # 值的序列化方式
                value-serializer: org.apache.kafka.common.serialization.StringSerializer
                #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
                #acks: all
                #事务id
                #transaction-id-prefix: xw-tran
    
  3. 消费者配置

    server:
        port: 8080
    spring:
        application:
            name: kafka-consumer
        kafka:
            ## 多个使用逗号分割
            bootstrap-servers: 192.168.1.110:9092
            consumer:
                # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
              auto-commit-interval: 1S
              # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
              auto-offset-reset: earliest
              # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
              enable-auto-commit: false
              # 键的反序列化方式
              key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              # 值的反序列化方式
              value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            listener:
              #手工ack,调用ack后立刻提交offset
              ack-mode: manual_immediate
              #容器运行的线程数
              concurrency: 4
    
  4. 生产者发送

        @Test
        public void sendWithCallBack() throws ExecutionException, InterruptedException {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "111111111111111111");
            future.addCallback(success -> {
                // 消息发送到的topic
                String topic = success.getRecordMetadata().topic();
                // 消息发送到的分区
                int partition = success.getRecordMetadata().partition();
                // 消息在分区内的offset
                long offset = success.getRecordMetadata().offset();
                System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
            }, failure -> {
                System.out.println("发送消息失败:" + failure.getMessage());
            });
            SendResult<String, String> result = future.get();
        }
    
  5. 消费端接收

    @Component
    public class ConsumeListener {
    
        @KafkaListener(topics = {"topic"},groupId = "test1")
        public void onMessage1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
            // 打印出消息内容
            System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
            ack.acknowledge();
        }
    }
    
  6. 启动consumer服务,发送消息测试,结果如下:

    image-20221018234100473

事务消息

在上述示例上修改配置,添加事务id 如下:

server:
    port: 8081
spring:
    application:
        name: kafka-producer
    kafka:
        ## 多个使用逗号分割
        bootstrap-servers: 192.168.1.110:9092
        producer:
            # 消息重发的次数。配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
            retries: 1
            #一个批次可以使用的内存大小
            batch-size: 16384
            # 设置生产者内存缓冲区的大小。
            buffer-memory: 33554432
            # 键的序列化方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            # 值的序列化方式
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
            #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
            #acks: all
            #事务id
            transaction-id-prefix: xw-tran

注解式事务消息:

    @GetMapping("/kafka/transaction1")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage1(int i) {
        kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:1  i="+i);
            if (i == 0) {
                throw new RuntimeException("fail");
            }
        kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:2  i="+i);

    }

声明式:

    @Test
    public void sendInTransaction() throws ExecutionException, InterruptedException {
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
            int i=0;
            @Override
            public Object doInOperations(KafkaOperations kafkaOperations) {
                kafkaOperations.send("topic","这个是事务里面的消息:1  i="+i);
                if(i==0)
                {
                    throw new RuntimeException("input is error");
                }
                kafkaOperations.send("topic","这个是事务里面的消息:2  i="+i);
                return true;
            }
        });

    }

源代码地址: https://gitee.com/yllhq/kafka-demoopen in new window