DIS Consumer 的概念科普及使用优势

举报
dis_tenji 发表于 2020/08/29 11:46:45 2020/08/29
【摘要】 您可使用 DIS Consumer 开发从 DIS 通道消费数据的应用程序。尽管您可使用 Java SDK 或者 API 从 DIS 通道中获取数据,但我们建议您使用 DIS Consumer 来开发消费者应用程序。主题使用 DIS Consumer 的优势安装DIS ConsumerDIS Consumer主要概念读取DIS通道数据配置DIS Consumer使用 DIS Consumer...

您可使用 DIS Consumer 开发从 DIS 通道消费数据的应用程序。尽管您可使用 Java SDK 或者 API 从 DIS 通道中获取数据,但我们建议您使用 DIS Consumer 来开发消费者应用程序。

主题

使用 DIS Consumer 的优势

安装DIS Consumer

DIS Consumer主要概念

读取DIS通道数据

配置DIS Consumer

使用 DIS Consumer 的优势

以下列表说明了使用 DIS Consumer 开发从通道消费数据的应用程序的一些主要优势。

  • 易于使用

    直接订阅通道,无需关注重平衡(Rebalance)、数据消费等,可以将精力更多放在数据处理的逻辑上。

  • 检查点(Checkpoint)管理

    支持自动或手动提交检查点(Checkpoint),存储在服务端,简化检查点(Checkpoint)管理。

安装DIS Consumer

目前 DIS Consumer 只支持 JAVA,如果您正在使用 Maven 管理程序依赖,在 pom.xml 文件中添加以下依赖即可:

<dependency>
    <groupId>com.huaweicloud.dis</groupId>
    <artifactId>huaweicloud-dis-kafka-adapter</artifactId>
    <version>1.2.10</version>
</dependency>

要查找最新版本的 DIS Consumer,请使用以下 Maven 搜索链接:

强烈建议您使用最新版本的 DIS Consumer,可以保证最近的 BUG 已被修复或者使用到最新的特性。

DIS Consumer主要概念

以下部分包含使用 DIS Consumer 需要了解的概念和术语。

消费组(应用程序组)

消费组(应用程序组)指多个消费者实例组成一个组来共同消费一组主题,不仅可以加速消费速率,还具有扩展性和容错性等优势。

  • 消费者下可以有一个或者多个消费者,消费者可以是一个进程,也可以是一个线程;

  • 同一个消费组订阅的通道的每个分区只能分配给这个消费组下的某一个消费者实例(当然该分区还可以被分配给其他消费组)。

比如有两个通道:通道1和通道2,两个通道下分别有两个分区;两个消费组:消费组1和消费组2,消费组1有两个消费者,消费组2有三个消费者,两个消费组都分别订阅了两个通道,分区分配如下:

消费组1

|--消费者1:通道1分区1、通道1分区2

|--消费者2:通道2分区1、通道2分区2

消费组2

|--消费者1:通道1分区1

|--消费者2:通道1分区2

|--消费者3:通道2分区1、通道2分区2

image.png

消费者(应用程序)

订阅 DIS 通道消息的客户端应用程序就被称为消费者(Consumer),消费者能够同时订阅多个主题的消息。

检查点(Checkpoint)

消费者(应用程序)消费数据时,记录已消费数据的最新序列号作为检查点。当重新消费数据时,可根据此检查点继续消费。

重平衡(Rebalance)

重平衡(Rebalance)本质上是一种协议,规定了一个消费组下的所有消费者实例如何达成一致来分配订阅通道的每个分区。比如某个消费组下有10个消费者,它订阅了一个具有100个分区的通道。正常情况下,平均每个消费者实例会分配到10个分区。这个分配的过程就叫重平衡。

以下几种情况会触发重平衡:

  1. 消费组内成员发生变更(比如有新的消费者加入,或者有消费者离开);

  2. 订阅的通道数发生变化(比如使用正则表达式的方式订阅通道,新的满足此正则表达式的通道创建时就会导致订阅的通道数发生变化);

  3. 订阅的通道分区数发生变化(主动扩缩容或自动扩缩容都会导致通道分区发生变化)。

以下过程表示消费组内有消费者离开时触发重平衡:

image.png

读取DIS通道数据

以下部分主要说明 DIS Consumer 的示例代码。

最简代码

