Java之消息队列(RabbitMQ与Kafka)
开篇语
哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛
今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。
我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出,希望以这种方式帮助到更多的初学者或者想入门的小伙伴们,同时也能对自己的技术进行沉淀,加以复盘,查缺补漏。
小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦。三连即是对作者我写作道路上最好的鼓励与支持!
前言
消息队列是分布式系统中常用的中间件,用于实现异步通信、解耦、流量削峰等功能。它可以将发送方和接收方的操作异步化,提高系统的可扩展性和容错性。本文将介绍消息队列的基本概念、RabbitMQ和Kafka的使用,特别是Spring Kafka在消息生产和消费中的应用。
1. 消息队列的基本概念
消息队列的定义
消息队列(Message Queue)是一种通信模式,允许不同系统之间通过异步消息的方式进行交互。消息队列本质上是一个存储消息的容器,接收方(消费者)从队列中获取消息并进行处理,而发送方(生产者)将消息推送到队列中。消息队列的核心目的是解耦生产者与消费者,使得两者之间的通信不依赖于实时的交互。
消息队列的工作原理
- 生产者(Producer):生产者将消息发送到消息队列中。
- 消息队列(Queue):消息队列负责存储消息,保证消息的顺序和可靠性。
- 消费者(Consumer):消费者从消息队列中消费消息并进行处理。
消息队列的优势
- 解耦:生产者和消费者之间不直接通信,降低了系统的耦合度。
- 异步处理:消息队列支持异步处理,生产者可以快速发送消息而不需要等待消费者的响应。
- 流量削峰:消息队列缓冲区可以临时存储过载的消息,防止系统因流量突增而崩溃。
- 可靠性:消息队列提供消息确认和持久化机制,确保消息不会丢失。
常见的消息队列中间件有RabbitMQ、Kafka、ActiveMQ等。
2. RabbitMQ的配置与使用
RabbitMQ概述
RabbitMQ是一个开源的消息代理,它实现了AMQP(Advanced Message Queuing Protocol)协议,广泛用于消息的异步处理。RabbitMQ支持多种消息传递模式,如点对点(Queue)、发布/订阅(Exchange)等。
RabbitMQ的基本组件
- Producer(生产者):消息的生产者,发送消息到队列。
- Queue(队列):消息存储的地方,消费者从队列中获取消息进行处理。
- Consumer(消费者):消息的消费者,从队列中获取消息并进行处理。
- Exchange(交换机):接收生产者发送的消息,并根据规则将消息路由到队列中。
RabbitMQ的使用
1. 安装与启动
- 安装:可以从[RabbitMQ官网]下载并安装RabbitMQ。
- 启动:通过命令行或管理界面启动RabbitMQ服务,默认管理界面地址为
http://localhost:15672/
,用户名和密码默认为guest
。
2. 配置与生产者代码示例
使用RabbitMQ时,生产者将消息发送到队列,消费者从队列中接收消息并处理。
生产者代码示例:
import com.rabbitmq.client.*;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ!";
// 发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("Sent: " + message);
}
}
}
3. 消费者代码示例
消费者代码示例:
import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Waiting for messages. To exit press Ctrl+C");
// 创建一个消费者并接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
}
3. Kafka的基本概念与架构
Kafka概述
Kafka是一个分布式的、分布式日志存储系统,它主要用于高吞吐量、高可用性的消息队列应用。Kafka最初由LinkedIn开发,后来成为Apache开源项目。与RabbitMQ不同,Kafka的设计目标是以高吞吐量的日志消息流处理为核心,适用于大规模实时数据流处理。
Kafka的架构
Kafka的架构可以分为以下几个核心组件:
- Producer(生产者):负责将消息发布到Kafka集群的特定主题(Topic)中。
- Consumer(消费者):负责从Kafka的主题中读取消息进行处理。
- Broker(代理):Kafka集群的核心组件,负责接收和存储消息。Kafka集群通常由多个Broker组成。
- Topic(主题):消息的分类,一个Kafka集群可以有多个主题,每个主题包含多个分区(Partition)。
- Zookeeper:Kafka使用Zookeeper来管理集群的元数据和成员状态。
Kafka的基本使用
1. 配置与生产者代码示例
Kafka的生产者负责将消息发送到指定的主题。
生产者代码示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
String topic = "test";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String message = "Hello Kafka!";
producer.send(new ProducerRecord<>(topic, message));
System.out.println("Sent message: " + message);
producer.close();
}
}
2. 消费者代码示例
Kafka的消费者从指定的主题中消费消息。
消费者代码示例:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
import java.util.Collections;
public class KafkaConsumerExample {
public static void main(String[] args) {
String topic = "test";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
consumer.poll(100).forEach(record -> {
System.out.println("Received message: " + record.value());
});
}
}
}
4. Spring Kafka与消息的生产与消费
Spring Kafka概述
Spring Kafka是Spring框架提供的一个项目,它简化了与Kafka的集成,提供了Kafka生产者和消费者的支持,使得开发者可以更加方便地使用Kafka。
Spring Kafka的配置与使用
在Spring应用中使用Kafka时,首先需要配置生产者和消费者。Spring Kafka通过@KafkaListener
注解简化了消费者的配置,而生产者通过KafkaTemplate
进行消息发送。
1. 配置Spring Kafka
pom.xml中添加Spring Kafka的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.0</version>
</dependency>
2. 生产者配置与使用
Producer配置:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
@EnableKafka
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
3. 消费者配置与使用
Consumer配置:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test", groupId = "test-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
4. 配置文件
在application.yml
或application.properties
中配置Kafka连接:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-group
producer:
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
总结
消息队列(如RabbitMQ和Kafka)是现代分布式系统中的重要组成部分,能够有效地解耦应用程序的组件,确保系统的高可用性和可扩展性。RabbitMQ适用于高性能的消息队列和传统的企业应用场景,而Kafka则适用于大数据和实时流处理场景。
通过Spring Kafka,可以轻松地将Kafka与Spring应用集成,提供更高效、可靠的消息处理能力。掌握消息队列的使用和配置,有助于开发更加高效、可伸缩的分布式系统。
… …
文末
好啦,以上就是我这期的全部内容,如果有任何疑问,欢迎下方留言哦,咱们下期见。
… …
学习不分先后,知识不分多少;事无巨细,当以虚心求教;三人行,必有我师焉!!!
wished for you successed !!!
⭐️若喜欢我,就请关注我叭。
⭐️若对您有用,就请点赞叭。
⭐️若有疑问,就请评论留言告诉我叭。
版权声明:本文由作者原创,转载请注明出处,谢谢支持!
- 点赞
- 收藏
- 关注作者
评论(0)