《Flink原理、实战与性能优化》 —2.4.2 运行程序
2.4.2 运行程序
启动Scala Shell交互式解释器后,就可以进行Flink流式应用或批量应用的开发。需要注意的是,Flink已经在启动的执行环境中初始化好了相应的Environment,分别使用“benv”和“senv”获取批量计算环境和流式计算环境,然后使用对应环境中的API开发Flink应用。以下代码实例分别是用批量和流式实现WordCount的应用,读者可以直接在启动Flink Scala Shell客户端后执行并输出结果。
通过Scala-Shell运行批量计算程序,调用benv完成对单词数量的统计。
scala> val textBatch = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer")
scala> val counts = textBatch
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.groupBy(0).sum(1)
scala> counts.print()
通过Scala-Shell运行流式计算,调用senv完成对单词数量的统计。
scala> val textStreaming = senv.fromElements(
"flink has Stateful Computations over Data Streams")
scala> val countsStreaming = textStreaming
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.keyBy(0).sum(1)
scala> countsStreaming.print()
scala> senv.execute("Streaming Wordcount")
注意 用户在使用交互式解释器方式进行应用开发的过程中,流式作业和批量作业中的一些操作(例如写入文件)并不会立即执行,而是需要用户在程序的最后执行env.execute(“appname”)命令,这样整个程序才能触发运行。
- 点赞
- 收藏
- 关注作者
评论(0)