Apache Pulsar C#客户端程序入门
1 Apache Pulsar概述
根据百度百科的定义,Apache Pulsar 是下一代云原生分布式消息流平台,它集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。Apache Pulsar 非常灵活,可以解决Apache Kafka 的大部分痛点。
关于Apache Pulsar 的安装过程可参考之前的博文《后起之秀Apache Pulsar框架入门》,这篇博文会讲解如何在Linux操作系统上安装Apache Pulsar Broker ,而客户端程序通过Broker 进行消息中转。因此,本文所述的示例,则必须以成功安装和启动Apache Pulsar Broker 为前提。Apache Pulsar 支持多种客户端编程语言,如Java、Go、Python、Node.js和C#等。下面以C#为例,讲解如何构建Apache Pulsar 客户端消息订阅发布程序。
2 前提条件
本示例依赖于如下前置条件:1)Visual Studio 2019社区版;2)Apache Pulsar Broker 。 关于Visual Studio 2019社区版的安装,这里不再赘述。首先用Visual Studio 2019社区版新建一个.NET5版本的控制台程序。并添加依赖项:DotPulsar 。同时添加消息生产者和消费者的类文件,项目结构示例如下:
3 Apache Pulsar C#客户端
本示例使用的Pulsar C# 客户端 ,即DotPulsar,可以在 C# 中创建生产者和消费者。这里需要注意的是: 根据官网的说法,C# 客户端中生产者、消费者和读取器的所有方法都是线程安全的。 具体可以参考官网地址: https://pulsar.apache.org/docs/zh-CN/client-libraries-dotnet/ 。
下面给出消费者(PConsumer)核心代码如下:
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
using DotPulsar.Internal;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace PulsarConsoleApp
{
public class PConsumer
{
public async Task RunAsync()
{
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://192.168.0.21:6650"))
.RetryInterval(TimeSpan.FromSeconds(3))
.Build();
var consumer = client.NewConsumer()
.StateChangedHandler(PMonitor)
.Topic("topic-demo")
.SubscriptionName("subscription-demo")
.Create();
try
{
await foreach (var message in consumer.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Value().ToArray()));
await consumer.Acknowledge(message);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
//状态监控
private void PMonitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
{
var stateMessage = stateChanged.ConsumerState switch
{
ConsumerState.Active => "is active",
ConsumerState.Inactive => "is inactive",
ConsumerState.Disconnected => "is disconnected",
ConsumerState.Closed => "has closed",
ConsumerState.ReachedEndOfTopic => "has reached end of topic",
ConsumerState.Faulted => "has faulted",
_ => $"has an unknown state '{stateChanged.ConsumerState}'"
};
var topic = stateChanged.Consumer.Topic;
Console.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
}
}
}
PulsarClient使用 Builder 创建一个 Pulsar C# 客户端,ServiceUrl设置 Pulsar 集群的服务URL。RetryInterval设置重试操作或重新连接之前的等待时间。
下面给出生产者(PProducer)核心代码如下:
using DotPulsar;
using DotPulsar.Extensions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace PulsarConsoleApp
{
//C# 客户端中生产者、消费者和读取器的所有方法都是线程安全的
public class PProducer
{
public void Run()
{
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://192.168.0.21:6650"))
.RetryInterval(TimeSpan.FromSeconds(3))
.Build();
var producer = client.NewProducer()
.StateChangedHandler(PMonitor) //绑定监听方法
.Topic("topic-demo")
.Create();
while (true)
{
var data = Encoding.UTF8.GetBytes("Msg from C# Producer @"+ System.DateTime.Now.ToString());
producer.Send(data);
Thread.Sleep(1000);
}
}
//状态监控
private void PMonitor(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
{
var stateMessage = stateChanged.ProducerState switch
{
ProducerState.Connected => "is connected",
ProducerState.Disconnected => "is disconnected",
ProducerState.PartiallyConnected => "is partially connected",
ProducerState.Closed => "has closed",
ProducerState.Faulted => "has faulted",
_ => $"has an unknown state '{stateChanged.ProducerState}'"
};
var topic = stateChanged.Producer.Topic;
Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
}
}
}
生产者是一个关联到具体主题的进程,它会源源不断地将消息发布到Broker 上。因此,生产者需要通过PulsarClient的ServiceUrl来设置 Pulsar 集群的服务URL。同时需要指定当前消息的主题。
最后给出启动示例的核心代码:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace PulsarConsoleApp
{
public class Program
{
public static async Task Main(string[] args)
{
Thread thread = new Thread(new ThreadStart(startProducer));
thread.Start();
await startConsumer();
Console.WriteLine("====================");
}
private static void startProducer()
{
PProducer pProducer = new PProducer();
pProducer.Run();
}
private static async Task startConsumer()
{
//异步方法
PConsumer pConsumer = new PConsumer();
await pConsumer.RunAsync();
}
}
}
这里采用Thead开启另外一个线程的原因是,生产者PProducer会阻塞当前线程,导致消费者PConsumer无法启动,而不能演示消息的发布和消费功能。启动后,输出如下所示:
- 点赞
- 收藏
- 关注作者
评论(0)