文章

SpringBoot 集成 Kafka

前言

Kafka 是一个高吞吐、分布式、基于发布订阅的消息系统。常用于日志收集、消息系统、用户活动追踪、运营指标监控,大数据实时计算等场景。

主要概念:

  • Broker:Kafka 集群中的服务器,可以有多台服务器,用来存储消息。
  • Topic:消息的集合,可以有多个 Topic,每个 Topic 可以有多个分区,每个分区可以有多个副本。
  • Partition:消息的分区,每个分区是一个队列,用来存储消息。
  • Replication:副本,用来保证消息的高可用。
  • Consumer Group:消费者组,用来消费消息。
  • Offset:偏移量,用来标识消息的位置。
  • Producer:生产者,用来生产消息。
  • Zookeeper:Kafka 的协调者,用来管理集群(KIP-500 提案中计划移除,计划使用 Raft 算法代替其功能)

引入依赖

核心依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

完整依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.phixlin</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.1</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

</project>

创建配置类

配置参数具体的作用可以查看源码中的注释。主要注意的是,KafkaTemplate 的类型为 <Integer,String>,我们可以找 KafkaTemplate 的 send 方法,有多个重载方法,其中有个方法如下,key 和 data 参数都为泛型,这其实就是对应着 KafkaTemplate<Integer,String>。具体可以看源码(org.apache.kafka.clients.producer.KafkaProducer#doSend)。目的是用于将消息发送到不同分区,如果不想手动指定发送到哪个分区,则可以利用 key 去实现。这里 key 是 Integer 类型,template 会根据 key 路由到对应的 partition 中,如果 key 存在对应的 partitionID 则发送到该 partition 中,否则由算法选择发送到哪个 partition。

实现顺序

  1. 创建消费者和生产者配置类
  2. 创建消费者和生产者配置 Map
  3. 根据配置 Map 创建生产者工厂(ProducerFactory)、消费者工厂(ConsumerFactory)
  4. 根据消费者工厂(DefaultKafkaConsumerFactory)创建消费监听容器工厂(KafkaListenerContainerFactory)
    根据生产者工厂(DefaultKafkaProducerFactory)创建生产者模板(KafkaTemplate)
  5. 根据消费监听容器工厂(KafkaListenerContainerFactory)创建消费者监听器(KafkaMessageListenerContainer)
package cn.phixlin.conf;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * @author phixlin
 * @description
 * @date 2024/11/6 14:54
 */
@Configuration
public class KafkaConf {

    @Primary
    @ConfigurationProperties(prefix = "spring.kafka.line-deviate")
    @Bean(value = "lineDeviateKafkaProperties")
    public KafkaProperties lineDeviateKafkaProperties() {
        return new KafkaProperties();}

    @Bean("lineDeviateKafkaTemplate")
    public KafkaTemplate<String, String> lineDeviateKafkaTemplate(
            @Autowired @Qualifier("lineDeviateKafkaProperties")KafkaProperties lineDeviateKafkaProperties) {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(lineDeviateKafkaProperties.buildProducerProperties()));
    }

    /**
     * 消费者批量消费的监听工厂
     * @param lineDeviateKafkaProperties kafka 配置信息
     * @return 监听工厂
     */
    @Bean("lineDeviateKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> lineDeviateKafkaListenerContainerFactory(
            @Autowired @Qualifier("lineDeviateKafkaProperties")KafkaProperties lineDeviateKafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        DefaultKafkaConsumerFactory<Integer, String> consumerFactory = new DefaultKafkaConsumerFactory<>(lineDeviateKafkaProperties.buildConsumerProperties());
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

}

根据配置类设置参数

spring.kafka:
  line-deviate:
    #公共参数,其他的 timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms 保持默认值
    properties:
      #这个参数指定 producer 在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小 (batch-size),到达时间后生产者也会发送批量消息到 broker。默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息。设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟。
      linger.ms: 50 #默认值:0 毫秒,当消息发送比较频繁时,增加一些延迟可增加吞吐量和性能。
      #这个参数指定 producer 在一个 TCP connection 可同时发送多少条消息到 broker 并且等待 broker 响应,设置此参数较高的值可以提高吞吐量,但同时也会增加内存消耗。另外,如果设置过高反而会降低吞吐量,因为批量消息效率降低。设置为 1,可以保证发送到 broker 的顺序和调用 send 方法顺序一致,即便出现失败重试的情况也是如此。
      #注意:当前消息符合 at-least-once,自 kafka1.0.0 以后,为保证消息有序以及 exactly once,这个配置可适当调大为 5。
      max.in.flight.requests.per.connection: 1 #默认值:5,设置为 1 即表示 producer 在 connection 上发送一条消息,至少要等到这条消息被 broker 确认收到才继续发送下一条,因此是有序的。
    #生产者的配置,可参考 org.apache.kafka.clients.producer.ProducerConfig
    producer:
      #这个参数可以是任意字符串,它是 broker 用来识别消息是来自哪个客户端的。在 broker 进行打印日志、衡量指标或者配额限制时会用到。
      clientId: ${spring.application.name} #方便 kafkaserver 打印日志定位请求来源
      bootstrap-servers: xx.xxx.xx.xxx:9092,xx.xxx.xx.xxx:9092,xx.xxx.xx.xxx:9092 #kafka 服务器地址,多个以逗号隔开
      #acks=0:生产者把消息发送到 broker 即认为成功,不等待 broker 的处理结果。这种方式的吞吐最高,但也是最容易丢失消息的。
      #acks=1:生产者会在该分区的 leader 写入消息并返回成功后,认为消息发送成功。如果群首写入消息失败,生产者会收到错误响应并进行重试。这种方式能够一定程度避免消息丢失,但如果 leader 宕机时该消息没有复制到其他副本,那么该消息还是会丢失。另外,如果我们使用同步方式来发送,延迟会比前一种方式大大增加(至少增加一个网络往返时间);如果使用异步方式,应用感知不到延迟,吞吐量则会受异步正在发送中的数量限制。
      #acks=all:生产者会等待所有副本成功写入该消息,这种方式是最安全的,能够保证消息不丢失,但是延迟也是最大的。
      #如果是发送日志之类的,允许部分丢失,可指定 acks=0,如果想不丢失消息,可配置为 all,但需密切关注性能和吞吐量。
      acks: all #默认值:1
      #当生产者发送消息收到一个可恢复异常时,会进行重试,这个参数指定了重试的次数。在实际情况中,这个参数需要结合 retry.backoff.ms(重试等待间隔)来使用,建议总的重试时间比集群重新选举 leader 的时间长,这样可以避免生产者过早结束重试导致失败。
      #另外需注意,当开启重试时,若未设置 max.in.flight.requests.per.connection=1,则可能出现发往同一个分区的两批消息的顺序出错,比如,第一批发送失败了,第二批成功了,然后第一批重试成功了,此时两者的顺序就颠倒了。
      retries: 3  #发送失败时重试多少次,0= 禁用重试(默认值)
      #默认情况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy 压缩算法由 Google 研发,这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip 消耗更多的 CPU 资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和 Kafka 存储成本。
      compressionType: "none" #如果不开启压缩,可设置为 none(默认值),比较大的消息可开启。
      #当多条消息发送到一个分区时,Producer 会进行批量发送,这个参数指定了批量消息大小的上限(以字节为单位)。当批量消息达到这个大小时,Producer 会一起发送到 broker;但即使没有达到这个大小,生产者也会有定时机制来发送消息,避免消息延迟过大。
      batch-size: 16384 #默认 16K,值越小延迟越低,但是吞吐量和性能会降低。0= 禁用批量发送
      #这个参数设置 Producer 暂存待发送消息的缓冲区内存的大小,如果应用调用 send 方法的速度大于 Producer 发送的速度,那么调用会阻塞一定(max.block.ms)时间后抛出异常。
      buffer-memory: 33554432 #缓冲区默认大小 32M
    #消费者的配置,可参考:org.apache.kafka.clients.consumer.ConsumerConfig
    consumer:
      #这个参数可以为任意值,用来指明消息从哪个客户端发出,一般会在打印日志、衡量指标、分配配额时使用。
      #暂不用提供 clientId,2.x 版本可放出来,1.x 有多个 topic 且 concurrency>1 会出现 JMX 注册时异常
      #clientId: ${spring.application.name} #方便 kafkaserver 打印日志定位请求来源
      # 签中 kafka 集群
      bootstrap-servers: xx.xxx.xx.xxx:9092,xx.xxx.xx.xxx:9092,xx.xxx.xx.xxx:9092 #kafka 服务器地址,多个以逗号隔开
      group-id: line-alert
      topic: line_alert_task
      #这个参数指定了当消费者第一次读取分区或者无 offset 时拉取那个位置的消息,可以取值为 latest(从最新的消息开始消费),earliest(从最老的消息开始消费),none(如果无 offset 就抛出异常)
      autoOffsetReset: latest #默认值:latest
      #这个参数指定了消费者是否自动提交消费位移,默认为 true。如果需要减少重复消费或者数据丢失,你可以设置为 false,然后手动提交。如果为 true,你可能需要关注自动提交的时间间隔,该间隔由 auto.commit.interval.ms 设置。
      enable-auto-commit: true
      #周期性自动提交的间隔,单位毫秒
      auto-commit-interval: 2000 #默认值:5000
      #这个参数允许消费者指定从 broker 读取消息时最小的 Payload 的字节数。当消费者从 broker 读取消息时,如果数据字节数小于这个阈值,broker 会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少 broker 和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻 broker 压力。
      fetchMinSize: 1 #默认值: 1
      #上面的 fetch.min.bytes 参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为 500ms。
      fetchMaxWait: 500 #默认值:500 毫秒
      #这个参数控制一个 poll() 调用返回的记录数,即 consumer 每次批量拉多少条数据。
      maxPollRecords: 500 #默认值:500
    listener:
      concurrency: 3  #推荐设置为 topic 的分区数

创建消费入口(KafkaListener)

包含了两个示例,一个使用默认监听容器工厂 org. springframework. kafka. config. KafkaListenerContainerFactory 的单次消费方式,具体源码可追踪 org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor#GENERATED_ID_PREFIX;另一个是使用了自定义监听容器工厂的批量消费方式。

package cn.phixlin.kafka.listener;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * @author 
 * @description
 * @date 2024/11/7 17:22
 */
@Slf4j
@RequiredArgsConstructor
@Component("lineDeviateTaskConsumerHandler")
public class TaskConsumerHandler {


    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;


    @KafkaListener(topics = {"${spring.kafka.line-deviate.consumer.topic}"},
            groupId = "${spring.kafka.line-deviate.consumer.group-id}")
    public void lineDeviateConsumer(ConsumerRecord<Integer, String> record) {
        log.info("lineDeviateTaskConsumerHandler receive:{}", record.value());}


    @KafkaListener(topics = {"${spring.kafka.line-deviate.consumer.topic}"},
            groupId = "${spring.kafka.line-deviate.consumer.group-id}",
            containerFactory = "lineDeviateKafkaListenerContainerFactory")
    public void lineDeviateBatchConsumer(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        long start = System.currentTimeMillis();
        // 批量消费
        records.forEach(record -> CompletableFuture.runAsync(() -> {
            log.info("lineDeviateTaskConsumerHandler receive:{}", record.value());}, threadPoolTaskExecutor));
        // 手动提交 需要在配置中关闭自动提交
        ack.acknowledge();
        log.info("lineDeviateTaskConsumerHandler batch consumer size:{}, time:{}ms", records.size(), (System.currentTimeMillis() - start));}
}

生产消息

package cn.phixlin.kafka.producer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * @author 
 * @description
 * @date 2024/11/8 16:53
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class AlertKafkaProducer {

    private final KafkaTemplate<String, String> lineDeviateKafkaTemplate;


    public void sendAlert(String msg) {
        try {
            // 默认异步发送
            lineDeviateKafkaTemplate.send("topic", msg);} catch (Exception e) {
            log.error("sendLindDeviateAlert sendKafka err:", e);}
    }

}
License:  CC BY 4.0