以下是使用 DIS Consumer 订阅通道并下载通道数据的最简代码。

// YOU AK

String ak = "YOU_AK";

// YOU SK

String sk = "YOU_SK";

// YOU ProjectId

String projectId = "YOU_PROJECT_ID";

// YOU DIS Stream

String streamName = "YOU_STREAM_NAME";

// 消费组ID,用于记录检查点和进行负载均衡

String groupId = "YOU_GROUP_ID";

// DIS region

String region = "YOU_Region";


Properties props = new Properties();

props.setProperty(DISConfig.PROPERTY_AK, ak);

props.setProperty(DISConfig.PROPERTY_SK, sk);

props.setProperty(DISConfig.PROPERTY_PROJECT_ID, projectId);

props.setProperty(DISConfig.PROPERTY_REGION_ID, region);

props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.LATEST.name());


// 默认情况下不需要设置endpoint,会自动使用域名访问;如需使用指定的endpoint,解除如下注释并设置endpoint即可

// props.setProperty(DISConfig.PROPERTY_ENDPOINT, "https://dis-${region}.myhuaweicloud.com");


Consumer<String, String> consumer = new DISKafkaConsumer<>(props);

// 订阅指定需要消费的通道

consumer.subscribe(Collections.singleton(streamName));


while (true)

{

// 拉取数据

ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);


if (!records.isEmpty())

{

for (TopicPartition partition : records.partitions())

{

List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

for (ConsumerRecord<String, String> record : partitionRecords)

{

System.out.println("Value: " + record.value() + ", Partition: " + record.partition() + ", Offset: "

+ record.offset() + ", Key: " + record.key());


// 其它业务逻辑

}

}

}

}

提交检查点

以上示例在消费完数据之后,没有主动提交检查点,也没有开启自动提交检查点(关于 DIS Consumer 的配置请查阅配置DIS Consumer),那么如果消费者程序异常退出了,那么有可能导致重复消费或者漏消费的情况。

// YOU AK

String ak = "YOU_AK";

// YOU SK

String sk = "YOU_SK";

// YOU ProjectId

String projectId = "YOU_PROJECT_ID";

// YOU DIS Stream

String streamName = "YOU_STREAM_NAME";

// 消费组ID,用于记录检查点和进行负载均衡

String groupId = "YOU_GROUP_ID";

// DIS region

String region = "YOU_Region";


Properties props = new Properties();

props.setProperty(DISConfig.PROPERTY_AK, ak);

props.setProperty(DISConfig.PROPERTY_SK, sk);

props.setProperty(DISConfig.PROPERTY_PROJECT_ID, projectId);

props.setProperty(DISConfig.PROPERTY_REGION_ID, region);

props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.LATEST.name());


// 默认情况下不需要设置endpoint,会自动使用域名访问;如需使用指定的endpoint,解除如下注释并设置endpoint即可

// props.setProperty(DISConfig.PROPERTY_ENDPOINT, "https://dis-${region}.myhuaweicloud.com");


Consumer<String, String> consumer = new DISKafkaConsumer<>(props);

// 订阅指定需要消费的通道

consumer.subscribe(Collections.singleton(streamName));


while (true)

{

// 拉取数据

ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);


if (!records.isEmpty())

{

for (TopicPartition partition : records.partitions())

{

List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

for (ConsumerRecord<String, String> record : partitionRecords)

{

System.out.println("Value: " + record.value() + ", Partition: " + record.partition() + ", Offset: "

+ record.offset() + ", Key: " + record.key());

}

}


// 数据处理完成之后异步提交当前检查点(也可使用同步提交方法 commitSync 进行同步提交)

consumer.commitAsync(new OffsetCommitCallback()

{

@Override

public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e)

{

if (e == null)

{

System.out.println("Success to commit offset.");

}

else

{

System.out.println("Failed to commit offset, errorMessage: " + e.getMessage());

}

}

});

}

}

使用通配符订阅通道

在订阅通道时,您可以不用指定具体的通道名称而使用通配符,例如订阅 stream.* 表示会消费 stream1, stream2, stream_123 等等通道。已有/新增/删除的通道,只要匹配通配符,就可被消费组消费。

// YOU AK

String ak = "YOU_AK";

// YOU SK

String sk = "YOU_SK";

// YOU ProjectId

String projectId = "YOU_PROJECT_ID";

