后起之秀Apache Pulsar框架入门

举报
jackwangcumt 发表于 2021/11/16 12:21:10 2021/11/16
【摘要】 Apache Pulsar 是下一代云原生分布式消息流平台,它集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。Apache Pulsar 非常灵活,可以解决Apache Kafka 的大部分痛点。

1 Apache Pulsar概述


      根据百度百科的定义,Apache Pulsar 是下一代云原生分布式消息流平台,它集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。Apache Pulsar 非常灵活,可以解决Apache Kafka 的大部分痛点。Apache Pulsar 可以应用于分布式日志应用场景,也可以应用于纯消息传递系统(如RabbitMQ )场景。

     其官网地址为: https://pulsar.apache.org 。官网截图如下:

1.jpg

Apache Pulsar 消息队列的使用场景包括异步处理,应用解耦,流量削锋和消息通讯等场景。Apache Pulsar框架的核心特性如下:

原生支持多集群,可跨集群备份
非常低的消息延迟,小于5ms
支持水平扩展,可扩展到百万级消息主题Topic
支持多种客户端编程语言,如Java、Go、Python、Node.js等
Pulsar Functions提供了原生的流数据处理
多租户支持
持久化的消息存储

2 Apache Pulsar入门


      首先可以从官网下载Apache Pulsar二进制安装包,这里我们采用Linux操作系统来进行演示示例的部署,此处部署的就是 Broker ,相当于Server 。打开Linux操作系统的Shell终端命令,执行如下命令进行下载:

wget https://archive.apache.org/dist/pulsar/pulsar-2.8.1/apache-pulsar-2.8.1-bin.tar.gz

     当然了,我们也可以手动下载,下载完成后进行解压和切换目录操作:

tar xvfz apache-pulsar-2.8.1-bin.tar.gz
cd apache-pulsar-2.8.1

     然后查看目录,信息如下所示:

jack@jackpc:~/kzsoft/apache-pulsar-2.8.1$ ls
bin   data      instances  LICENSE   logs    README
conf  examples  lib        licenses  NOTICE

    其中的bin目录包含一些启动命令等,而conf目录包含相关的配置项。比如启动端口等,默认的是6650 。注意:Apache Pulsar依赖于JDK1.8+ ,因此需要首先在操作系统上安装JDK,然后保证操作系统上有超过 2G JVM heap内存空间。且 Broker 仅支持运行在 64 位 操作系统中。 

    在conf配置文件目录中,查看内部文件如下:

bkenv.sh                          functions-logging      pulsar_env.sh
bookkeeper.conf                   functions_worker.yml   pulsar_tools_env.sh
broker.conf                       global_zookeeper.conf  schema_example.conf
client.conf                       log4j2-scripts         standalone.conf
discovery.conf                    log4j2.yaml            websocket.conf
filesystem_offload_core_site.xml  presto                 zookeeper.conf
functions_log4j2.xml              proxy.conf

其中的broker.conf是Broker启动的一些参数,其中端口相关的配置如下:

### --- General broker settings --- ###
# Broker data port
brokerServicePort=6650

# Port to use to server HTTP request
webServicePort=8080

在bin目录下,以单机模式启动Broker,命令如下:

./pulsar standalone

至此,有了Apache Pulsar Broker服务后,可以用Apache Pulsar Client端进行消息的发布和订阅。这里以Java为例,项目为Maven构建的,首先在pom.xml添加客户端依赖性:

<dependency>
	<groupId>org.apache.pulsar</groupId>
	<artifactId>pulsar-client</artifactId>
	<version>2.8.1</version>
</dependency>

首先给出消息的生产者定义,核心代码如下所示:

package com.example.demo.pulsar;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class PProducer {

    public static void main(String[] args) throws Exception{

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://192.168.0.21:6650")
                .build();

        Producer<byte[]> producer = client.newProducer()
                .topic("topic-demo")
                .create();

        while (true){
            // 发送消息到指定的broker和topic
            producer.send(("Msg from Producer @" +System.currentTimeMillis()).getBytes());
            Thread.sleep(1000);
        }
    }
}

PulsarClient是一个客户端实现类,其中可以通过服务地址来连接到Broker,格式为 pulsar://brokerIP:port 。而Producer为生产者对象,可以创建主题,并通过send方法发送消息。

下面给出消息的消费者定义,核心代码如下所示:

package com.example.demo.pulsar;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class PConsumer {

    public static void main(String[] args) throws Exception{
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://192.168.0.21:6650")
                .build();

        Consumer consumer = client.newConsumer()
                .topic("topic-demo")
                .subscriptionName("subscription-demo")
                .subscribe();

        while (true) {
            //获取消息
            Message msg = consumer.receive();
            try {

                System.out.println("Message received: " + new String(msg.getData()));
                //向broker确认消息已收到
                consumer.acknowledge(msg);
            } catch (Exception e) {
                //消息处理失败则稍后再试
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}

启动二者,通过Broker消息就可以从生产者发送,到消费者消费。Producer发布消息到topic,Consumer可以订阅topic,处理发布过来的消息,在处理完成后发送确认consumer.acknowledge(msg)。Pulsar默认会保留所有的消息,即使Consumer断开连接。 只有在Consumer确认消息被成功处理后,保留下来的消息才会被丢弃。当Consumer 在一定时间内没有成功消费消息,而想再次消费该条消息,那么这个Consumer可以发送一个否定确认(negativeAcknowledge)到Broker,即consumer.negativeAcknowledge(msg)。

IDEA调试如上示例,输出如下所示:

07:24:29.639 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ClientCnx - [id: 0x979bb5e0, L:/192.168.0.21:63067 - R:/192.168.0.21:6650] Received a message from the server: org.apache.pulsar.common.api.proto.CommandMessage@3e764a42
07:24:29.639 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [topic-demo][subscription-demo] Received message: 44/811
07:24:29.639 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [subscription-demo] [6b929] processing message num - 0 in batch
07:24:29.639 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [subscription-demo] [6b929] enqueued messages in batch. queue size - 0, available queue size - 2147483647
Message received: Msg from Producer @1637018669636
07:24:29.700 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='subscription-demo', consumerName='6b929', topic='topic-demo'}] Flushing pending acks to broker: last-cumulative-ack: org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker$LastCumulativeAck@59cde8b3 -- individual-acks: [] -- individual-batch-index-acks: {}
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [/192.168.0.21:6650] Received cmd MESSAGE
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ClientCnx - [id: 0x979bb5e0, L:/192.168.0.21:63067 - R:/192.168.0.21:6650] Received a message from the server: org.apache.pulsar.common.api.proto.CommandMessage@3e764a42
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [topic-demo][subscription-demo] Received message: 44/812
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [subscription-demo] [6b929] processing message num - 0 in batch
07:24:30.643 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [subscription-demo] [6b929] enqueued messages in batch. queue size - 0, available queue size - 2147483647
Message received: Msg from Producer @1637018670640

最后,说明一下,如果Linux安装在Virtual Box虚拟机中,为了可以让其主机访问到此Broker,则需要做一个端口映射,界面如下:

3.jpg

2.jpg

其中的主机端口和子系统端口为Broker的端口6650,而主机IP为宿主机器IP,子系统IP为虚拟机Linux IP。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。