[流式计算]处理时间序列数据需要注意的5个要点

举报
Jonathan.Wei 发表于 2019/02/18 11:06:55 2019/02/18
【摘要】 现如今,实时计算开始火热,包括阿里对flink的大力支持,让实时计算成为了大家眼中的焦点。我们可以将所要处理的数据分为有序数据和无序数据。无序数据不要求数据的顺序一致性,在做处理的时候可以根据自己对吞吐量的需求,加大并发度,例如网站日志;而有序数据一般属于对数据有严格要求的场景,必须保证数据的强时序性,例如银行交易义务。在数据量较小以及满足业务需求的情况下,保证kafka topic的数据强...

Social_Network_Analysis_Visualization-620x342.png

现如今,实时计算开始火热,包括阿里对flink的大力支持,让实时计算成为了大家眼中的焦点。

我们可以将所要处理的数据分为有序数据和无序数据。无序数据不要求数据的顺序一致性,在做处理的时候可以根据自己对吞吐量的需求,加大并发度,例如网站日志;而有序数据一般属于对数据有严格要求的场景,必须保证数据的强时序性,例如银行交易义务。

在数据量较小以及满足业务需求的情况下,保证kafka topic的数据强顺序要求只需配置单个分区即可满足,但数据量较大的情况下,并发吞吐就会跟不上。

本文主要讲解在较大数据量的情况下,使用Kafka和Spark处理流时间序列数据时所要记住的几个要点。
以下是给定的实力数据结构以及要点。

1_NE_ZiCnXpj-aXski8fjxRw.png

1、使用正确的分区键分发数据

Kafka将Topic的每个分区的日志存储在单独的日志目录中,且kafka只保证单个分区内部数据的顺序。因此,如果你的应用程序要求以与生成数据相同的顺序使用数据,则选择正确的分区键非常重要。

在示例应用程序中,我们可以选择user_id作为我们的分区键,因为我们需要按照每个用户的上下文顺序存储事件。默认情况下,Kafka使用Topic中可用分区的键和编号的Hash值来决定放置哪个分区。你可以使用自定义分区程序更改此设置,或在推送到主题时将每个事件映射到特定分区。默认行为适用于大多数应用程序。

对于4个分区的主题,每个分区都有对应Key子集的日志,其顺序与推送顺序相同。虽然多个数据源可以以不同的间隔写入主题,但是两个源不应该为单个密钥生成数据,因为时间戳和写入间隔可能不同。

0_6LaCgWbMefYz5QdG.png

2、根据执行者缩放分区

更多分区意味着Kafka的吞吐量更高。关于理解主题的推荐分区有一个很好的帖子。在流式上下文中,Spark不会根据作业可用的执行程序创建使用者组。而是Spark的ConsumerCoordinator为Topic的每个分区创建一个消费者组,并作为任务分发给所有执行者,并为每个批次分配偏移量。

在下面的屏幕截图中,您可以看到900个分区的主题(我的设置建议最多),Spark创建了所有可用执行程序并行处理的相同数量的任务。大量分区会增加每个批处理的处理时间,从而导致流式传输管道的延迟。

0_eNaCPpHgmqS0_AFW.png

在此示例应用程序中,与相同数据量的900个分区相比,处理一批8个分区中的12K记录所花费的时间较少。

Kafka支持在创建主题后更改主题的分区数。但请注意,一旦放大,就无法缩小分区。如果您必须缩小,则必须删除并重新创建主题。

3、不要使用对数据进行混洗/重新分区的功能。

当你按列或sortBy应用sort时,Spark会重新分区Dataframe/Rdd。此默认行为会在将消息重新调整到不同分区时更改消息的顺序。

您可以使用repartitionAndSortWithinPartitions支持使用自定义分区程序,可以在排序数据时使用。这篇博文详细解释了如何使用该技术将数据映射到按排序顺序的分区。

4、限制消息量并设置正确的批处理间隔

默认情况下,Spark会在执行每个批处理时为主题的每个分区提取所有可用消息。当单个批处理中的数据突发超过执行程序内存时,此行为将使您的spark作业失败。对于在生产中运行的Streaming
应用程序,建议设置backpressure&maxRatePerPartitionfor以限制每个批处理的数据量。

$ spark-submit --conf spark.streaming.backpressure.enabled=true --conf spark.streaming.kafka.maxRatePerPartition=5000 --executor-cores 4 --num-executors 4 --class me.sathish.example.spark.SortedStreamProcessor --master yarn kafka_stream_processing_sorted_order_2.11-1.0.jar config.yml

将应用程序的批处理间隔设置为平均处理时间前几批,以避免将来批处理排队。虽然排队不会对正在运行的应用程序产生直接影响,但是管道的延迟会导致无法正常处理。

5、保存已处理数据的所有分区的偏移量并从保存的偏移量重新启动

流应用程序运行很长时间,业务逻辑不断变化。因此,在实际生产场景中,除了作业失败之外,我们可能必须停止当前的应用程序并安排具有更新逻辑的新版本。为了确保在这种情况下没有数据丢失,请确保正常关闭应用程序并从保存的偏移量重新启动。Spark支持三种方式保存消息的偏移量。

与kafka本身的检查点和保存偏移相比,我更喜欢将偏移保存到外部源的方法,因为它不受代码更改的影响,并且还具有在发生意外时修改偏移的灵活性。

总结

本文中所提到的流式处理在处理时间序列数据时所需要记住的5个要点,不单单只针对Kafka+Spark的情况,同样适合于其他流式处理队列和流式处理计算框架。例如kafka+flink、kafka+storm等。

关注公众号

0.jpeg


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。