KSQL:Apache Kafka的开源Streaming SQL引擎

举报
Jonathan.Wei 发表于 2019/01/25 09:43:56 2019/01/25
【摘要】 KSQL是Apache Kafka的流式SQL引擎,它大大降低了流处理世界的门槛。 KSQL实现了非常有效的功能:使用数据领域中大多数社区已知的语义SQL实时读取,编写和转换数据!


KSQL-thumb.jpg

现代企业以数据为核心,随着数量的增加,这些数据正在快速变化。流处理允许企业实时利用这些信息,Netflix,Uber,Airbnb,PayPal和纽约时报等数万家公司使用Apache Kafka作为重塑其行业的首选流媒体平台。无论您是预订酒店还是航班,乘坐出租车,玩视频游戏,阅读报纸,在线购物或汇款,许多日常活动都是由Kafka在幕后提供支持。


然而,流处理的世界仍然具有很高的进入门槛。当今最流行的流处理技术,包括Apache Kafka的Streams API,仍然要求用户使用Java或Scala等编程语言编写代码。对编码技能的这一严格要求阻碍了许多公司将流处理的优势发挥到极致。但幸运的是,现在有更好的方法。


什么是KSQL?

KSQL是Apache Kafka的流式SQL引擎,它大大降低了流处理世界的门槛。

KSQL实现了非常有效的功能:使用数据领域中大多数社区已知的语义SQL实时读取,编写和转换数据!

KSQL-Image.png

KSQL解决了什么问题?

如前所述,KSQL解决了在Kafka上提供SQL接口的主要问题,而无需使用Python或Java等外部语言。

然而,人们可能会争辩说,之前通过在Oracle数据库或BigQuery等目标数据存储上进行的ETL操作解决了同样的问题。那么在KSQL方法中有什么不同?有什么好处?


我认为的主要区别在于连续查询的概念:随着新数据到达Kafka主题,KSQL转换会不断完成。另一方面,在数据库(或BigQuery等大数据平台)中完成的转换是一次性的,如果新数据到达,则必须再次执行相同的转换。


TweetFlow.gif

KSQL能做什么呢?

KSQL是开源的(Apache 2.0许可),并构建在Kafka的Streams API之上。这意味着它支持各种强大的流处理操作,包括过滤,转换,聚合,连接,窗口和会话。

通过这种方式,您可以实时检测异常和欺诈活动,监控基础架构和物联网设备,执行基于会话的用户活动分析,执行实时ETL等等。

从通用的角度来看,当数据流中需要动态地进行转换,集成和分析时,你应该使用KSQL。


实时监控和实时分析

KSQL的一个用途是定义实时计算的自定义业务级度量标准,您可以从中监视和提醒。例如,展示视频游戏特许经营权的并发在线玩家数量(“我们的玩家是否参与?最新游戏扩展是否增加了游戏时间?”)或报告电子商务网站的废弃购物车数量(“我们的在线商店的最新更新是否让客户更容易结账?”)

另一个用途是在KSQL中为您的业务应用程序定义正确性概念,然后检查它们是否在生产中运行时满足此要求


KSQL可以直接从原始事件流中定义适当的度量标准,无论这些是从数据库更新,应用程序,移动设备还是任何其他类型生成的:

CREATE TABLE possibly_failing_vehicles AS
   SELECT vehicle, COUNT(*)
   FROM vehicle_monitoring_stream
   WINDOW TUMBLING (SIZE 5 MINUTES)
   WHERE  event_type = 'ERROR'
   GROUP BY vehicle
   HAVING COUNT(*) > 2;


在线数据集成和丰富

公司完成的大多数数据处理属于数据丰富领域:从几个数据库中获取数据,转换数据,将其连接在一起,并将其存储到键值存储,搜索索引,缓存或其他数据服务系统。

KSQL与Kafka连接器一起用于Oracle,MySQL,Elasticsearch,HDFS或S3等系统时,可以实现从批量数据集成到实时数据集成的转变。

如下面的KSQL查询所示,您可以使用流表连接来丰富包含存储在表中的元数据的数据流,或者在将流加载到另一个系统之前对个人身份信息(PII)进行简单过滤。

CREATE STREAM vip_users AS
   SELECT user_id, user_country, web_page, action
   FROM website_clickstream c
   LEFT JOIN users u ON u.user_id = c.user_id
   WHERE u.level = 'Platinum';

安全和异常检测

KSQL查询可以将事件流转换为数字时间序列聚合,这些聚合使用Kafka-Elastic连接器注入系统(如Elastic),然后在实时仪表板(如Grafana)中可视化。安全用例通常与监视和分析类似。在这里,您不是要监控应用程序行为或业务行为,而是在寻找欺诈,滥用,垃圾邮件,入侵或其他不良行为的模式。


KSQL提供了一种简单而复杂的实时方法来定义这些模式并查询实时流:

