Getting Started with Apache Pulsar, a Rising Star
1 Apache Pulsar Overview
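According to its Baidu Baike entry, Apache Pulsar is a next-generation, cloud-native distributed messaging and streaming platform. It combines messaging, storage, and lightweight functional computing, separates compute from storage in its architecture, and supports multi-tenancy, persistent storage, and cross-region replication across data centers. As a streaming data store it offers strong consistency, high throughput, low latency, and high scalability, and it is regarded as one of the best solutions for real-time message streaming, storage, and computing in the cloud-native era. Apache Pulsar is flexible and is positioned to address most of the pain points of Apache Kafka. It can serve as a distributed log as well as a pure messaging system in the style of RabbitMQ.
Its official website is https://pulsar.apache.org.
Typical use cases for Apache Pulsar as a message queue include asynchronous processing, application decoupling, traffic peak shaving, and message communication. The core features of the Apache Pulsar framework are: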
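Native support for multiple clusters, with replication across clusters
Very low message latency (under 5 ms)
Horizontal scalability, up to millions of topics
Client libraries for several programming languages, such as Java, Go, Python, and Node.js
Native stream processing through Pulsar Functions
Multi-tenancy
Persistent message storage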
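2 Getting Started with Apache Pulsar
First, download the Apache Pulsar binary package from the official website. This walkthrough deploys the demo on Linux. What gets deployed here is the Broker, which plays the role of the server. Open a shell terminal on the Linux machine and run the following command to download the package: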
wget https://archive.apache.org/dist/pulsar/pulsar-2.8.1/apache-pulsar-2.8.1-bin.tar.gz
The package can also be downloaded manually. Once the download completes, extract the archive and change into the new directory:
tar xvfz apache-pulsar-2.8.1-bin.tar.gz
cd apache-pulsar-2.8.1
Then list the directory contents:
jack@jackpc:~/kzsoft/apache-pulsar-2.8.1$ ls
bin data instances LICENSE logs README
conf examples lib licenses NOTICE
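The bin directory contains the startup commands, and the conf directory contains the configuration files, which define settings such as the service port (6650 by default). Note: Apache Pulsar depends on JDK 1.8 or later, so install a JDK on the operating system first and make sure more than 2 GB of memory is available for the JVM heap. The Broker runs only on 64-bit operating systems.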
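These prerequisites can be checked from Java before starting the Broker. The following is a minimal sketch that uses only JDK classes. The class name PreflightCheck and its helper methods are illustrative assumptions, the 64-bit test is a simple heuristic on the os.arch property (which describes the JVM, not necessarily the operating system), and the reported heap size is that of the JVM running the check, not of the Pulsar process.

```java
class PreflightCheck {
    // Extracts the major version from java.specification.version: "1.8" -> 8, "11" -> 11.
    static int majorVersion(String specVersion) {
        String[] parts = specVersion.split("\\.");
        if (parts[0].equals("1") && parts.length > 1) {
            return Integer.parseInt(parts[1]);
        }
        return Integer.parseInt(parts[0]);
    }

    // Heuristic: treats an os.arch value containing "64" (amd64, x86_64, aarch64) as 64-bit.
    static boolean is64Bit(String osArch) {
        return osArch.contains("64");
    }

    public static void main(String[] args) {
        int major = majorVersion(System.getProperty("java.specification.version"));
        boolean arch64 = is64Bit(System.getProperty("os.arch"));
        long maxHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
        System.out.println("Java major version: " + major + (major >= 8 ? " (ok)" : " (too old)"));
        System.out.println("64-bit JVM architecture: " + arch64);
        System.out.println("Max heap of this JVM (MB): " + maxHeapMb);
    }
}
```

Run it with the same JDK that will launch Pulsar, since the values describe the current JVM only.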
The conf directory contains the following files:
bkenv.sh functions-logging pulsar_env.sh
bookkeeper.conf functions_worker.yml pulsar_tools_env.sh
broker.conf global_zookeeper.conf schema_example.conf
client.conf log4j2-scripts standalone.conf
discovery.conf log4j2.yaml websocket.conf
filesystem_offload_core_site.xml presto zookeeper.conf
functions_log4j2.xml proxy.conf
Among these, broker.conf holds the Broker's startup parameters. Its port-related settings are:
### --- General broker settings --- ###
# Broker data port
brokerServicePort=6650
# Port to use to server HTTP request
webServicePort=8080
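Because the file uses a plain key=value layout with # comments, these settings can be read with java.util.Properties. The sketch below parses the excerpt shown above from an inline string. The class name BrokerPorts, the port helper, and the fallback values are assumptions made for illustration, and a real check would load conf/broker.conf from disk instead.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class BrokerPorts {
    // Parses key=value text (lines starting with '#' are comments) and returns
    // the integer value for the key, or the fallback if the key is missing or empty.
    static int port(String confText, String key, int fallback) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            return fallback;
        }
        return Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "### --- General broker settings --- ###",
                "# Broker data port",
                "brokerServicePort=6650",
                "# Port to use to server HTTP request",
                "webServicePort=8080");
        System.out.println("brokerServicePort = " + port(conf, "brokerServicePort", 6650));
        System.out.println("webServicePort = " + port(conf, "webServicePort", 8080));
    }
}
```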
From the bin directory, start the Broker in standalone mode (in this mode the settings are read from conf/standalone.conf):
./pulsar standalone
With the Apache Pulsar Broker running, an Apache Pulsar client can now publish and subscribe to messages. The example here uses Java in a Maven project. First, add the client dependency to pom.xml:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.8.1</version>
</dependency>
The message producer comes first. Its core code is as follows:
package com.example.demo.pulsar;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class PProducer {
    public static void main(String[] args) throws Exception {
        // Connect to the broker through its service URL
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://192.168.0.21:6650")
                .build();
        // Create a producer bound to the topic
        Producer<byte[]> producer = client.newProducer()
                .topic("topic-demo")
                .create();
        while (true) {
            // Send a message to the broker on the given topic, once per second
            producer.send(("Msg from Producer @" + System.currentTimeMillis()).getBytes());
            Thread.sleep(1000);
        }
    }
}
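PulsarClient is the client entry point. It connects to the Broker through a service URL of the form pulsar://brokerIP:port. Producer is the producer object: it is created for a given topic and publishes messages with its send method.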
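A malformed service URL is an easy mistake to make, so it can help to validate the string before handing it to the client. The following sketch does this with java.net.URI from the JDK. The class name ServiceUrl and the parse helper are hypothetical and are not part of the Pulsar client API, and the check covers only the plain pulsar:// form used in this article.

```java
import java.net.URI;

class ServiceUrl {
    // Returns the parsed URI if the string has the form pulsar://host:port,
    // otherwise throws IllegalArgumentException.
    static URI parse(String url) {
        URI uri = URI.create(url);
        if (!"pulsar".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Expected scheme pulsar:// but got: " + url);
        }
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("Expected pulsar://host:port but got: " + url);
        }
        return uri;
    }

    public static void main(String[] args) {
        URI uri = parse("pulsar://192.168.0.21:6650");
        // prints "192.168.0.21 6650"
        System.out.println(uri.getHost() + " " + uri.getPort());
    }
}
```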
The message consumer comes next. Its core code is as follows:
package com.example.demo.pulsar;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class PConsumer {
    public static void main(String[] args) throws Exception {
        // Connect to the broker through its service URL
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://192.168.0.21:6650")
                .build();
        // Subscribe to the topic under a named subscription
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("topic-demo")
                .subscriptionName("subscription-demo")
                .subscribe();
        while (true) {
            // Block until the next message arrives
            Message<byte[]> msg = consumer.receive();
            try {
                System.out.println("Message received: " + new String(msg.getData()));
                // Confirm to the broker that the message was received
                consumer.acknowledge(msg);
            } catch (Exception e) {
                // Processing failed: request redelivery so it can be retried later
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}
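Start both programs, and messages flow through the Broker from the producer to the consumer. The Producer publishes messages to a topic. The Consumer subscribes to that topic, processes the incoming messages, and sends an acknowledgment with consumer.acknowledge(msg) once processing completes. By default, Pulsar retains all unacknowledged messages, even if the Consumer disconnects. A retained message is discarded only after the Consumer confirms that it was processed successfully. If the Consumer fails to process a message within a certain time and wants to consume it again, it can send a negative acknowledgment to the Broker with consumer.negativeAcknowledge(msg), and the message is delivered again.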
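The acknowledgment rules described above can be made concrete with a toy in-memory model. This is only a sketch of the semantics and is not how Pulsar implements them: the class ToySubscription and all of its methods are invented for illustration, and it ignores timing, batching, and multiple consumers.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class ToySubscription {
    private final Deque<Integer> pending = new ArrayDeque<>();     // ids waiting for delivery
    private final Map<Integer, String> retained = new HashMap<>(); // unacknowledged messages
    private int nextId = 0;

    // Stores a message; it stays retained until it is acknowledged.
    void publish(String payload) {
        retained.put(nextId, payload);
        pending.addLast(nextId++);
    }

    // Returns the id of the next message to deliver, or -1 if nothing is pending.
    int receive() {
        Integer id = pending.pollFirst();
        return id == null ? -1 : id;
    }

    String payload(int id) {
        return retained.get(id);
    }

    // An acknowledged message is dropped from retention.
    void acknowledge(int id) {
        retained.remove(id);
    }

    // A negatively acknowledged message is queued again for redelivery.
    void negativeAcknowledge(int id) {
        if (retained.containsKey(id)) {
            pending.addLast(id);
        }
    }

    int retainedCount() {
        return retained.size();
    }

    public static void main(String[] args) {
        ToySubscription sub = new ToySubscription();
        sub.publish("a");
        sub.publish("b");
        int first = sub.receive();
        sub.negativeAcknowledge(first);  // processing "a" failed: ask for redelivery
        int second = sub.receive();
        sub.acknowledge(second);         // "b" was processed successfully
        int redelivered = sub.receive(); // "a" comes back
        // prints "a retained=1"
        System.out.println(sub.payload(redelivered) + " retained=" + sub.retainedCount());
    }
}
```

In this model, "a" remains retained after the run because it was never acknowledged, which mirrors the rule that only acknowledged messages are discarded.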
Running the example above in the IDEA debugger produces output like the following:
07:24:29.639 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ClientCnx - [id: 0x979bb5e0, L:/192.168.0.21:63067 - R:/192.168.0.21:6650] Received a message from the server: org.apache.pulsar.common.api.proto.CommandMessage@3e764a42
07:24:29.639 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [topic-demo][subscription-demo] Received message: 44/811
07:24:29.639 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [subscription-demo] [6b929] processing message num - 0 in batch
07:24:29.639 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [subscription-demo] [6b929] enqueued messages in batch. queue size - 0, available queue size - 2147483647
Message received: Msg from Producer @1637018669636
07:24:29.700 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='subscription-demo', consumerName='6b929', topic='topic-demo'}] Flushing pending acks to broker: last-cumulative-ack: org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker$LastCumulativeAck@59cde8b3 -- individual-acks: [] -- individual-batch-index-acks: {}
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/192.168.0.21:6650] Received cmd MESSAGE
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ClientCnx - [id: 0x979bb5e0, L:/192.168.0.21:63067 - R:/192.168.0.21:6650] Received a message from the server: org.apache.pulsar.common.api.proto.CommandMessage@3e764a42
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [topic-demo][subscription-demo] Received message: 44/812
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [subscription-demo] [6b929] processing message num - 0 in batch
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [subscription-demo] [6b929] enqueued messages in batch. queue size - 0, available queue size - 2147483647
Message received: Msg from Producer @1637018670640
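One final note: if Linux is installed in a VirtualBox virtual machine, a port-forwarding rule is needed so that the host machine can reach the Broker.
In that rule, both the host port and the guest port are the Broker port 6650, the host IP is the IP address of the host machine, and the guest IP is the IP address of the Linux virtual machine.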
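Once the forwarding rule is in place, a plain TCP connection attempt from the host is a quick way to confirm that the Broker port is reachable. The sketch below uses only JDK sockets. The class name PortCheck, the default address 127.0.0.1, and the 2-second timeout are assumptions, and the check proves only that something accepts connections on the port, not that it is a Pulsar Broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Usage: java PortCheck.java <host> <port>; defaults are illustrative.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6650;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```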