拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 测试Kafka和Spring Boot

测试Kafka和Spring Boot

白鹭 - 2021-11-24 866 0 0

1.概述

Apache Kafka是一个功能强大的分布式容错流处理系统。

在本教程中,学习如何编写不依赖于运行外部Kafka服务器的可靠,独立的集成测试**。

首先,我们将开始研究如何使用和配置Kafka的嵌入式实例。然后,我们将看到如何利用测试中流行的框架Testcontainers

2.依存关系

当然,我们需要将标准的spring-kafka依赖项添加到我们的pom.xml

<dependency>

 <groupId>org.springframework.kafka</groupId>

 <artifactId>spring-kafka</artifactId>

 <version>2.6.3.RELEASE</version>

 </dependency>

然后,我们将需要另外两个依赖项专门用于我们的测试。首先,我们将添加spring-kafka-test工件

<dependency>

 <groupId>org.springframework.kafka</groupId>

 <artifactId>spring-kafka-test</artifactId>

 <version>2.6.3.RELEASE</version>

 <scope>test</scope>

 </dependency>

最后,我们将添加Testcontainers Kafka依赖项,该依赖项也可以在Maven Central上使用

<dependency>

 <groupId>org.testcontainers</groupId>

 <artifactId>kafka</artifactId>

 <version>1.15.0</version>

 <scope>test</scope>

 </dependency>

现在我们已经配置了所有必需的依赖项,我们可以使用Kafka编写一个简单的Spring Boot应用程序。

3.一个简单的Kafka生产者-消费者应用程序

在整个教程中,我们测试的重点将是一个简单的生产者-消费者Spring Boot Kafka应用程序。

让我们从定义应用程序入口点开始:

@SpringBootApplication

 @EnableAutoConfiguration

 public class KafkaProducerConsumerApplication {



 public static void main(String[] args) {

 SpringApplication.run(KafkaProducerConsumerApplication.class, args);

 }

 }

如我们所见,这是一个标准的Spring Boot应用程序。在可能的情况下,我们希望使用默认配置值。考虑到这一点,我们利用@EnableAutoConfiguration批注自动配置应用程序。

3.1。生产者设置

接下来,让我们考虑一个生产者bean,我们将使用它将消息发送到给定的Kafka主题:

@Component

 public class KafkaProducer {



 private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);



 @Autowired

 private KafkaTemplate<String, String> kafkaTemplate;



 public void send(String topic, String payload) {

 LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);

 kafkaTemplate.send(topic, payload);

 }

 }

上面定义的我们的KafkaProducer bean仅仅是KafkaTemplate类的包装。此类提供高级线程安全操作,例如将数据发送到所提供的主题,这正是我们在send方法中所做的

3.2。消费者设置

同样,我们现在将定义一个简单的消费者Bean,它将监听Kafka主题并接收消息:

@Component

 public class KafkaConsumer {



 private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);



 private CountDownLatch latch = new CountDownLatch(1);

 private String payload = null;



 @KafkaListener(topics = "${test.topic}")

 public void receive(ConsumerRecord<?, ?> consumerRecord) {

 LOGGER.info("received payload='{}'", consumerRecord.toString());

 setPayload(consumerRecord.toString());

 latch.countDown();

 }



 public CountDownLatch getLatch() {

 return latch;

 }



 public String getPayload() {

 return payload;

 }

 }

我们的简单使用者在receive方法上使用@KafkaListener批注来侦听有关给定主题的消息。稍后我们将看到如何从测试中配置test.topic

此外,receive方法将消息内容存储在我们的bean中,并减少latch变量的计数。此变量是一个简单的线程安全计数器字段,稍后我们将在测试中使用该字段,以确保我们成功接收到message

现在,我们已经使用Spring Boot实现了简单的Kafka应用程序,让我们看看如何编写集成测试。

4.关于测试的话

通常,在编写简洁的集成测试时,我们不应依赖于我们可能无法控制或突然停止工作的外部服务。这可能会对我们的测试结果产生不利影响。

同样,如果我们依赖外部服务(在这种情况下为运行中的Kafka经纪人),则可能无法按照我们希望通过测试的方式对其进行设置,控制和拆除。

4.1。应用属性

我们将从测试中使用一组非常简单的应用程序配置属性。我们将在src/test/resources/application.yml文件中定义以下属性:

spring:

 kafka:

 consumer:

 auto-offset-reset: earliest

 group-id: baeldung

 test:

 topic: embedded-test-topic

这是使用Kafka嵌入式实例或本地代理时所需的最少属性集。

其中大多数是不言自明的,但是我们应该特别强调的一个是消费者财产的auto-offset-reset: earliest 。此属性可确保我们的使用者组获取我们发送的消息,因为容器可能在发送完成后启动。

另外,我们为主题属性配置了值embedded-test-topic ,这是我们将在测试中使用的主题。

5.使用嵌入式Kafka进行测试

在本节中,我们将研究如何使用内存中的Kafka实例对我们的测试进行测试。这也称为嵌入式Kafka。

我们之前添加的依赖项spring-kafka-test包含一些有用的实用程序,以帮助测试我们的应用程序。最值得注意的是,它包含EmbeddedKafkaBroker

考虑到这一点,让我们继续编写我们的第一个集成测试:

