Testando Kafka 没有 Spring Boot com 测试容器
我们来测试 Spring Boot 在信息系统中使用 Apache Kafka 的应用。为了保证测试集成期间的相关功能,请使用图书馆测试容器,不允许使用测试自动化和隔离环境。
Apache Kafka是一个流行的流媒体发行平台,以真实的节奏传播信息。使用 Kafka 的应用程序将全面验证系统组件的功能和功能,并在集成测试期间进行基本验证。
[Testcontainers ( https://java.testcontainers.org/test_framework_integration/unit_5/ ) 是 Java 的参考书目,它简化了 Docker 测试自动化的测试。该设备可方便地配置测试环境,确保产品环境的复制和隔离。
Docker 安装前的重要事项。接下来,请注意,需要依赖于 arquivo pom.xml do seu projeto:
<dependencies>
<!-- Dependências do Spring Boot e Kafka -->
<!-- ... -->
<!-- Dependência do Testcontainers para Kafka -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.16.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Criando o teste com Testcontainers
Agora,我们对 Spring Boot 应用程序的集成测试进行了验证,并提供了与 Kafka 主题相关的连接和环境信息。
消费者和生产者的通用类 Começaremos criando 类
...
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
payload = consumerRecord.toString();
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public void resetLatch() {
latch = new CountDownLatch(1);
}
public String getPayload() {
return payload;
}
}
...
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
徽标是KafkaProducerIntegrationTest测试类和注释 com @RunWith(SpringRunner.class)和@SpringBootTest。Spring Boot 测试上下文的配置:
...
@RunWith(SpringRunner.class)
@Import(com.fantinel.kafka.KafkaProducerIntegrationTest.KafkaTestContainerConfiguration.class)
@SpringBootTest(classes = Application.class)
@DirtiesContext
public class KafkaProducerIntegrationTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
@Autowired public KafkaTemplate<String, String> template;
@Autowired private KafkaConsumer consumer;
@Autowired private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Before
public void setup() {
consumer.resetLatch();
}
@Test
public void givenKafkaDockerContainer_whenSendingMessage_thenMessageReceived() throws Exception {
String data = "Hello DevTo";
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}
@TestConfiguration
static class KafkaTestContainerConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "devto");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
}
目前,请注意作为消费者配置和主题的应用程序.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: dev-to-test-group
test:
topic: dev-to-test-topic
没有示例,重要的是必需的类,包括 Kafka 类、JUnit 测试类和 Spring Boot 类。接下来,定义@RunWith(SpringRunner.class)来测试使用 Spring Runner 的执行情况,并允许在 Spring 上下文中进行初始化。
集成测试是在 KafkaProducerIntegrationTest 类中定义的。Primeiro,定义了 Kafka 使用或测试容器的容器。Kafka 容器的初始值为“confluenceinc/cp kafka:7.4.0”。该配置允许在 Kafka 环境中测试执行 Docker 容器的全部功能。
接下来,请注意 @Autowired 自动注入KafkaTemplate、KafkaConsumer和KafkaProducer所需的测试。Kafka 主题的命名与@Value("${test.topic}") 的具体说明相同。
前面的测试方法是setup() 和重新定义锁的执行方法。准备好消费者很快就会收到的信息。
集成测试集会很快就会执行。测试容器是在执行测试期间使用 Kafka 的容器 Docker 存储的。
您可以在 IDE 中执行或测试直接操作,或使用命令 mvn 测试。在执行过程中,您可以使用 Kafka 发送的日志进行操作。
结论
总结一下,测试容器有助于集成 Spring Boot 应用程序和 Apache Kafka 的测试。隔离测试环境和自动化,可以保证系统组件的功能和相关问题以及植入应用程序集成前的识别问题。
更多Java相关精彩内容,B站搜索“千锋教育”
- 点赞
- 收藏
- 关注作者
评论(0)