介绍 Kafka
Kafka 是一款基于发布与订阅的消息系统。
用生产者客户端 API 向 Kafka 生产消息,用消费者客户端 API 从 Kafka 读取这些消息。
Kafka 使用 Zookeeper 保存元数据信息。
- Kafka 0.9 版本之前,除了 broker 之外, 消费者也会使用 Zookeeper 保存一些信息,比如消费者群组的信息、 主题信息、消费分区的偏移量(在消费者群组里发生失效转移时会用到)。
- 到了 0.9.0.0 版本, Kafka 引入了一个新的消费者接口,允许 broker 直接维护这些信息。
Kafka 中的概念
消息 & 批次
Kafka 的数据单元被称为消息。消息就好比数据库里的一个“数据行”或一条“记录”。消息由字节数组组成,所以对于 Kafka 来说,消息里的数据没有特别的格式或含义。
消息可以有个可选的元数据,也就是键。键也是一个字节数组,与消息一样,对于 Kafka 来说也没有特殊的含义。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。最简单的例子就是为键生成一个一致性散列值,然后使用散列值对主题的分区数进行取模,为消息选取分区。
为了提高效率,消息被分批次写入 Kafka。批次就是一组消息,这些消息属于同一主题和分区。
如果每一个消息都单独串行于网络,会导致大量的网络开销,把消息分批次传输可以减少网络开销。不过,这要在时间延迟和吞吐量之间作出权衡:批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。
批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。
主题 & 分区
Kafka 的消息通过主题进行分类。主题就好比数据库的表,或者文件系统里的文件夹。
主题可以被分为若干个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先进先出的顺序读取。要注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。
Kafka 通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说, 一个主题可以横跨多个服务器,以此来提供比个服务器更强大的性能。
Kafka 集群通过分区对主题进行横向扩展,所以当有新的 broker 加入集群时,可以通过分区个数来实现集群的负载均衡。拥有大量消息的主题如果要进行负载分散,就需要大量的分区。
生产者 & 消费者
Kafka 的客户端就是 Kafka 系统的用户,Kafka 的客户端被分为两种基本类型生产者和消费者。除此之外,还有其他高级客户端 API:用于数据集成的 Kafka Connect API 和用于流式处理的 Kafka Streams 。这些高级客户端 API 使用生产者和消费者作为内部组件,提供了高级的功能。
生产者
生产者创建消息。在其他基于发布与订阅的消息系统中,生产者可能被称为发布者 或 写入者。
一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
消费者
消费者读取消息。在其他基于发布与订阅的消息系统中,消费者可能被称为订阅者 或 读者。
消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。
偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时, Kafka 会把偏移量添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息的偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。
消费者群组
消费者是消费者群组的一部分。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。消费者群组保证每个分区只能被一个消费者使用 。消费者与分区之间的映射通常被称为消费者对分区的所有权关系。
通过消费者群组的方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,消费者群组里的其他消费者可以接管失效消费者的工作。
broker & 集群
一个独立的 Kafka 服务器被称为 broker。
- broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
- broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。
broker 是集群的组成部分。每个集群都有一个 broker 同时充当了集群控制器的角色(集群控制器自动从集群的活跃成员中选举出来)。集群控制器负责管理工作,包括将分区分配给 broker 和监控 broker。在集群中,一个分区从属于一个 broker,该 broker 被称为分区的首领。一个分区可以分配给多个 broker,这个时候会发生分区复制。这种复制机制为分区提供了消息冗余,如果有一个 broker 失效,其他 broker 可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。
保留消息(在一定期限内)是 Kafka 的一个重要特性。消息被提交到磁盘,Kafka 根据设置的保留规则进行保存。主题可以配置自己的保留策略,将悄息保留到不再使用它们为止。 Kafka 有两种保留规则:
- 根据时间保留数据:根据时间保留数据是通过检查磁盘上日志片段文件的最后修改时间来实现的。一般来说,最后修改时间指的就是日志片段的关闭时间,也就是文件里最后一个消息的时间戳。当前时间超过磁盘上日志片段文件的最后修改时间,超过的时间达到配置参数指定的值,那么旧消息就会过期并被删除。
- 根据消息的字节数保留数据:当单个主题中所有消息的字节数达到配置参数指定的值,那么旧消息就会过期并被删除。所以在任何时刻,可用消息的总量都不会超过配置参数所指定的大小。
Kafka生产者
生产者创建消息。在其他基于发布与订阅的消息系统中,生产者可能被称为发布者 或 写入者。
一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
生产者发送消息的方式
生产者发送消息主要有 2 种方式:同步发送消息、异步发送消息
同步发送消息
同步发送消息:我们调用 KafkaProducer 的 send() 方法发送消息,send() 方法会返回一个包含 RecordMetadata 的 Future 对象,然后调用 Future 的 get() 方法等待 Kafka 响应,通过 Kafka 的响应,我们就可以知道消息是否发送成功。
- 如果服务器返回错误,Future 的 get() 方法会抛出异常。
- 如果没有发生错误,我们会得到一个 RecordMetadata 对象,这个对象包含消息的目标主题、分区信息和消息的偏移量等信息。
我们调用 KafkaProducer 的 send() 方法发送 ProducerRecord 对象,消息先是被放进缓冲区,然后使用单独的线程将消息发送到服务器端。
异常处理
如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。在发送消息之前,生产者也是有可能发生异常的。这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。
KafkaProducer 一般会发生两类错误。
- 其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。
- 另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。
public void send(String topic, String key, String val) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, val);
try {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);
SendResult<String, String> sendResult = future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
异步发送消息
异步发送消息:我们调用 KafkaProducer 的 send() 方法,并指定一个回调方法,在服务器返回响应时调用该方法。
大多数时候,我们并不需要等待响应。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。
为了使用回调,需要一个实现了
org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion() 方法。如果 Kafka 返回一个错误,onCompletion() 方法会抛出一个非空异常。通过 onCompletion() 方法抛出的异常,我们可以对发送失败的消息进行处理。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
分区器
介绍分区
ProducerRecord 对象包含目标主题、消息键和值(消息)。
- 如果消息键为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器使用粘性分区策略(UniformSticky),会随机选择一个分区,并尽可能一直使用该分区,等到该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(保证和上一次的分区不同)。
- 如果消息键不为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器会对消息键进行散列(使用 Kafka 自己的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上(散列值 与 主题的分区数进行取余得到 partition 值)。
这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题的所有分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。
只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。一旦主题增加了新的分区,那么键与分区之间的映射关系就改变了。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。
自定义分区策略
生产者可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
通过分区器实现自定义分区策略的步骤:
- 定义一个类,该类实现 Partitioner 接口(分区器)
- 配置生产者(KafkaProducer),让生产者发送消息时使用自定义的分区器:properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
public class MyPartitioner implements Partitioner {
/**
* 返回信息对应的分区
*
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
if ((keyBytes == null) || (!(key instanceof String))) {
throw new InvalidRecordException("We expect all messages to have String type as key");
}
// 实现自己的分区策略
// 返回数据写入的分区号
return 0;
}
// 关闭资源
@Override
public void close() {
}
// 配置方法
@Override
public void configure(Map<String, ?> configs) {
}
}
kafka消费者
KafkaConsumer 的概念
消费者 & 消费者群组
消费者读取消息。在其他基于发布与订阅的消息系统中,消费者可能被称为订阅者 或 读者。
消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。
偏移量是一种元数据,它是一个不断递增的整数值,在创建消息时, Kafka 会把偏移量添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息的偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。
消费者群组
消费者是消费者群组的一部分。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题的一部分分区的消息。消费者群组保证每个分区只能被一个消费者使用 。消费者与分区之间的映射通常被称为消费者对分区的所有权关系。
通过消费者群组的方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,消费者群组里的其他消费者可以接管失效消费者的工作。
往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。
分区再均衡
当一个消费者被关闭或发生崩溃时,这个消费者就离开群组,原本由它读取的分区将由消费者群组里的其他消费者来读取。
当一个新的消费者加入消费者群组时,这个新的消费者读取的是原本由其他消费者读取的消息。
在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。分区再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生分区再均衡。原因如下:
- 在分区再均衡期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。
- 另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
分区再均衡的过程
消费者通过向被指派为群组协调器的 broker(不同的消费者群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。
- 只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。
- 如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为消费者已经死亡,就会触发一次分区再均衡。
如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认消费者已经死亡了才会触发分区再均衡。在清理消费者时,消费者会通知群组协调器它自己将要离开消费者群组,群组协调器会立即触发一次分区再均衡,尽量降低处理停顿。
分配分区的过程:
- 当消费者要加入消费者群组时,消费者会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。
- 群主从群组协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor 接口的类来决定哪些分区应该被分配给哪个消费者,Kafka 内置了两种分区分配策略。
- 群主将分区分配完毕之后,群主把分区的分配情况列表发送给群组协调器,群组协调器再把这些信息发送给所有消费者。
每个消费者只能看到自己的分区分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次分区再均衡时重复发生。消费者群组的群主应该保证在分配分区时,尽可能少的改变原有的分区和消费者的映射关系。
订阅主题 & 轮询
应用程序使用 KafkaConsumer 向 Kafka 订阅主题,并从订阅的主题上接收消息。
应用程序调用 kafkaConsumer 的 subscribe() 方法订阅主题:
- 我们可以在调用 subscribe() 方法时传入一个主题列表作为参数。
- 我们也可以在调用 subscribe() 方法时传入一个正则表达式,正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次分区再均衡,消费者就可以读取新添加的主题了。
轮询
消费者通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括消费者群组协调、分区再均衡、发送心跳和获取数据,开发者只需要使用一组简单的 API 来处理从分区返回的数据。
轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator,然后加入群组,接受分配的分区。如果发生了分区再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。
提交 & 偏移量
我们把更新分区当前位置的操作叫作提交。那么消费者是如何提交偏移量的呢?消费者往一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。
如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发分区再均衡,完成分区再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
- 如果消费者提交的偏移量 小于 客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理
- 如果消费者提交的偏移量 大于 客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失
所以,处理偏移量的方式对客户端会有很大的影响。KafkaConsumer API 提供了很多种方式来提交偏移量:自动提交偏移量、手动提交偏移量。
自动提交
如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交的时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。
与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否应该提交偏移量了,如果距离上次的提交时间已经超过了配置参数 auto.commit.interval.ms 指定的值,那么就会提交上一次轮询返回的偏移量。
在调用 close() 方法之前也会进行自动提交。
让消费者自动提交偏移量是最简单的方式。不过,在使用这种简便的方式之前,需要知道自动提交将会带来怎样的结果。
假设我们使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了分区再均衡,分区再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s,所以在这 3s 内消费者已经处理过的消息会再被重复处理。我们可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗口,不过这种情况是无法完全避免的。
手动提交
手动提交指的是,把 auto.commit.offset 设为 false,让应用程序决定何时提交偏移量。应用程序可以使用 commitSync()、commitAsync() 方法手动提交偏移量
- commitSync 同步提交偏移量:手动提交偏移量之后,同步等待 broker 响应。commitSync() 方法会提交由 poll() 方法返回的最新偏移量,只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功。如果提交失败就抛出异常,我们也只能把异常记录到错误日志里。
- commitAsync 异步提交偏移量:手动提交偏移量之后,不等待 broker 响应,而是在提交偏移量时指定一个回调方法,在 broker 作出响应时会执行这个回调方法。回调经常被用于记录提交错误或生成度量指标。在成功提交或碰到无怯恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会重试。
消费者也可以提交特定的偏移量:消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map,这样我们就可以提交特定的偏移量。需要使用期望处理的下一个消息的偏移量更新 map 里的偏移量。
异步提交:同步提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了分区再均衡,会增加重复消息的数量。这个时候我们可以使用异步提交,我们只管发送提交请求,无需等待 broker 的响应。
再均衡监听器
在【分区再均衡前后】、【消费者开始读取消息之前】、【消费者停止读取消息之后】我们可以通过消费者 API 执行一些应用程序代码,在调用 kafkaConsumer 的 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。
再均衡监听器 ConsumerRebalanceListener 有两个需要实现的方法。
- public void onPartitionsRevoked(Collection< TopicPartition > partitions):该方法会在【分区再均衡开始之前】和【消费者停止读取消息之后】被调用。我们可以在消费者失去分区所有权之前,通过 onPartitionsRevoked() 方法来提交偏移量。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取消息了。
- public void onPartitionsAssigned(Collection< TopicPartition > partitions):该方法会在【重新分配分区之后】和【消费者开始读取消息之前】被调用。我们可以在消费者获取分区所有权之后,通过 onPartitionsAssigned() 方法来指定读取消息的起始偏移量。保证消费者总是能够从正确的位置开始读取消息。
如何退出
如果消费者确定要退出循环,需要通过另一个线程调用 consumer.wakeup() 方法。
consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。
调用 consumer.wakeup() 可以退出 poll(),并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那么异常将在下一轮调用 poll() 时抛出。我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式。
独立消费者
我们可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和分区再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。
如果是这样的话,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。
独立消费者除了不会发生分区再均衡,也不需要手动查找分区,其他的看起来一切正常。不过要记住,如果主题增加了新的分区,消费者并不会收到通知。所以,要么周期性地调用 consumer.partitionsFor() 方法来检查是否有新分区加入,要么在添加新分区后重启应用程序。
public void singleCustomer() {
// 向集群请求主题可用的分区。如果只打算读取特定分区,可以跳过这一步
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
ArrayList<TopicPartition> partitions = new ArrayList<>();
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos) {
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
// 为自己分配分区
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}
}
消费者的示例代码
再均衡监听器
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {
private KafkaConsumer consumer;
public void MyConsumerRebalanceListener(KafkaConsumer consumer) {
this.consumer = consumer;
}
public static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
/**
* 在消费者失去分区所有权之前,提交偏移量
*
* @param partitions
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
/**
* 在消费者获取分区所有权之后,指定读取消息的起始偏移量
*
* @param partitions
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
long offset = 0;
for (TopicPartition partition : partitions) {
consumer.seek(partition, offset);
}
}
}
消费者读取消息
public void customer() {
consumer.subscribe(Collections.singletonList("MyTopic"), new MyConsumerRebalanceListener(consumer));
// 如果不需要手动指定消费者读取消息的起始偏移量,下面的代码不是必须的
consumer.poll(0);
long offset = 0;
for (TopicPartition partition : consumer.assignment()) {
consumer.seek(partition, offset);
}
try {
while (true) {
// 参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。
// 如果该参数被设为 0,poll() 会立即返回,否则它会在指定的毫秒数内一直等待 broker 返回数据。
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量、消息以及消息键。
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ", record.topic(), record.partition(), record.offset(), record.key(), record.value());
// 将记录保存到数据存储系统里
System.out.println(record);
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
}
// 如果一切正常,我们使用 commitAsync() 方法来提交。这样速度更快,而且即使这次提交失败,下一次提交很可能会成功。
// 使用 commitAsync() 方法只会执行一次提交,不会重试
consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null) {
log.error("Commit failed for offsets {}", map, e);
}
}
});
}
} catch (WakeupException e) {
// 我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
// 如果直接关闭消费者,就没有所谓的“下一次提交”了。使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误。
consumer.commitSync();
} finally {
// 在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭
// 并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
}
为帮助开发者们提升面试技能、有机会入职BATJ等大厂公司,特别制作了这个专辑——这一次整体放出。
大致内容包括了: Java 集合、JVM、多线程、并发编程、设计模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat等大厂面试题等、等技术栈!
欢迎大家关注公众号【咕泡Java】,回复【007】,获取以上最新Java后端架构VIP学习资料以及视频学习教程,然后一起学习,一文在手,面试我有。
每一个专栏都是大家非常关心,和非常有价值的话题,如果我的文章对你有所帮助,还请帮忙点赞、好评、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!