Java之消息队列(RabbitMQ与Kafka)

举报
喵手 发表于 2025/09/24 21:47:31 2025/09/24
【摘要】 开篇语哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛  今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。  我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出,...

开篇语

哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛

  今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。

  我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出,希望以这种方式帮助到更多的初学者或者想入门的小伙伴们,同时也能对自己的技术进行沉淀,加以复盘,查缺补漏。

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦。三连即是对作者我写作道路上最好的鼓励与支持!

前言

消息队列是分布式系统中常用的中间件,用于实现异步通信、解耦、流量削峰等功能。它可以将发送方和接收方的操作异步化,提高系统的可扩展性和容错性。本文将介绍消息队列的基本概念、RabbitMQ和Kafka的使用,特别是Spring Kafka在消息生产和消费中的应用。

1. 消息队列的基本概念

消息队列的定义

消息队列(Message Queue)是一种通信模式,允许不同系统之间通过异步消息的方式进行交互。消息队列本质上是一个存储消息的容器,接收方(消费者)从队列中获取消息并进行处理,而发送方(生产者)将消息推送到队列中。消息队列的核心目的是解耦生产者与消费者,使得两者之间的通信不依赖于实时的交互。

消息队列的工作原理

  1. 生产者(Producer):生产者将消息发送到消息队列中。
  2. 消息队列(Queue):消息队列负责存储消息,保证消息的顺序和可靠性。
  3. 消费者(Consumer):消费者从消息队列中消费消息并进行处理。

消息队列的优势

  • 解耦:生产者和消费者之间不直接通信,降低了系统的耦合度。
  • 异步处理:消息队列支持异步处理,生产者可以快速发送消息而不需要等待消费者的响应。
  • 流量削峰:消息队列缓冲区可以临时存储过载的消息,防止系统因流量突增而崩溃。
  • 可靠性:消息队列提供消息确认和持久化机制,确保消息不会丢失。

常见的消息队列中间件有RabbitMQ、Kafka、ActiveMQ等。


2. RabbitMQ的配置与使用

RabbitMQ概述

RabbitMQ是一个开源的消息代理,它实现了AMQP(Advanced Message Queuing Protocol)协议,广泛用于消息的异步处理。RabbitMQ支持多种消息传递模式,如点对点(Queue)、发布/订阅(Exchange)等。

RabbitMQ的基本组件

  • Producer(生产者):消息的生产者,发送消息到队列。
  • Queue(队列):消息存储的地方,消费者从队列中获取消息进行处理。
  • Consumer(消费者):消息的消费者,从队列中获取消息并进行处理。
  • Exchange(交换机):接收生产者发送的消息,并根据规则将消息路由到队列中。

RabbitMQ的使用

1. 安装与启动

  • 安装:可以从[RabbitMQ官网]下载并安装RabbitMQ。
  • 启动:通过命令行或管理界面启动RabbitMQ服务,默认管理界面地址为http://localhost:15672/,用户名和密码默认为guest

2. 配置与生产者代码示例

使用RabbitMQ时,生产者将消息发送到队列,消费者从队列中接收消息并处理。

生产者代码示例:

import com.rabbitmq.client.*;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello RabbitMQ!";
            // 发送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("Sent: " + message);
        }
    }
}

3. 消费者代码示例

消费者代码示例:

import com.rabbitmq.client.*;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("Waiting for messages. To exit press Ctrl+C");

            // 创建一个消费者并接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Received: " + message);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
}

3. Kafka的基本概念与架构

Kafka概述

Kafka是一个分布式的、分布式日志存储系统,它主要用于高吞吐量、高可用性的消息队列应用。Kafka最初由LinkedIn开发,后来成为Apache开源项目。与RabbitMQ不同,Kafka的设计目标是以高吞吐量的日志消息流处理为核心,适用于大规模实时数据流处理。

Kafka的架构

Kafka的架构可以分为以下几个核心组件:

  • Producer(生产者):负责将消息发布到Kafka集群的特定主题(Topic)中。
  • Consumer(消费者):负责从Kafka的主题中读取消息进行处理。
  • Broker(代理):Kafka集群的核心组件,负责接收和存储消息。Kafka集群通常由多个Broker组成。
  • Topic(主题):消息的分类,一个Kafka集群可以有多个主题,每个主题包含多个分区(Partition)。
  • Zookeeper:Kafka使用Zookeeper来管理集群的元数据和成员状态。

Kafka的基本使用

1. 配置与生产者代码示例

Kafka的生产者负责将消息发送到指定的主题。

生产者代码示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String topic = "test";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String message = "Hello Kafka!";
        producer.send(new ProducerRecord<>(topic, message));
        System.out.println("Sent message: " + message);
        producer.close();
    }
}

2. 消费者代码示例

Kafka的消费者从指定的主题中消费消息。

消费者代码示例:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
import java.util.Collections;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        String topic = "test";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            consumer.poll(100).forEach(record -> {
                System.out.println("Received message: " + record.value());
            });
        }
    }
}

4. Spring Kafka与消息的生产与消费

Spring Kafka概述

Spring Kafka是Spring框架提供的一个项目,它简化了与Kafka的集成,提供了Kafka生产者和消费者的支持,使得开发者可以更加方便地使用Kafka。

Spring Kafka的配置与使用

在Spring应用中使用Kafka时,首先需要配置生产者和消费者。Spring Kafka通过@KafkaListener注解简化了消费者的配置,而生产者通过KafkaTemplate进行消息发送。

1. 配置Spring Kafka

pom.xml中添加Spring Kafka的依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

2. 生产者配置与使用

Producer配置:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

3. 消费者配置与使用

Consumer配置:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "test", groupId = "test-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

4. 配置文件

application.ymlapplication.properties中配置Kafka连接:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

总结

消息队列(如RabbitMQ和Kafka)是现代分布式系统中的重要组成部分,能够有效地解耦应用程序的组件,确保系统的高可用性和可扩展性。RabbitMQ适用于高性能的消息队列和传统的企业应用场景,而Kafka则适用于大数据和实时流处理场景。

通过Spring Kafka,可以轻松地将Kafka与Spring应用集成,提供更高效、可靠的消息处理能力。掌握消息队列的使用和配置,有助于开发更加高效、可伸缩的分布式系统。

… …

文末

好啦,以上就是我这期的全部内容,如果有任何疑问,欢迎下方留言哦,咱们下期见。

… …

学习不分先后,知识不分多少;事无巨细,当以虚心求教;三人行,必有我师焉!!!

wished for you successed !!!


⭐️若喜欢我,就请关注我叭。

⭐️若对您有用,就请点赞叭。
⭐️若有疑问,就请评论留言告诉我叭。


版权声明:本文由作者原创,转载请注明出处,谢谢支持!

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。