一文入门流处理开发

举报
数据社 发表于 2022/09/25 04:25:57 2022/09/25
【摘要】 一、Flink介绍 Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引...

一、Flink介绍

Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。

二、部署环境

操作系统环境:

flink支持Linux, Mac OS X, 和 Windows环境部署,本次部署选择Linux环境部署。

JDK:要求Java 7或者更高

三、下载软件

jdk1.8.0_144

flink-1.4.2-bin-hadoop26-scala_2.11.tgz

四、部署步骤

1、JDK安装步骤此处省略,安装后验证下JDK环境

$ java -version

openjdk version "1.8.0_144"

OpenJDKRuntimeEnvironment(build 1.8.0_144-b01)

OpenJDK64-BitServer VM (build 25.144-b01, mixed mode)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2、安装部署flink 本文介绍flink部署分为两种模式:local,standalone。下面依次介绍这两种模式的部署方式。

找到下载的flink压缩包,进行解压

$ tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz

  
 
  • 1

首先是local模式,最为简单。

$ cd flink-1.4.2

$ bin/start-local.sh

Starting job manager

  
 
  • 1
  • 2
  • 3
  • 4
  • 5

我们可以通过查看日志确认是否启动成功

$ tailf flink-csap-taskmanager-0-XXXX.log

  
 
  • 1
2018-05-0310:07:53,718 INFO  org.apache.flink.runtime.filecache.FileCache- User file cache uses directory /tmp/flink-dist-cache-4c371de9-0f85-4889-b4d9-4a522641549c

2018-05-0310:07:53,725 INFO  org.apache.flink.runtime.taskmanager.TaskManager- StartingTaskManager actor at akka://flink/user/taskmanager#-524742300.

2018-05-0310:07:53,725 INFO  org.apache.flink.runtime.taskmanager.TaskManager- TaskManager data connection information: 2c358d6f38949f9aae31c5bddb0cc1dc@ LY1F-R021707-VM14.local(dataPort=55234)

2018-05-0310:07:53,726 INFO  org.apache.flink.runtime.taskmanager.TaskManager- TaskManager has 1 task slot(s).

2018-05-0310:07:53,727 INFO  org.apache.flink.runtime.taskmanager.TaskManager- Memory usage stats: [HEAP: 111/1024/1024 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]

2018-05-0310:07:53,730 INFO  org.apache.flink.runtime.taskmanager.TaskManager- Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)