// YOU DIS Stream

String streamName = "YOU_STREAM_NAME";

// 消费组ID,用于记录检查点和进行负载均衡

String groupId = "YOU_GROUP_ID";

// DIS region

String region = "YOU_Region";


Properties props = new Properties();

props.setProperty(DISConfig.PROPERTY_AK, ak);

props.setProperty(DISConfig.PROPERTY_SK, sk);

props.setProperty(DISConfig.PROPERTY_PROJECT_ID, projectId);

props.setProperty(DISConfig.PROPERTY_REGION_ID, region);

props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.LATEST.name());


// 默认情况下不需要设置endpoint,会自动使用域名访问;如需使用指定的endpoint,解除如下注释并设置endpoint即可

// props.setProperty(DISConfig.PROPERTY_ENDPOINT, "https://dis-${region}.myhuaweicloud.com");


Consumer<String, String> consumer = new DISKafkaConsumer<>(props);

// 使用通配符订阅通道,并指定消费组发生负载均衡时的监听器

consumer.subscribe(Pattern.compile("stream.*"), new ConsumerRebalanceListener()

{

@Override

public void onPartitionsRevoked(Collection<TopicPartition> collection)

{

// 负载均衡开始前

System.out.println("onPartitionsRevoked.");

}


@Override

public void onPartitionsAssigned(Collection<TopicPartition> collection)

{

// 负载均衡完成后

System.out.println("onPartitionsAssigned.");

}

});


while (true)

{

// 拉取数据

ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);


if (!records.isEmpty())

{

for (TopicPartition partition : records.partitions())

{

List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

for (ConsumerRecord<String, String> record : partitionRecords)

{

System.out.println("Value: " + record.value() + ", Partition: " + record.partition() + ", Offset: "

+ record.offset() + ", Key: " + record.key());

}

}


// 数据处理完成之后异步提交当前检查点(也可使用同步提交方法 commitSync 进行同步提交)

consumer.commitAsync(new OffsetCommitCallback()

{

@Override

public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e)

{

if (e == null)

{

System.out.println("Success to commit offset.");

}

else

{

System.out.println("Failed to commit offset, errorMessage: " + e.getMessage());

}

}

});

}

}

配置DIS Consumer

DIS Consumer 的默认配置已经适合大多数的应用场景,但您可能希望修改某些配置项来满足具体的业务需求,可以通过以下方式来修改 DIS Consumer 的配置,例如:

props.setProperty("enable.auto.commit", "true");

props.setProperty("auto.commit.interval.ms", "10000");


Producer<String, String> producer = new DISKafkaProducer<>(props);

同时,也支持在配置文件 dis.properties 中去定义配置项,例如:

enable.auto.commit=trueauto.commit.interval.ms=10000

以下配置为 DIS Producer 支持的主要配置项说明。

表1 配置项说明

配置项

类型

说明

默认值

key.serializer

String

key 对象的序列化类。

StringSerializer

value.serializer

String

value 对象的序列化类。

StringSerializer

group.id

String

消费组ID


client.id

String

客户端ID


enable.auto.commit

String

是否开启自动提交检查点。

  • true:开启自动提交。

  • false:关闭自动提交。

false

auto.commit.interval.ms

String

开启自动提交检查点时,自动提交的时间间隔,单位毫秒。

5000

auto.offset.reset

String

此参数用于没有初始检查点或者检查点不正确的情况下,自动设置 offset 位置:

  • earliest:从最老的数据开始读取,类似于获取数据游标时的 TRIM_HORIZON 类型。

  • latest:从最新的数据开始读取,类似于获取数据游标时的 LATEST 类型。

  • none:如果在消费者组中没有发现前一个检查点,就向消费者抛出一个异常。


在消费组存在检查点且检查点合法的情况下,优先从检查点的 offset 开始读取。

latest

max.fetch.threads

String

消费者拉取数据的最大线程数。

50

heartbeat.periodic.heartbeat

String

是否开启定时心跳,如果关闭定时心跳,那么在慢消费场景下(消费完一批数据之后很长时间才处理完)有可能会导致很频繁的负载均衡,从而影响消费速度。

  • 开启定时心跳。

  • 关闭定时心跳。

true

heartbeat.interval.ms

String

开启定时心跳时,定时心跳的发送周期,单位是毫秒。

10000



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。