Apache Pulsar C#客户端程序入门

举报
jackwangcumt 发表于 2021/11/18 07:55:26 2021/11/18
【摘要】 Apache Pulsar 支持多种客户端编程语言,如Java、Go、Python、Node.js和C#等。下面以C#为例,讲解如何构建Apache Pulsar 客户端消息订阅发布程序。

1 Apache Pulsar概述


      根据百度百科的定义,Apache Pulsar 是下一代云原生分布式消息流平台,它集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。Apache Pulsar 非常灵活,可以解决Apache Kafka 的大部分痛点。

       关于Apache Pulsar 的安装过程可参考之前的博文《后起之秀Apache Pulsar框架入门》,这篇博文会讲解如何在Linux操作系统上安装Apache Pulsar Broker ,而客户端程序通过Broker 进行消息中转。因此,本文所述的示例,则必须以成功安装和启动Apache Pulsar Broker 为前提。Apache Pulsar 支持多种客户端编程语言,如Java、Go、Python、Node.js和C#等。下面以C#为例,讲解如何构建Apache Pulsar 客户端消息订阅发布程序。

2 前提条件


      本示例依赖于如下前置条件:1)Visual Studio 2019社区版;2)Apache Pulsar Broker 。 关于Visual Studio 2019社区版的安装,这里不再赘述。首先用Visual Studio 2019社区版新建一个.NET5版本的控制台程序。并添加依赖项:DotPulsar 。同时添加消息生产者和消费者的类文件,项目结构示例如下:

7.jpg

3 Apache Pulsar C#客户端


      本示例使用的Pulsar C# 客户端 ,即DotPulsar,可以在 C# 中创建生产者和消费者。这里需要注意的是: 根据官网的说法,C# 客户端中生产者、消费者和读取器的所有方法都是线程安全的。 具体可以参考官网地址: https://pulsar.apache.org/docs/zh-CN/client-libraries-dotnet/ 。

      下面给出消费者(PConsumer)核心代码如下:

using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
using DotPulsar.Internal;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace PulsarConsoleApp
{
    public class PConsumer
    {
        public async Task RunAsync()
        {
            var client = PulsarClient.Builder()
                .ServiceUrl(new Uri("pulsar://192.168.0.21:6650"))
                .RetryInterval(TimeSpan.FromSeconds(3))
                .Build();

         
            var consumer = client.NewConsumer()
                      .StateChangedHandler(PMonitor)
                      .Topic("topic-demo")
                      .SubscriptionName("subscription-demo")
                      .Create();

            try
            {
                await foreach (var message in consumer.Messages())
                {
                    Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Value().ToArray()));
                    await consumer.Acknowledge(message);
                }
            }
            catch (Exception ex) 
            {
                Console.WriteLine(ex.Message);
            }

        }

        //状态监控
        private void PMonitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
        {
            var stateMessage = stateChanged.ConsumerState switch
            {
                ConsumerState.Active => "is active",
                ConsumerState.Inactive => "is inactive",
                ConsumerState.Disconnected => "is disconnected",
                ConsumerState.Closed => "has closed",
                ConsumerState.ReachedEndOfTopic => "has reached end of topic",
                ConsumerState.Faulted => "has faulted",
                _ => $"has an unknown state '{stateChanged.ConsumerState}'"
            };

            var topic = stateChanged.Consumer.Topic;
            Console.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
        }

    }
}

PulsarClient使用 Builder 创建一个 Pulsar C# 客户端,ServiceUrl设置 Pulsar 集群的服务URL。RetryInterval设置重试操作或重新连接之前的等待时间。

下面给出生产者(PProducer)核心代码如下:

using DotPulsar;
using DotPulsar.Extensions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace PulsarConsoleApp
{
    //C# 客户端中生产者、消费者和读取器的所有方法都是线程安全的
    public class PProducer
    {
        public void Run()
        {
            var client = PulsarClient.Builder()
                .ServiceUrl(new Uri("pulsar://192.168.0.21:6650"))
                .RetryInterval(TimeSpan.FromSeconds(3))
                .Build();

            var producer = client.NewProducer()
                     .StateChangedHandler(PMonitor) //绑定监听方法
                     .Topic("topic-demo")
                     .Create();

            while (true)
            {
                var data = Encoding.UTF8.GetBytes("Msg from C# Producer @"+ System.DateTime.Now.ToString());
                 producer.Send(data);
                Thread.Sleep(1000);
            }
          
        }

       //状态监控
        private void PMonitor(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
        {
            var stateMessage = stateChanged.ProducerState switch
            {
                ProducerState.Connected => "is connected",
                ProducerState.Disconnected => "is disconnected",
                ProducerState.PartiallyConnected => "is partially connected",
                ProducerState.Closed => "has closed",
                ProducerState.Faulted => "has faulted",
                _ => $"has an unknown state '{stateChanged.ProducerState}'"
            };

            var topic = stateChanged.Producer.Topic;
            Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
        }

    }
}

生产者是一个关联到具体主题的进程,它会源源不断地将消息发布到Broker 上。因此,生产者需要通过PulsarClient的ServiceUrl来设置 Pulsar 集群的服务URL。同时需要指定当前消息的主题。

最后给出启动示例的核心代码:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace PulsarConsoleApp
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            Thread thread = new Thread(new ThreadStart(startProducer));
            thread.Start();
            await startConsumer();
            Console.WriteLine("====================");
        }

        private static void startProducer()
        {
            PProducer pProducer = new PProducer();
            pProducer.Run();
        }

        private static async Task startConsumer()
        {
            //异步方法
            PConsumer pConsumer = new PConsumer();
            await pConsumer.RunAsync();
        }
    }
}

这里采用Thead开启另外一个线程的原因是,生产者PProducer会阻塞当前线程,导致消费者PConsumer无法启动,而不能演示消息的发布和消费功能。启动后,输出如下所示:

8.jpg

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。