2018-05-0310:07:53,848 INFO  org.apache.flink.runtime.taskmanager.TaskManager- Successful registration at JobManager(akka.tcp://flink@localhost:6123/user/jobmanager), starting network stack and library cache.

2018-05-0310:07:53,851 INFO  org.apache.flink.runtime.taskmanager.TaskManager- Determined BLOB server address to be localhost/127.0.0.1:52382.Starting BLOB cache.

2018-05-0310:07:53,858 INFO  org.apache.flink.runtime.blob.PermanentBlobCache- Created BLOB cache storage directory /tmp/blobStore-c07b9e80-41f0-490f-8126-7008144c4b0b

2018-05-0310:07:53,861 INFO  org.apache.flink.runtime.blob.TransientBlobCache- Created BLOB cache storage directory /tmp/blobStore-e0d1b687-1c47-41c4-b5bc-10ceaa39e778

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

JobManager进程将会在8081端口上启动一个WEB页面,我们可以通过浏览器到hostname:8081中查看相关的信息。可以打开页面查看到相关信息,说明local模式部署是没问题的。

下面来看一下standlone部署方式。

安装JDK,解压压缩包,都是一样的。不一样的是我们要修改解压后的flink配置文件。然后在集群主机间做免密,免密操作方法。

修改conf/flink-conf.yaml,我们将jobmanager.rpc.address的值设置成你master节点的IP地址。此外,我们通过jobmanager.heap.mb和taskmanager.heap.mb配置参数来设置每个节点的JVM能够分配的最大内存。从配置参数名字可以看出,这个参数的单位是MB,如果某些节点拥有比你之前设置的值更多的内存时,我们可以在那个节通过FLINKTMHEAP参数类覆盖值钱的设置。

我们需要把所有将要作为worker节点的IP地址存放在conf/slaves文件中,在conf/slaves文件中,每个IP地址必须放在一行,如下:

192.168.0.100

192.168.0.101

.

.

.

192.168.0.150

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

然后将修改好的flink包整理复制到集群各个节点。每个节点flink路径保持一致。然后启动集群

$ bin/start-cluster.sh

  
 
  • 1

查看日志是否成功。

以上是部署方法,部署成功后,我们来跑一个demo程序,验证一下Flink的流处理功能,对其有个初步的了解。

flink为了更好的让大家理解,已经给大家提供了一些demo代码,demo的jar包可以在/examples/streaming首先看一下demo代码:

objectSocketWindowWordCount{



def main(args: Array[String]) : Unit= {



// the port to connect to

        val port: Int= try{

ParameterTool.fromArgs(args).getInt("port")

} catch{

case e: Exception=> {

System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")

return

}

}



// get the execution environment

        val env: StreamExecutionEnvironment= StreamExecutionEnvironment.getExecutionEnvironment



// 
get input data by connecting to the socket

        val text = env.socketTextStream(
"localhost"
, port, 
'\n'
)



// parse the data, group it, window it, and aggregate the counts

        val windowCounts = text

.flatMap { w => w.split(
"\\s"
) }

.map { w => 
WordWithCount
(w, 
1
) }

.keyBy(
"word"
)

.timeWindow(
Time
.seconds(
5
), 
Time
.seconds(
1
))

.sum(
"count"
)



// print the results with a single thread, rather than in parallel

        windowCounts.
print
().setParallelism(
1
)



        env.execute(
"Socket Window WordCount"
)

}



// Data type for words with count

caseclass
WordWithCount
(word: 
String
, count: 
Long
)

}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108

这个demo是监控端口,然后对端口输入单子进行wordcount的程序。

运行demo,首先打开一个窗口进行端口数据输入:

$ nc -l 9001

hello

hello

word

world

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

然后运行demo监控端口单词输入统计:

$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9001

  
 
  • 1

运行后可以看到结果统计:

$ more flink-csap-taskmanager-0-XXX.out.1

hello : 1

hello : 1

word : 1

world : 1

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

五、IEDA开发环境搭建

1、安装java环境
此处略去,这个你已经会了~

2、安装maven
参考Maven安装与配置

3、配置IDEA
参考如何使用IntelliJ IDEA 配置Maven
4、pom文件设置

</properties>

<dependencies>

<dependency>

<groupId>org.scala-lang</groupId>

<artifactId>scala-library</artifactId>

<version>${scala.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-java</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-scala_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-clients_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

5、代码示例

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;



/**

 * Author: qincf

 * Date: 2018/11/02

 * Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来

 *       先在目标主机1.1.1.1机器上执行nc -l 9000

 */

publicclassStreamingWindowWordCount{

publicstaticvoid main(String[] args) throwsException{

//定义socket的端口号

int port;

try{

ParameterTool parameterTool = ParameterTool.fromArgs(args);

            port = parameterTool.getInt("port");

}catch(Exception e){

System.err.println("没有指定port参数,使用默认值9000");

            port = 9000;

}

//获取运行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//连接socket获取输入的数据

DataStreamSource<String> text = env.socketTextStream("1.1.1.1", port, "\n");

//计算数据

DataStream<WordWithCount> windowCount = text.flatMap(newFlatMapFunction<String, WordWithCount>() {

publicvoid flatMap(String value, Collector<WordWithCount> out) throwsException{

String[] splits = value.split("\\s");

for(String word:splits) {

out.collect(newWordWithCount(word,1L));

}

}

})//打平操作,把每行的单词转为<word,count>类型的数据

//针对相同的word数据进行分组

.keyBy("word")

//指定计算数据的窗口大小和滑动窗口大小

.timeWindow(Time.seconds(2),Time.seconds(1))

.sum("count");

//获取可视化JSON

System.out.println(env.getExecutionPlan());

//把数据打印到控制台,使用一个并行度

        windowCount.print().setParallelism(1);

//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行

        env.execute("streaming word count");







}



/**

     * 主要为了存储单词以及单词出现的次数

     */

publicstaticclassWordWithCount{

publicString word;

publiclong count;

publicWordWithCount(){}

publicWordWithCount(String word, long count) {

this.word = word;

this.count = count;

}



@Override

publicString toString() {

return"WordWithCount{"+

"word='"+ word + '\''+

", count="+ count +

'}';

}

}



}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151

6、测试步骤

首先在1.1.1.1机器上使用nc命令模拟数据发送

nc -l 
9000

  
 
  • 1
  • 2

然后在IEDA中运营StreamingWindowWordCount程序 在主机上输入字符

[root@data01]# nc -l 9000

a

a

b

c

d

d

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

此时运行程序后,IDEA中会打印处结果

WordWithCount{word='a', count=1}

WordWithCount{word='a', count=2}

WordWithCount{word='b', count=1}

WordWithCount{word='d', count=1}

WordWithCount{word='c', count=1}

WordWithCount{word='c', count=1}

WordWithCount{word='a', count=1}

WordWithCount{word='d', count=1}

WordWithCount{word='b', count=1}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

大家会看到,wordcount的结果。仔细看还有一串json输出,这部分是什么呢?代码中加了一个打印执行计划的部分:

/获取可视化JSON

System.out.println(env.getExecutionPlan());

  
 
  • 1
  • 2
  • 3

Flink提供了一个可视化执行计划的结果,类似Spark的DAG图,把json粘贴到Flink Plan Visualizer可以看到执行计划图:

现在你已经搭建好Flink开发环境了,可以开启你的流处理旅程了,更多教程可以参考Flink官网。

文章来源: dataclub.blog.csdn.net,作者:数据社,版权归原作者所有,如需转载,请联系作者。

原文链接:dataclub.blog.csdn.net/article/details/106424750

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。