CREATE TABLE possible_fraud AS
   SELECT card_number, COUNT(*)
   FROM authorization_attempts
   WINDOW TUMBLING (SIZE 5 SECONDS)
   GROUP BY card_number
   HAVING COUNT(*) > 3;


流和数据库

当然,KSQL的使用案例比我在这篇短篇文章中所展示的更多,例如监控车队(“未来几天卡车是否需要预测性维护?”)或分布式物联网设备和家庭自动化传感器(“为什么二楼的温度会上升?”),或者实时分析Oracle中的数据库更新。一些有创意的用户甚至使用KSQL 实时分析赛车遥测数据。


但是,让我们先从这些具体的例子后退一步。在我看来,更令人兴奋的是,通过将数据库从内向外转换,KSQL将流(kafka)和数据库(Oracle、MySQL和Friends)的世界结合在一起。在KSQL中,类似于Kafka的Streams API,有两个核心数据抽象:流和表。它们允许您以流或表格式处理数据。这一点很重要,因为在实践中,几乎每个想要实现的实时用例都需要流和表。

query.png

以下是一个稍微简单的例子:作为零售商,您可以使用KSQL将Kafka中的实时客户活动事件流(购买,地理位置更新等)聚合到不断更新的客户360度配置文件表中,加入了有关这些客户的其他内部和外部信息。然后,此整合的客户资料表可以为应用程序提供支持,例如通过KSQL或Kafka的Streams API检测金融交易流中的欺诈性付款,或者可以通过Kafka的Connect框架和即用型连接器实时流式传输其数据传统的RDBMS,如Oracle,PostgreSQL或MySQL,它们是您现有基础架构的一部分。由于Apache Kafka(分布式流媒体平台)的强大技术基础,所有这些都是实时,容错和大规模完成的。


KSQL提供了一种将Kafka保存为数据库的唯一方法:无需在Kafka中取出数据,转换和重新插入。每次转换都可以使用Kafka SQL完成。


如前所述,KSQL现在可用于开发人员预览Kafka数据,与更成熟的SQL产品相比,功能/功能列表在某种程度上受到限制。但是,在需要进行非常复杂的转换的情况下,一旦数据落在目标数据存储区中,仍然可以通过另一种语言(如Java)或专用ETL(或视图)来解决这些转换。


KSQL如何工作?

那么KSQL如何在幕后工作呢?要记住两个概念:流和表。

流是结构化数据的序列,一旦事件被引入到流是不可变的,这意味着它不能被更新或删除。想象一下从存储器中推出或拉出的物品数量:“例如,今天库存了200件ProductA,而100件ProductB被取出”。


另一方面,表表示基于来自流的事件的当前情况。例如:ProductA的库存总量是多少?表格中的事实是可变的,如果ProductA不再有库存,可以更新或删除ProductA的数量。

StreamVSTableAcc.gif


KSQL通过简单的SQL方言实现流和表的定义。来自不同来源的各种流和表可以直接在KSQL中连接,从而实现数据组合和转换。

在KSQL中创建的每个流或表将存储在单独的Topic中,允许使用常用的连接器或脚本从中提取信息。


KSQL内部结构

ksql_cluster-min.png

有一个KSQL服务器进程执行查询。一组KSQL进程作为一个集群来运行。可以通过启动KSQL服务器的更多实例来动态添加更多的处理能力。这些实例具有容错性:如果一个实例失效,另外几个会接过它处理的工作。使用交互式KSQL命令行客户软件来启动查询,客户软件通过REST API向集群发送命令。命令行让你可以检查可用的数据流和表,执行新的查询,检查运行中查询的状态,并终止运行中查询。在内部,KSQL是使用Kafka的Streams API构建的;它继承了Kafka的弹性可扩展性、先进的状态管理及容错功能,还支持Kafka最近推出的只处理一次(exactly-once proecessing)语义。KSQL服务器嵌入这个机制,另外添加了分布式SQL引擎(包括一些新颖的功能,比如提升查询性能的字节码自动生成)以及用于查询和控制的REST API。


启动KSQL

KSQL可以在standalone模式和client-server模式下工作,第一个用于开发和测试场景,第二个用于支持生产环境。


使用standalone模式,KSQL客户端和服务器托管在同一台机器上,位于同一个JVM中。另一方面,在Client-Server模式下,KSQL服务器池运行在远程计算机上,客户端通过HTTP进行连接。


这里使用独立模式,该过程在confluent文档中得到了很好的解释,包括三个步骤:

  •  克隆KSQL存储库

  •  编译代码

  •  使用local参数启动KSQL


./bin/ksql-cli local



参考链接:

https://www.oreilly.com/ideas/big-fast-easy-data-with-ksql

https://www.rittmanmead.com/blog/2017/10/ksql-streaming-sql-for-apache-kafka/


0.jpeg


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200