Flink高级特性和新特性(八)
【摘要】 Flink高级特性和新特性(八) BroadcastState 状态管理broadcast state 广播变量状态应用场景关联更新的规则,获取指定的数据(给ip得到经度纬度)=> 地图 API 获取到 省市区街道位置需求实时Flink DataStream 过滤出配置中(数据库)的用户,并在事件流中补全这批用户的基础信息。需求流程开发步骤package cn.itcast.flink.b...
Flink高级特性和新特性(八)
BroadcastState 状态管理
- broadcast state 广播变量状态
-
应用场景
关联更新的规则,获取指定的数据(给ip得到经度纬度)=> 地图 API 获取到 省市区街道位置
-
需求
实时Flink DataStream 过滤出配置中(数据库)的用户,并在事件流中补全这批用户的基础信息。
-
需求流程
- 开发步骤
package cn.itcast.flink.broadcast;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
* Author itcast
* Date 2021/6/24 8:29
* 两个数据流 1.事件流 2.用户配置流 3.connect关联操作 4.打印输出 5.执行任务
* <String,String,String,Integer></>
* {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
* {"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}
* <String, String, Integer ></>
* 'user_2', '李四', 20
* 最终的数据流 6个 Tuple6<String,String,String,Integer,String,Integer></>
* (user_3,2019-08-17 12:19:47,browse,1,王五,33)
* (user_2,2019-08-17 12:19:48,click,1,李四,20)
*/
public class BroadcastStateDemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//2.source
//-1.构建实时数据事件流-自定义随机
//<userID, eventTime, eventType, productID>
DataStreamSource<Tuple4<String, String, String, Integer>> clickSource = env.addSource(new MySource());
//-2.构建配置流-从MySQL
//<用户id,<姓名,年龄>>
DataStreamSource<Map<String, Tuple2<String, Integer>>> configSource = env.addSource(new MySQLSource());
//3.transformation
//-1.定义状态描述器
//MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
//new MapStateDescriptor<>("config",Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> broadcastDesc = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
//-2.广播配置流
//BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configSource.broadcast(broadcastDesc);
//-3.将事件流和广播流进行连接
//BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =eventDS.connect(broadcastDS);
SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = clickSource.connect(broadcastDS)
//-4.处理连接后的流-根据配置流补全事件流中的用户的信息
.process(new BroadcastProcessFunction<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>() {
@Override
public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
//读取出来 f0 为 userId
//事件流中读取用户 userId
String userId = value.f0;
//从ctx环境变量中通过 desc 读取出来广播状态
ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc);
//如果广播状态不为空,get(null) 获取出来 配置数据Tuple2
if (broadcastState != null) {
Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);
//判断 map 不为空则
if (map != null) {
Tuple2<String, Integer> stringIntegerTuple2 = map.get(userId);
//取出姓名和年龄
//collect 收集 Tuple6
//3-4.处理(process)连接后的流-根据配置流补全事件流中的用户的信息,Tuple4和Tuple2合并
//处理每一条元素,processElement
out.collect(Tuple6.of(
userId,
value.f1,
value.f2,
value.f3,
stringIntegerTuple2.f0,
stringIntegerTuple2.f1
));
}
}
}
//value就是MySQLSource中每隔一段时间获取到的最新的map数据
//先根据状态描述器获取历史的广播状态 ctx.getBroadcastState(desc)
@Override
public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
//再清空历史状态 broadcastState 数据
BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc);
//最后将最新的广播流数据放到 state 中(更新状态数据) broadcastState.put(null,value)
broadcastState.clear();
broadcastState.put(null, value);
}
});
//处理广播中的元素
//4.sinks
result.print();
//5.execute
env.execute();
}
/**
* <userID, eventTime, eventType, productID>
*/
public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
private boolean isRunning = true;
@Override
public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
Random random = new Random();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
while (isRunning){
int id = random.nextInt(4) + 1;
String user_id = "user_" + id;
String eventTime = df.format(new Date());
String eventType = "type_" + random.nextInt(3);
int productId = random.nextInt(4);
ctx.collect(Tuple4.of(user_id,eventTime,eventType,productId));
Thread.sleep(500);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
/**
* <用户id,<姓名,年龄>>
*/
public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
private boolean flag = true;
private Connection conn = null;
private PreparedStatement ps = null;
private ResultSet rs = null;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://node3:3306/bigdata?useSSL=false",
"root",
"123456");
String sql = "select `userID`, `userName`, `userAge` from `user_info`";
ps = conn.prepareStatement(sql);
}
@Override
public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
while (flag){
Map<String, Tuple2<String, Integer>> map = new HashMap<>();
ResultSet rs = ps.executeQuery();
while (rs.next()){
String userID = rs.getString("userID");
String userName = rs.getString("userName");
int userAge = rs.getInt("userAge");
//Map<String, Tuple2<String, Integer>>
map.put(userID, Tuple2.of(userName,userAge));
}
ctx.collect(map);
Thread.sleep(5000);//每隔5s更新一下用户的配置信息!
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
if (conn != null) conn.close();
if (ps != null) ps.close();
if (rs != null) rs.close();
}
}
}
- 实时的数据流和 动态变化的数据库中的配置流 进行 connect 操作, 打印输出
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)