Kafka-Producer 重要配置参数
前言
Kafka 在弹性、容错性以及高吞吐量方面有着很大的优势。想要达到生产环境最优,发挥这些特性,需要进行一些特殊配置。Kafka 提供了非常多的配置属性,对于初学者而言,基础的配置已经能满足大部分使用场景,本文记录几个重要但不常见的针对 Producer 的配置参数。
- acks
- min.insync.replicas
- replica.lag.time.max.ms
- retries
- enable.idempotence
- max.in.flight.requests.per.connection
- buffer.memory
- max.block.ms
- linger.ms
- batch.size
- compression.type
acks
acks 参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的,这个参数对于消息是否丢失起着重要作用,该参数的配置具体如下:
-
acks=0:不等待任何确认机制,即消息发送到 kafka 就认为发送成功,但是 kafka 不会保证消息没有丢失,如果 Leader Broker 挂掉,消息可能会丢失。由于不需要等待任何确认机制,所以可以达到很高的吞吐量。
-
acks=1:表示只要集群的 Leader Broker 收到消息,就会向消息生产者发送一个成功响应的 ack,此时生产者接收到 ack 之后就可以认为该消息是写入成功的. 一旦消息无法写入 Leader 分区副本 (比如网络原因、Leader Broker 崩溃), 生产者会收到一个错误响应,当生产者接收到该错误响应之后,为了避免数据丢失,会重新发送数据. 这种方式的吞吐量取决于使用的是异步发送还是同步发送。
注意:如果消息生产者收到了错误响应,即使重新发送了消息,还是可能会出现数据丢失的问题,例如:没有收到消息的 Broker 成为 Leader Broker,消息就会丢失
-
acks=all(或 -1):表示只有当集群中的所有分区副本都收到消息,生产者才认为该消息是写入成功的,此时生产者会收到一个成功响应的 ack,如果 Leader Broker 挂掉,消息不会丢失,但是生产者需要等待所有副本都同步到 Leader Broker,所以延时较高,吞吐量会降低。
补充:消息送达语义是消息系统中常见的问题,主要包含三种语义 at-most-once(消息发送或消费至多一次)、 at-least-once(消息发送或消费至少一次)、 exactly-once(消息恰好只发送一次或只消费一次)
min.insync.replicas
上面提到,当 acks=all 时,需要所有的副本都同步了才会发送成功响应到生产者,其实这里面存在一个问题:如果 Leader 副本是唯一的同步副本时会发生什么呢?此时相当于 acks=1. 所以是不安全的。Kafka 的 Broker 端提供了一个参数 min.insync.replicas,该参数控制的是消息至少被写入到多少个副本才算是”真正写入”,该值默认值为 1,生产环境设定为一个大于 1 的值可以提升消息的持久性,因为如果同步副本的数量低于该配置值,则生产者会收到错误响应,从而确保消息不丢失。
replica.lag.time.max.ms
In-sync replica(ISR) 称之为同步副本,ISR 中的副本都是与 Leader 进行同步的副本,所以不在该列表的 Follower 会被认为与 Leader 是不同步的. 那么,ISR 中存在是什么副本呢?首先可以明确的是:Leader 副本总是存在于 ISR 中. 而 Follower 副本是否在 ISR 中,取决于该 Follower 副本是否与 Leader 副本保持了“同步”。
这个同步并不是指完全的同步,即并不是说一旦 follower 副本同步滞后与 Leader 副本,就会被踢出 ISR 列表。Broker 端提供了参数 replica.lag.time.max.ms
,该参数控制了副本与 Leader 副本的最大同步滞后时间,默认值为 10s,如果超过该时间,则该副本会被踢出 ISR 列表,从而保证 Leader 副本的实时性。
当然,Follower 副本被踢出 ISR 列表,等到该副本追上了 Leader 副本的进度,该副本会被再次加入到 ISR 列表中,所以 ISR 是一个动态列表,并不是静态不变的。
retries
生产者从服务器收到的错误有可能是临时性的错误(比如 Broker 找不到 Leader)。在这种情况下, retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。
默认情况下,生产者会在每次重试之间等待 100ms ,可以通过 retry.backoff.ms
参数来配置时间间隔。比如,设置了 acks=all 和 min.insync.replicas=2。由于某种原因,所有 follower 都挂了,由于 min.insync.replicas=2
,所以生产者无法收到来自 Broker 端的 ack。
此时我们会从 Producer 端收到一个错误消息:"Broker: Not enough in-sync replicas"。这就意味着 Kafka 不能在 Broker 上追加生产的消息 (数据) 了,因为此时的 ISR 的数量不够。此时在 Broker 端会有如下的错误消息:org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition
默认情况下,Producer 不会对此错误进行处理,这就会造成消息丢失,即 at-most-once 语义。我们可以通过配置重试次数来让生产者重新发送消息。比如配置 retries=3,默认为 0
enable.idempotence
在某些情况下,实际上已将消息提交给了所有同步副本,但是由于网络问题,Broker 无法向 Producer 发送确认 ack。由于我们设置 retries=3,所以 Producer 将重新发送消息 3 次,这可能会导致 topic 中消息重复。
比如有一个 Producer 向该 topic 发送 1M 消息,并且在提交消息之后但在生产者收到所有确认 ack 之前,Broker 失败了。在这种情况下,由于重试机制,最终可能在该 topic 上收到超过 1M 的消息,这也称为 at-lease-once 语义。
当然,我们想要实现的是 exactly-once 语义,即:即便生产者重新发送消息,消费者也应该只收到一次相同的消息。
此时需要进行幂等操作,所谓幂等,即指一次执行一个操作或多次执行一个操作具有相同的效果。配置幂等很简单,通过配置 enable.idempotence=true
即可,默认为 false。
那么,幂等是如何实现的呢?由于消息是分 batch (批次) 发送的,每个 batch (批次) 都有一个序列号。在 Broker 端,会追踪每个分区的最大序列号。如果出现序列号较小或相等的 batch (批次),Broker 将不会将该 batch (批次) 写入 topic。这样,除了保证了幂等性,还可以确保 batch (批次) 的顺序。
max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
因为如果将两个批次发送到单个分区,并且第一个批次失败并被重试,但是,接着第二个批次写入成功,则第二个批次中的记录可能会首先出现,这样就会发生乱序。
如果没有启用幂等功能,但仍然希望按顺序发送消息,则应将此设置配置为 1。但是,如果已经启用了幂等,则无需显式定义此配置。
buffer.memory
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send()
方法调用要么被阻塞,要么抛出异常,取决于如何设置 max.block.ms。
当生产者调用 send()
时,消息并不会立即发送,而是会添加到内部缓冲区中。默认 buffer.memory 值为 32MB。如果生产者发送消息的速度超过了将消息发送到 Broker 的速度,或者存在网络问题,send()
方法调用会被阻塞 max.block.ms 参数配置的时常,默认 1 分钟。
max.block.ms
该参数指定了在调用 send()
方法或使用 partitionsFor()
方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会被阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。
linger.ms
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。kafka 生产者会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。
batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,这取决于 linger.ms 的配置,比如如果 linger.ms 时间到了,即便批次只包含一个消息,也会被立即发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
可以使用配置使用 linger.ms 和 batch.size。linger.ms 是准备好发送批次之前的延迟时间,默认值为 0。这意味着即使批次中只有 1 条消息,批次也会立即发送。有时,会增加 linger.ms 以减少请求数量并提高吞吐量。但这将导致更多消息保留在内存中。batch.size 是单个批次的最大大小,当满足这两个要求中的任何一个时,将发送批次。
compression.type
默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip 或 lz4,它指定了消息被发送给 Broker 之前使用哪一种压缩算也进行压缩。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
at-least-once 配置组合
Kafka 默认的 Producer 消息送达语义就是 at-least-once,这意味这不用做任何配置就能实现 at-least-once 消息语义。因为 Kafka 中默认 acks=1 并且 retries=2147483647
at-most-once 配置组合
可以通过配置 Producer 的以下配置实现 at-most-once 语义:acks=0,retries=0
当配置了 retires 的值后,如果没有将 max.in.flight.requests.per.connection 配置的值设置为 1,有可能造成消息乱序的结果。 max.in.flight.requests.per.connection 配置代表着一个 Producer 同时可以发送的未收到确认的消息数量。如果 max.in.flight.requests.per.connection 数量大于 1,那么可能发送了 message1 后,在没有收到确认前就发送了 message2,此时 message1 发送失败后触发重试,而 message2 直接发送成功,就造成了 Broker 上消息的乱序。max.in.flight.requests.per.connection 的默认值为 5。
exactly-once 配置组合
exactly-once 是 Kafka 从版本 0.11 之后提供的高级特性。我们可以通过配置 Producer 的以下配置项来实现 exactly-once 语义: enable.idempotence=true,acks=all
当 enable.idempotence=true 时,acks 必须配置为 all。并且建议 max.in.flight.requests.per.connection 的值小于 5。