Testando Kafka 没有 Spring Boot com 测试容器

举报
千锋教育 发表于 2023/07/05 18:49:02 2023/07/05
【摘要】 我们来测试 Spring Boot 在信息系统中使用 Apache Kafka 的应用。为了保证测试集成期间的相关功能,请使用图书馆测试容器,不允许使用测试自动化和隔离环境。Apache Kafka是一个流行的流媒体发行平台,以真实的节奏传播信息。使用 Kafka 的应用程序将全面验证系统组件的功能和功能,并在集成测试期间进行基本验证。[Testcontainers ( https://ja...

企业微信截图_20230705175547.jpg

我们来测试 Spring Boot 在信息系统中使用 Apache Kafka 的应用。为了保证测试集成期间的相关功能,请使用图书馆测试容器,不允许使用测试自动化和隔离环境。

Apache Kafka是一个流行的流媒体发行平台,以真实的节奏传播信息。使用 Kafka 的应用程序将全面验证系统组件的功能和功能,并在集成测试期间进行基本验证。

[Testcontainers ( https://java.testcontainers.org/test_framework_integration/unit_5/ ) 是 Java 的参考书目,它简化了 Docker 测试自动化的测试。该设备可方便地配置测试环境,确保产品环境的复制和隔离。

Docker 安装前的重要事项。接下来,请注意,需要依赖于 arquivo pom.xml do seu projeto:

<dependencies>
    <!-- Dependências do Spring Boot e Kafka -->
    <!-- ... -->
    <!-- Dependência do Testcontainers para Kafka -->
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <version>1.16.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Criando o teste com Testcontainers

Agora,我们对 Spring Boot 应用程序的集成测试进行了验证,并提供了与 Kafka 主题相关的连接和环境信息。

消费者和生产者的通用类 Começaremos criando 类

...
@Component
public class KafkaConsumer {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

  private CountDownLatch latch = new CountDownLatch(1);

  private String payload;

  @KafkaListener(topics = "${test.topic}")
  public void receive(ConsumerRecord<?, ?> consumerRecord) {
    LOGGER.info("received payload='{}'", consumerRecord.toString());

    payload = consumerRecord.toString();
    latch.countDown();
  }

  public CountDownLatch getLatch() {
    return latch;
  }

  public void resetLatch() {
    latch = new CountDownLatch(1);
  }

  public String getPayload() {
    return payload;
  }
}


...

@Component
public class KafkaProducer {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

  @Autowired private KafkaTemplate<String, String> kafkaTemplate;

  public void send(String topic, String payload) {
    LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
    kafkaTemplate.send(topic, payload);
  }
}


徽标是KafkaProducerIntegrationTest测试类和注释 com @RunWith(SpringRunner.class)@SpringBootTest。Spring Boot 测试上下文的配置:

...
@RunWith(SpringRunner.class)
@Import(com.fantinel.kafka.KafkaProducerIntegrationTest.KafkaTestContainerConfiguration.class)
@SpringBootTest(classes = Application.class)
@DirtiesContext
public class KafkaProducerIntegrationTest {

  @ClassRule
  public static KafkaContainer kafka =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

  @Autowired public KafkaTemplate<String, String> template;

  @Autowired private KafkaConsumer consumer;

  @Autowired private KafkaProducer producer;

  @Value("${test.topic}")
  private String topic;

  @Before
  public void setup() {
    consumer.resetLatch();
  }

  @Test
  public void givenKafkaDockerContainer_whenSendingMessage_thenMessageReceived() throws Exception {
    String data = "Hello DevTo";

    producer.send(topic, data);

    boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
    assertTrue(messageConsumed);
    assertThat(consumer.getPayload(), containsString(data));
  }

  @TestConfiguration
  static class KafkaTestContainerConfiguration {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "devto");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
      Map<String, Object> configProps = new HashMap<>();
      configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
      configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
    }
  }
}


目前,请注意作为消费者配置和主题的应用程序.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: dev-to-test-group
test:
  topic: dev-to-test-topic


没有示例,重要的是必需的类,包括 Kafka 类、JUnit 测试类和 Spring Boot 类。接下来,定义@RunWith(SpringRunner.class)来测试使用 Spring Runner 的执行情况,并允许在 Spring 上下文中进行初始化。

集成测试是在 KafkaProducerIntegrationTest 类中定义的。Primeiro,定义了 Kafka 使用或测试容器的容器。Kafka 容器的初始值为“confluenceinc/cp kafka:7.4.0”。该配置允许在 Kafka 环境中测试执行 Docker 容器的全部功能。

接下来,请注意 @Autowired 自动注入KafkaTemplateKafkaConsumerKafkaProducer所需的测试。Kafka 主题的命名与@Value("${test.topic}") 的具体说明相同。

前面的测试方法是setup() 和重新定义锁的执行方法。准备好消费者很快就会收到的信息。

集成测试集会很快就会执行。测试容器是在执行测试期间使用 Kafka 的容器 Docker 存储的。

您可以在 IDE 中执行或测试直接操作,或使用命令 mvn 测试。在执行过程中,您可以使用 Kafka 发送的日志进行操作。

结论

总结一下,测试容器有助于集成 Spring Boot 应用程序和 Apache Kafka 的测试。隔离测试环境和自动化,可以保证系统组件的功能和相关问题以及植入应用程序集成前的识别问题。

更多Java相关精彩内容,B站搜索“千锋教育”

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。