介绍与分析 在这篇文章中,会介绍了在Java中定义Kafka消费者的各种方法。 Spring、Micronaut、Vert。x和AkkaStreams在引擎盖下使用kafkaclients库,并提供完整的功能集来消费Kafka消息。 Kafka是一个著名的事件流量平台。我们在很多项目中使用它。没什么不寻常的工具很棒。各种框架和库提供与Kafka的集成。 在这篇文章中,我想介绍一下其中一些Java语言,看看我们如何创建一个客户实例,用来读取Kafka消息: 连接到Kafka的第一种方法是使用kafkaclients库中的KafkaConsumer类。其他的库或框架集成商通常使用该库。在本节中,我将重点介绍直接使用它。虽然它非常简单,但我们需要付出一些努力来提高效率。 首先,我们希望我们的消费者能持续工作。因此,我们将在单独的线程中运行它,我们需要自己管理它。此外,我们需要将轮询放入不定式循环中。 另一件事是关闭消费者,这可能很棘手。我们可以关闭线程并使用超时来关闭套接字和网络连接。然而,采用这种方法,我们错过了两个要点: 显式关闭消费者会立即触发重新平衡,因为组协调员没有发现消费者因丢失心跳而离开。该操作也会完成待处理的偏移量提交。因此,在再次运行消费者之后,我们不会两次消费某些消息。 接下来,如果我们想并行消费消息,我们需要提供一个自定义的解决方案来在同一个消费者组中运行特定数量的消费者。 然而,每个消费者都需要两个线程一个用于轮询,另一个用于心跳。 在消息的批量消费方面,我们在轮询一个队列后得到一个记录集合(可能为空)。 因此,我们不必提供任何特定的配置或机制。 当我们想要流式传输接收到的数据时,我们可以使用JDK的StreamAPI。但是如果我们想并行使用它们,情况就会变得复杂。更复杂的代码变得更容易出错。 默认情况下,消费者会自动提交偏移量。但是,我们可以更改这一点并手动完成工作。API为我们提供了几种同步或异步调用操作的方法。此外,我们可以提交从队列上最后一次轮询收到的所有消息的所有偏移量,或者提供特定的主题分区值。 使用普通的Kafka消费者,我们处理ConsumerRecord包含消息本身及其元数据的实例。它本身并不是一个缺点。但是,如果我们想解析它,我们需要提供我们的机制。 所以,总的来说,我会谨慎使用这种方法,而是考虑其他可用的可能性。 那么,让我们看看如何在一些框架或工具包中使用Kafka。 SpringBoot 当您在项目中使用SpringBoot时,您可以使用SpringforKafka集成。它提供了一种方便的监听器机制来实现对Kafka消息的消费。 我们可以通过两种方式消费消息:使用消息侦听器的容器,或通过提供带有KafkaListener注释的类。 当我们想使用消息侦听器方法时,我们需要提供两种类型的容器之一来运行我们的侦听器: KafkaMessageListenerContainer在单个线程上为容器配置中提供的所有主题提供消息消费,ConcurrentMessageListenerContainerKafkaMessageeListenerContainer允许在多线程环境中使用消息,为每个线程提供一个消息。 容器具有丰富的API,允许我们设置各种配置参数(如线程、批处理、确认、错误处理程序等)。重要的是要设置一个监听器类一个消息驱动的POJO。MessageListener它是orBatchMessageListener接口的一个实例。两者都是基本的,允许我们使用类型化的ConsumerRecord实例。Spring还提供了其他更复杂的接口。 然而,在Spring中使用Kafka消息最直接的方法是使用KafkaListener注解实现一个bean。处理接收到的消息的方法的签名可能会有所不同。您将使用的输入参数取决于您的需要,并且有很多可能性(有关详细信息,请查看注释javadocs)。在启动时,Spring会查找注解使用情况(带有注解的类必须是Spring组件)并创建运行在侦听器中定义的逻辑的Kafka消费者。 ComponentpublicclassKafkaListenerConsumer{KafkaListener(topics{spring。kafka。consumer。topic},groupId{spring。kafka。consumer。groupid})publicvoidprocessMessage(ListMessageStringcontent){processinglogiccomeshere}} 默认情况下,KafkaListener在单个线程中运行我们不会并行使用来自主题分区的消息。但是,我们可以通过两种方式改变这种行为。 第一个是定义concurrency注解的参数,我们可以在其中设置给定侦听器正在使用的线程数。 ComponentpublicclassKafkaListenerConsumer{KafkaListener(concurrency2,topics{spring。kafka。consumer。topic},groupId{spring。kafka。consumer。groupid})publicvoidprocessMessage(ListMessageStringcontent){processinglogiccomeshere}} 第二个选项是为containerFactory参数提供一个值。它是生产用于运行侦听器逻辑的容器的容器工厂bean的名称。当工厂不是单线程时(并发设置为大于1的值),框架将容器线程分配给分区。 BeanConcurrentKafkaListenerContainerFactoryString,StringmultiThreadedListenerContainerFactory(){ConcurrentKafkaListenerContainerFactoryString,StringfactorynewConcurrentKafkaListenerContainerFactory();factory。setConsumerFactory(consumerFactory());factory。setConcurrency(3);returnfactory;} 在这两种情况下,如果我们的线程数多于分区数,则有些线程仍处于空闲状态。 这还没有结束我们甚至可以使用topicPartitions参数为特定分区指定侦听器方法。有了这样的解决方案,Spring会自动在单独的线程中运行每一个。 ComponentpublicclassPartitionedKafkaListenerConsumer{KafkaListener(clientIdPrefixpart0,topics{spring。kafka。consumer。topic},groupId{spring。kafka。consumer。groupid},topicPartitions{TopicPartition(topic{spring。kafka。consumer。topic},partitions{0})})publicvoidpartition0(ConsumerRecordString,Stringcontent){processinglogiccomeshere}KafkaListener(clientIdPrefixpart1,topics{spring。kafka。consumer。topic},groupId{spring。kafka。consumer。groupid},topicPartitions{TopicPartition(topic{spring。kafka。consumer。topic},partitions{1})})publicvoidpartition1(ConsumerRecordString,Stringcontent){processinglogiccomeshere}} SpringforKafka也提供了批量消费消息的功能。当然,我们也是有不止一种选择。 第一个是容器工厂中批处理的配置开关。启用后,我们可以提供一个接受消息列表的侦听器。 重要的是我们需要使用批处理容器作为KafkaListener注解中的containerFactory参数值。一个选项使用带有前缀的消息侦听器接口。Batch他们接受消费者记录列表而不是单个记录。 当涉及到手动提交消息偏移量时,我们有同样丰富的选择。 首先,我们有原始的Kafka设置,即enable。auto。commit。当它为真时,Kafka根据其配置提交所有消息。否则,将根据配置中设置的确认模式的值来选择负责提交的实体。对于ack设置为MANUALorMANUALIMMEDIATE,由开发人员提交偏移量。对于所有其他值,由容器来运行它。此外,我们可以指定提交操作的同步性。 当我们使用手动提交时,我们可以Acknowledgment在框架的一些消息监听器中使用该类。该接口提供了调用已处理消息的提交操作或丢弃上次轮询的剩余记录的方法。 SpringforKafka让我印象深刻的是设置工作Kafka消费者的方法的数量。我们可以通过多种方式做到这一点,这很好,因为框架的弹性。但是,当我们迷失在各种可用选项中时,它也可能是有害的。 Micronaut 与Spring一样,Micronaut框架与Kafka进行了专门的集成,并且也适用于消息驱动的POJO。消费者的配置甚至类似。 我们从KafkaListener类级别的注释开始。这是我们定义一组消费者的地方。这样的组是基于为具有给定groupId的特定组提供默认值或值的配置文件内容配置的。我们可以使用注释参数覆盖这些值。 侦听器类的每个公共或包私有方法,用Topic(提供强制性主题名称模式)注释,成为在后台运行的Kafka消费者。我们也可以将注释放在类级别,但所有公共或私有包方法都成为Kafka消费者。所以我们需要小心这个。KafkaListener(groupIdmicronautgroup,clientId{kafka。consumers。micronautgroup。clientid})publicclassMicronautListener{Topic({kafka。consumers。micronautgroup。topic})voidreceive(KafkaKeyStringkey,Stringvalue){processinglogiccomeshere}} 要设置并发消息处理,我们可以将线程数定义为KafkaListener注释参数。如果我们提供的线程数少于分区数,一些消费者将处理来自两个或更多分区的消息。另一方面,如果我们设置更多它们,一些会保持闲置,什么也不做。这与Spring集成中的行为相同。 KafkaListener(groupIdmicronautgroup,clientId{kafka。consumers。micronautgroup。clientid},threads5)publicclassMultithreadedMicronautListener{Topic({kafka。consumers。micronautgroup。topic})voidreceive(KafkaKeyStringkey,Stringvalue,intpartition){switch(partition){case0:processinglogiccomesherebreak;case1:processinglogiccomesherebreak;case2:processinglogiccomesherebreak;default:log。error(Message(key{},value{})fromunexpectedpartition({})received。,key,value,partition);}}} 同样,我们可以使用注释上的batch参数启用批处理。 然后我们可以在消费者方法中消费一个记录列表(或领域类)。 KafkaListener(groupIdmicronautgroup,clientId{kafka。consumers。micronautgroup。clientid},batchtrue)publicclassBatchedMicronautListener{Topic({kafka。consumers。micronautgroup。topic})voidreceive(ListConsumerRecordString,Stringrecords){log。info(Batchreceived:{},records。size());records。forEach(recprocessinglogiccomeshere);}} 偏移提交的Micronaut管理提供了一些选项。我们通过OffsetStrategy枚举定义使用哪一个。您可以在框架文档中找到对它的出色描述。这是处理记录后使用手动提交的示例: Micronaut中Kafka类的配置比Spring中的配置更加简洁。在维护代码方面,更改或更新的地方更少了。但是,与Spring不同,我们不能以编程方式定义消费者,而无需使用注释。 AkkaStream 下一个客户端是带有Alpakka连接器〔urlhttps:doc。akka。iodocsakkacurrentstreamindex。html〕的AkkaStreams〔url〕库。 此设置提供了一种使用来自Kafka的消息的反应方式。它在底层使用了AkkaActors框架。 在这里,我们将Kafka消费者作为事件源的实例。在提供的数据类型、提供的元数据和分区信息以及处理偏移提交的方式方面有所不同。 让我们从Consumer。plainSource。ConsumerRecord它在单个线程中为整个主题发出消息,保留给定分区的消息消费顺序。根据Kafka消费者配置,流可以自动提交已处理的记录。 Consumer。plainSource(consumerSettings,Subscriptions。topics(topicName))。map(consumerRecord{processinglogiccomesherereturnconsumerRecord;})。runWith(Sink。ignore(),materializer)。toCompletableFuture()。handle(AppSupport。doneHandler())。join(); 我们也可以选择手动提交消息。如果是这样,我们需要使用提供消费者记录和有关当前偏移量信息的可提交源之一。在处理完一条消息后,我们可以利用额外的数据来调用一个Committer实例来进行手动提交。 Consumer。committableSource(consumerSettings,Subscriptions。topics(topicName))。map(committableMessage{processinglogiccomesherereturncommittableMessage;})。mapAsync(maxParallelism,msgCompletableFuture。completedFuture(msg。committableOffset()))。runWith(Committer。sink(CommitterSettings。create(committerSettings)),materializer)。toCompletableFuture()。handle(AppSupport。doneHandler())。join(); 关于并行处理事件,该库也提供了出色的工具。最简单的解决方案是使用普通分区源,它发出记录源以及主题分区信息。 当我们使用来自子源的消息时,该操作在每个分区的单独线程上运行。但是,我们可以使用分区信息以自定义方式分配线程分配(我们需要使用flatMapMerge和groupBy运算符)。 Consumer。plainPartitionedSource(consumerSettings,Subscriptions。topics(topicName))。mapAsync(maxPartitions,pair{SourceConsumerRecordString,String,NotUsedsourcepair。second();returnsource。map(record{processinglogiccomesherereturnrecord;})。runWith(Sink。ignore(),materializer);})。runWith(Sink。ignore(),materializer)。toCompletableFuture()。handle(AppSupport。doneHandler())。join(); 并行使用数据时最重要的是结果的顺序。我们有两个选择。 第一个是使用mapAsync运算符。它是使用参数中指定大小的线程池。该阶段确保下游发出消息的顺序,但不保证处理顺序。另一方面假设发出消息的顺序对我们来说并不重要。在这种情况下,我们可以使用mapAsyncUnordered操作符它会在处理完成后向下游传递消息,而不管接收顺序如何。 批处理也可用。我们可以通过使用类似groupedor的批处理操作符batch来实现它。在这种情况下,我们需要使用一个CommittableOffsetBatch实例并使用批处理中最后处理的消息的偏移量对其进行更新。然后,我们需要在流程的下一步中调用commit。 AkkaStreams对Kafka的支持令人惊叹。它将消息作为数据流来消费,这是最适合Kafka消费者的方式。 由于消息源的粒度,我们可以轻松地为我们的案例选择最合适的一个。通过利用Streams的强大API,我们可以非常快速地获得批处理或背压等功能。 使用AkkaStreams时对我来说最大的缺点是操作符的丰富性。您可能需要一些时间来熟悉它。但是,当您查看连接器源代码时,您会发现许多如何将Streams与Kafka一起使用的示例。Vertx 介绍的最后一种实现消费来自Kafka的消息的方法是使用Vert。x工具包。 该方法类似于AkkaStreams的方法它适用于Verticle,一种轻量级Actor的形式。verticles使用事件总线在彼此之间传递消息。 它们可以在事件循环和工作线程上运行。核心库提供基本功能(如在AkkaStreams中),我们需要使用外部组件来连接Kafka,即vertxkafkaclient。 虽然一般假设与AkkaStreams中的假设非常相似,但使用代码看起来不同。 Vert。x应用程序使用事件循环和工作线程。 前者将事件传递到目标顶点,并且可以运行快速、非阻塞的代码。后者的目的是完成繁重的工作,例如IO或昂贵的计算。 因此,我们应该考虑使用工作线程来消费Kafka消息,这样循环不会被阻塞,应用程序运行顺畅。 示例代码包含作为工作人员运行的Kafkaverticles。 对,那么我们该如何创建Kafka消息的消费者呢?Vert。x客户端为此提供了一个类KafkaConsumer。它提供了几种工厂方法,用于根据提供的配置创建实例。 有了消费者,我们需要在启动顶点之前订阅一些Kafka主题。我们可以从subscribe方法的几个变体中进行选择。调用其中之一使顶点能够从单个或多个主题中读取数据。下一步是注册处理函数,使用接收到的消息。所有这一切,我们都是通过在消费者身上使用流畅的API来完成的。 这是为一个或多个主题创建普通消费者的方式,没有分区拆分到不同的线程。正如您在示例项目中看到的那样,我已经封装了verticle地逻辑并将其部署为workerverticle。使用此解决方案,所有消息都将由同一个工作线程使用。classKafkaVerticleextendsAbstractVerticle{initializationOverridepublicvoidstart(){KafkaConsumer。create(vertx,kafkaConfig)。subscribe(topic)。handler(recordprocessinglogiccomeshere)。endHandler(vlog。info(Endofdata。Topic:{},this。topic))。exceptionHandler(elog。error(SingleKafkaconsumererror,e));}} 根据Kafka配置,消费者可能会自动提交偏移量。 但是手动触发动作呢?Vert。x消费者提供了完成这项工作的提交方法。 我们可以将它们称为单个消息或特定主题和分区的一组偏移量。 在为给定主题的所有分区创建消费者时,我们需要手动设置Vertx。 首先,我们需要知道我们想要处理哪些主题的多少个分区。 我们可以使用KafkaAdminClient来描述我们感兴趣的话题。 接下来,我们需要为每个主题分区对创建一个包含Kafka消费者的专用线程。 在线程内部,我们将消费者分配给所需的主题分区数据并指定处理程序,就像在普通消费者线程中一样。 根据Kafka的配置,消费者可能会自动提交offsets。 但是手动触发动作呢?Vert。x消费者提供了做这个工作的提交方法。我们可以为单个消息或者为特定主题和分区的一堆偏移量调用它们。 当涉及到为某一主题的所有分区创建消费者时。 我们需要手动设置Vertx。 首先,我们需要知道我们想处理哪些主题的多少个分区。我们可以调用KafkaAdminClient来描述我们感兴趣的主题。 接下来,我们需要为每个主题分区对创建一个专用Vertx线程,包含Kafka消费者。在顶点内,我们将消费者分配给所需的主题分区数据,并像普通消费者顶点那样指定处理程序。createrequirednumberofverticesIntStream。range(0,numberOfPartitions)。forEach(partition{vertx。deployVerticle(()KafkaPartitionedVerticle。create(topic,partition,kafkaConfig),deploymentOptions,asynclog。info(PartitionedKafkaconsumerdeployed。DeploymentId:{},async。result()));});insideKafkaPartitionedVerticleclassKafkaPartitionedVerticleextendsAbstractVerticle{initializationOverridepublicvoidstart(){KafkaConsumer。create(vertx,kafkaConfig)。assign(newTopicPartition(topic,partition),AsyncResult::result)。handler(recordprocessinglogiccomeshere)。endHandler(vlog。info(Endofdata。Topic:{},partition:{},this。topic,this。partition))。exceptionHandler(elog。error(PartitionedKafkaconsumererror,e));}} 正如我上面提到的,每个Kafka消费者都使用一个专门的处理器来处理收到的消息。根据我们的需要,我们可以一条一条地消费记录,也可以分批消费。在前一种情况下,我们使用handler方法定义一个函数,而对于后者,我们使用batchHandler方法。Kafka组件分别以KafkaConsumerRecord或KafkaConsumerRecords的形式提供记录。在这两种情况下,在引擎盖下,我们可以找到一个好的旧ConsumerRecord实例。 总结 哪一个更优越呢?我想说没有,除了一种情况。 如果你有一个项目考虑直接使用kafkaclients库,我会建议你不要这样做。这个库是一种连接到Kafka的驱动。在项目中使用它,我们需要接受的是,在某种程度上,我们将重新发明已经在其他工具中实现的轮子。 如果你的项目中的框架不限制你,我建议使用AkkaStreams。 否则,寻找已经存在的集成。如果你的服务是在Spring框架内开发的,那么应用AkkaStreams是没有意义的。或者在Vert。x应用程序中使用Micronaut。 如果有更好的方法,欢迎各位大佬指教。 原文:https:www。jdon。com58627