@SpringBootTest

 @DirtiesContext

 @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })

 class EmbeddedKafkaIntegrationTest {



 @Autowired

 private KafkaConsumer consumer;



 @Autowired

 private KafkaProducer producer;



 @Value("${test.topic}")

 private String topic;



 @Test

 public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived()

 throws Exception {

 producer.send(topic, "Sending with own simple KafkaProducer");

 consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);



 assertThat(consumer.getLatch().getCount(), equalTo(0L));

 assertThat(consumer.getPayload(), containsString("embedded-test-topic"));

 }

 }

让我们来看一下测试的关键部分。首先,我们先用两个漂亮的标准Spring注释装饰测试类:

  • @SpringBootTest批注将确保我们的测试引导Spring应用程序上下文
  • 我们还使用@DirtiesContext批注,以确保清除此上下文并在不同测试之间重置

至关重要的部分到了,我们使用@EmbeddedKafka批注将EmbeddedKafkaBroker的实例注入我们的测试中。此外,我们可以使用一些属性来配置嵌入式Kafka节点:

  • partitions –这是每个主题使用的分区数。为了使事情变得简单明了,我们只希望在测试中使用一个
  • brokerProperties – Kafka经纪人的其他属性。同样,我们保持简单,并指定纯文本侦听器和端口号

接下来,我们自动连接consumerproducer类,并配置一个主题以使用application.properties的值。

对于拼图的最后一部分,我们仅向测试主题发送一条消息,并验证是否已收到该消息并包含测试主题的名称

运行测试时,我们将在冗长的Spring输出中看到:

...

 12:45:35.099 [main] INFO cbkafka.embedded.KafkaProducer -

 sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'

 ...

 12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]

 INFO cbkafka.embedded.KafkaConsumer - received payload=

 'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,

 CreateTime = 1605267935099, serialized key size = -1,

 serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),

这确认我们的测试正常运行。太棒了!现在,我们有一种使用内存中的Kafka代理编写独立的,独立的集成测试的方法

6.使用TestContainers测试Kafka

有时,我们可能会看到真正的外部服务与专门为测试目的提供的服务的嵌入式内存实例之间的细微差异。尽管不太可能,但也有可能是我们测试中使用的端口被占用,从而导致故障

考虑到这一点,在本节中,我们将看到我们以前使用Testcontainers框架进行测试的方法的一种变化。我们将通过集成测试了解如何实例化和管理托管在Docker容器中的外部Apache Kafka代理。

让我们定义另一个集成测试,该测试与上一节中看到的非常相似:

@RunWith(SpringRunner.class)

 @Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)

 @SpringBootTest(classes = KafkaProducerConsumerApplication.class)

 @DirtiesContext

 public class KafkaTestContainersLiveTest {



 @ClassRule

 public static KafkaContainer kafka =

 new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));



 @Autowired

 private KafkaConsumer consumer;



 @Autowired

 private KafkaProducer producer;



 @Value("${test.topic}")

 private String topic;



 @Test

 public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived()

 throws Exception {

 producer.send(topic, "Sending with own controller");

 consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);



 assertThat(consumer.getLatch().getCount(), equalTo(0L));

 assertThat(consumer.getPayload(), containsString("embedded-test-topic"));

 }

 }

让我们看看这次的差异。我们声明的是kafka字段,它是标准的JUnit @ClassRule该字段是KafkaContainer类的实例,该类将准备和管理运行Kafka的容器的生命周期。

为了避免端口冲突,当我们的Docker容器启动时,Testcontainers会动态分配端口号。因此,我们使用KafkaTestContainersConfiguration类提供自定义的使用者和生产者工厂配置:

@Bean

 public Map<String, Object> consumerConfigs() {

 Map<String, Object> props = new HashMap<>();

 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());

 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

 props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");

 // more standard configuration

 return props;

 }



 @Bean

 public ProducerFactory<String, String> producerFactory() {

 Map<String, Object> configProps = new HashMap<>();

 configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());

 // more standard configuration

 return new DefaultKafkaProducerFactory<>(configProps);

 }

然后,我们在测试开始时通过@Import批注引用此配置。

原因是我们需要一种将服务器地址注入到应用程序中的方法,如前所述,该地址是动态生成的。我们通过调用getBootstrapServers()方法实现此目的,该方法将返回引导服务器位置:

bootstrap.servers = [PLAINTEXT://localhost:32789]

现在,当我们运行测试时,我们应该看到Testcontainers做以下几件事:

  • 检查我们的本地Docker设置。
  • 必要时confluentinc/cp-kafka:5.4.3 docker映像
  • 启动一个新容器并等待其准备就绪
  • 最后,在测试完成后关闭并删除容器

再次通过检查测试输出来确认:

13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3]

 - Creating container for image: confluentinc/cp-kafka:5.4.3

 13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3]

 - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

 13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3]

 - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

快点!使用Kafka docker容器的有效集成测试。

7.结论

在本文中,我们了解了几种使用Spring Boot测试Kafka应用程序的方法。在第一种方法中,我们看到了如何配置和使用本地内存Kafka代理。

然后,我们从测试中看到了如何使用Testcontainers来设置在docker容器内运行的外部Kafka代理。

标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *