1.概述
Apache Kafka是一个功能强大的分布式容错流处理系统。
在本教程中,学习如何编写不依赖于运行外部Kafka服务器的可靠,独立的集成测试**。
首先,我们将开始研究如何使用和配置Kafka的嵌入式实例。然后,我们将看到如何利用测试中流行的框架Testcontainers 。
2.依存关系
当然,我们需要将标准的spring-kafka
依赖项添加到我们的pom.xml
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.3.RELEASE</version>
</dependency>
然后,我们将需要另外两个依赖项专门用于我们的测试。首先,我们将添加spring-kafka-test
工件:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3.RELEASE</version>
<scope>test</scope>
</dependency>
最后,我们将添加Testcontainers Kafka依赖项,该依赖项也可以在Maven Central上使用:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.0</version>
<scope>test</scope>
</dependency>
现在我们已经配置了所有必需的依赖项,我们可以使用Kafka编写一个简单的Spring Boot应用程序。
3.一个简单的Kafka生产者-消费者应用程序
在整个教程中,我们测试的重点将是一个简单的生产者-消费者Spring Boot Kafka应用程序。
让我们从定义应用程序入口点开始:
@SpringBootApplication
@EnableAutoConfiguration
public class KafkaProducerConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerConsumerApplication.class, args);
}
}
如我们所见,这是一个标准的Spring Boot应用程序。在可能的情况下,我们希望使用默认配置值。考虑到这一点,我们利用@EnableAutoConfiguration
批注自动配置应用程序。
3.1。生产者设置
接下来,让我们考虑一个生产者bean,我们将使用它将消息发送到给定的Kafka主题:
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
上面定义的我们的KafkaProducer
bean仅仅是KafkaTemplate
类的包装。此类提供高级线程安全操作,例如将数据发送到所提供的主题,这正是我们在send
方法中所做的。
3.2。消费者设置
同样,我们现在将定义一个简单的消费者Bean,它将监听Kafka主题并接收消息:
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
setPayload(consumerRecord.toString());
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
return payload;
}
}
我们的简单使用者在receive
方法上使用@KafkaListener
批注来侦听有关给定主题的消息。稍后我们将看到如何从测试中配置test.topic
。
此外,receive方法将消息内容存储在我们的bean中,并减少latch
变量的计数。此变量是一个简单的线程安全计数器字段,稍后我们将在测试中使用该字段,以确保我们成功接收到message 。
现在,我们已经使用Spring Boot实现了简单的Kafka应用程序,让我们看看如何编写集成测试。
4.关于测试的话
通常,在编写简洁的集成测试时,我们不应依赖于我们可能无法控制或突然停止工作的外部服务。这可能会对我们的测试结果产生不利影响。
同样,如果我们依赖外部服务(在这种情况下为运行中的Kafka经纪人),则可能无法按照我们希望通过测试的方式对其进行设置,控制和拆除。
4.1。应用属性
我们将从测试中使用一组非常简单的应用程序配置属性。我们将在src/test/resources/application.yml
文件中定义以下属性:
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: baeldung
test:
topic: embedded-test-topic
这是使用Kafka嵌入式实例或本地代理时所需的最少属性集。
其中大多数是不言自明的,但是我们应该特别强调的一个是消费者财产的auto-offset-reset: earliest
。此属性可确保我们的使用者组获取我们发送的消息,因为容器可能在发送完成后启动。
另外,我们为主题属性配置了值embedded-test-topic
,这是我们将在测试中使用的主题。
5.使用嵌入式Kafka进行测试
在本节中,我们将研究如何使用内存中的Kafka实例对我们的测试进行测试。这也称为嵌入式Kafka。
我们之前添加的依赖项spring-kafka-test
包含一些有用的实用程序,以帮助测试我们的应用程序。最值得注意的是,它包含EmbeddedKafkaBroker
类。
考虑到这一点,让我们继续编写我们的第一个集成测试:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
producer.send(topic, "Sending with own simple KafkaProducer");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}
让我们来看一下测试的关键部分。首先,我们先用两个漂亮的标准Spring注释装饰测试类:
-
@SpringBootTest
批注将确保我们的测试引导Spring应用程序上下文 - 我们还使用
@DirtiesContext
批注,以确保清除此上下文并在不同测试之间重置
至关重要的部分到了,我们使用@EmbeddedKafka
批注将EmbeddedKafkaBroker
的实例注入我们的测试中。此外,我们可以使用一些属性来配置嵌入式Kafka节点:
-
partitions
–这是每个主题使用的分区数。为了使事情变得简单明了,我们只希望在测试中使用一个 -
brokerProperties
– Kafka经纪人的其他属性。同样,我们保持简单,并指定纯文本侦听器和端口号
接下来,我们自动连接consumer
和producer
类,并配置一个主题以使用application.properties
的值。
对于拼图的最后一部分,我们仅向测试主题发送一条消息,并验证是否已收到该消息并包含测试主题的名称。
运行测试时,我们将在冗长的Spring输出中看到:
...
12:45:35.099 [main] INFO cbkafka.embedded.KafkaProducer -
sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
INFO cbkafka.embedded.KafkaConsumer - received payload=
'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
CreateTime = 1605267935099, serialized key size = -1,
serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
这确认我们的测试正常运行。太棒了!现在,我们有一种使用内存中的Kafka代理编写独立的,独立的集成测试的方法。
6.使用TestContainers测试Kafka
有时,我们可能会看到真正的外部服务与专门为测试目的提供的服务的嵌入式内存实例之间的细微差异。尽管不太可能,但也有可能是我们测试中使用的端口被占用,从而导致故障。
考虑到这一点,在本节中,我们将看到我们以前使用Testcontainers框架进行测试的方法的一种变化。我们将通过集成测试了解如何实例化和管理托管在Docker容器中的外部Apache Kafka代理。
让我们定义另一个集成测试,该测试与上一节中看到的非常相似:
@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
producer.send(topic, "Sending with own controller");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}
让我们看看这次的差异。我们声明的是kafka
字段,它是标准的JUnit @ClassRule
。该字段是KafkaContainer
类的实例,该类将准备和管理运行Kafka的容器的生命周期。
为了避免端口冲突,当我们的Docker容器启动时,Testcontainers会动态分配端口号。因此,我们使用KafkaTestContainersConfiguration
类提供自定义的使用者和生产者工厂配置:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
// more standard configuration
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
// more standard configuration
return new DefaultKafkaProducerFactory<>(configProps);
}
然后,我们在测试开始时通过@Import
批注引用此配置。
原因是我们需要一种将服务器地址注入到应用程序中的方法,如前所述,该地址是动态生成的。我们通过调用getBootstrapServers()
方法实现此目的,该方法将返回引导服务器位置:
bootstrap.servers = [PLAINTEXT://localhost:32789]
现在,当我们运行测试时,我们应该看到Testcontainers做以下几件事:
- 检查我们的本地Docker设置。
- 必要时
confluentinc/cp-kafka:5.4.3
docker映像 - 启动一个新容器并等待其准备就绪
- 最后,在测试完成后关闭并删除容器
再次通过检查测试输出来确认:
13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
快点!使用Kafka docker容器的有效集成测试。
7.结论
在本文中,我们了解了几种使用Spring Boot测试Kafka应用程序的方法。在第一种方法中,我们看到了如何配置和使用本地内存Kafka代理。
然后,我们从测试中看到了如何使用Testcontainers来设置在docker容器内运行的外部Kafka代理。
0 评论