Getting Started with Stream Processing Development in One Article
I. Introduction to Flink
Apache Flink is an open-source distributed stream and batch processing system. At its core, Flink provides data distribution, communication, and fault-tolerant distributed computation over data streams. On top of the streaming engine, Flink also builds a batch processing engine and natively supports iterative computation, memory management, and program optimization.
II. Deployment Environment
Operating system:
Flink can be deployed on Linux, Mac OS X, and Windows. This walkthrough deploys on Linux.
JDK: Java 7 or higher is required (JDK 1.8 is used below).
III. Download the Software
jdk1.8.0_144
flink-1.4.2-bin-hadoop26-scala_2.11.tgz
IV. Deployment Steps
1. JDK installation steps are omitted here. After installing, verify the JDK environment:
$ java -version
openjdk version "1.8.0_144"
OpenJDK Runtime Environment (build 1.8.0_144-b01)
OpenJDK 64-Bit Server VM (build 25.144-b01, mixed mode)
2. Install and deploy Flink. This article covers two deployment modes: local and standalone. Each is described in turn below.
Locate the downloaded Flink archive and extract it:
$ tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz
Local mode first, which is the simplest:
$ cd flink-1.4.2
$ bin/start-local.sh
Starting job manager
We can confirm that startup succeeded by checking the logs:
$ tailf flink-csap-taskmanager-0-XXXX.log
2018-05-03 10:07:53,718 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-4c371de9-0f85-4889-b4d9-4a522641549c
2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#-524742300.
2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: 2c358d6f38949f9aae31c5bddb0cc1dc @ LY1F-R021707-VM14.local (dataPort=55234)
2018-05-03 10:07:53,726 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 1 task slot(s).
2018-05-03 10:07:53,727 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 111/1024/1024 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
2018-05-03 10:07:53,730 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2018-05-03 10:07:53,848 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@localhost:6123/user/jobmanager), starting network stack and library cache.
2018-05-03 10:07:53,851 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:52382. Starting BLOB cache.
2018-05-03 10:07:53,858 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Created BLOB cache storage directory /tmp/blobStore-c07b9e80-41f0-490f-8126-7008144c4b0b
2018-05-03 10:07:53,861 INFO org.apache.flink.runtime.blob.TransientBlobCache - Created BLOB cache storage directory /tmp/blobStore-e0d1b687-1c47-41c4-b5bc-10ceaa39e778
The JobManager process also starts a web UI on port 8081; open hostname:8081 in a browser to view the cluster information. If the page loads and shows this information, the local-mode deployment is working.
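If no browser is available on the machine, the same information can be fetched from the command line; the sketch below assumes the default port 8081 and the /overview endpoint of Flink's monitoring REST API:
$ curl http://localhost:8081/overview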
Next, let's look at the standalone deployment mode.
Installing the JDK and extracting the archive are the same as before. The difference is that we need to modify the configuration files of the extracted Flink distribution and set up passwordless SSH between the hosts in the cluster.
Edit conf/flink-conf.yaml and set jobmanager.rpc.address to the IP address of your master node. In addition, the jobmanager.heap.mb and taskmanager.heap.mb parameters set the maximum amount of JVM memory each node may allocate; as the names suggest, the unit is MB. If some nodes have more memory available than the value you configured, you can override the setting on those nodes with the FLINK_TM_HEAP environment variable.
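For example, a minimal conf/flink-conf.yaml might look like the following; the master IP and heap sizes are placeholders to adapt to your own cluster:
jobmanager.rpc.address: 10.0.0.1   # IP of the master node (placeholder)
jobmanager.heap.mb: 1024           # JobManager JVM heap, in MB
taskmanager.heap.mb: 2048          # TaskManager JVM heap, in MB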
All IP addresses of the hosts that will act as worker nodes must be listed in conf/slaves, one IP address per line, for example:
192.168.0.100
192.168.0.101
.
.
.
192.168.0.150
Then copy the configured Flink directory to every node in the cluster, keeping the same path on each node, and start the cluster:
$ bin/start-cluster.sh
Check the logs to verify that the startup succeeded.
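For example (the log file name pattern is an assumption; it includes your user name and host name):
$ tail -f log/flink-*-jobmanager-*.log
$ tail -f log/flink-*-taskmanager-*.log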
That covers deployment. With the cluster up, let's run a demo program to verify Flink's stream processing and get a first feel for it.
To make things easier to understand, Flink ships with demo code; the demo JARs are located under examples/streaming. First, let's look at the demo code:
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  def main(args: Array[String]): Unit = {

    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
This demo listens on a port and runs a word count over the words typed into that port.
To run the demo, first open a terminal and start feeding data into the port:
$ nc -l 9001
hello
hello
word
world
Then run the demo to count the words arriving on that port:
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9001
Once it is running, the counts appear in the output file:
$ more flink-csap-taskmanager-0-XXX.out.1
hello : 1
hello : 1
word : 1
world : 1
V. IDEA Development Environment Setup
1. Install the Java environment
Omitted here; you already know how to do this.
2. Install Maven
Refer to a Maven installation and configuration guide.
3. Configure IDEA
Refer to a guide on configuring Maven in IntelliJ IDEA.
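As an alternative to writing the POM by hand in the next step, the official Flink quickstart Maven archetype can generate a skeleton project; the version below is chosen to match the Flink 1.4.2 download used earlier:
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.4.2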
4. POM file settings (the property values below assume Flink 1.4.2 with Scala 2.11, matching the download used earlier):
<properties>
<flink.version>1.4.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version> <!-- any Scala 2.11.x release -->
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
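With these dependencies in place, the project can be packaged with Maven and, if you want to run it on the cluster instead of inside IDEA, submitted with bin/flink run from the Flink installation directory; the JAR name below is a placeholder for your own artifact:
$ mvn clean package
$ bin/flink run target/your-flink-job.jar --port 9000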
5. Code example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author: qincf
 * Date: 2018/11/02
 * Desc: Use Flink to count words in real time over a sliding window and print the results.
 *       First run `nc -l 9000` on the target host 1.1.1.1.
 */
public class StreamingWindowWordCount {
    public static void main(String[] args) throws Exception {
        // socket port number
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified, falling back to the default 9000");
            port = 9000;
        }
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // connect to the socket to read the input data
        DataStreamSource<String> text = env.socketTextStream("1.1.1.1", port, "\n");
        // process the data
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }) // flatten each line into <word, count> records
        // group records with the same word
        .keyBy("word")
        // window size and slide interval
        .timeWindow(Time.seconds(2), Time.seconds(1))
        .sum("count");
        // print the execution plan as JSON
        System.out.println(env.getExecutionPlan());
        // print the results to the console with a parallelism of 1
        windowCount.print().setParallelism(1);
        // note: Flink programs are lazily evaluated, so execute() must be called for the code above to run
        env.execute("streaming word count");
    }

    /**
     * Holds a word and the number of times it occurred
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
6. Test steps
First, use the nc command on host 1.1.1.1 to simulate sending data:
nc -l 9000
Then run the StreamingWindowWordCount program in IDEA and type characters on the host:
[root@data01]# nc -l 9000
a
a
b
c
d
d
With the program running, IDEA prints the results:
WordWithCount{word='a', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='b', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='a', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='b', count=1}
You will see the word count results. Looking closely, there is also a JSON string in the output. What is that? The code includes a step that prints the execution plan:
// print the execution plan as JSON
System.out.println(env.getExecutionPlan());
Flink can render the execution plan visually, similar to Spark's DAG view. Paste the JSON into the Flink Plan Visualizer to see the execution plan graph.
Your Flink development environment is now ready, and you can begin your stream processing journey. For more tutorials, see the official Flink website.
Source: dataclub.blog.csdn.net, author: 数据社. Copyright belongs to the original author; please contact the author for reprint permission.
Original link: dataclub.blog.csdn.net/article/details/106424750