2021年大数据Flink(四十二):​​​​​​​BroadcastState

举报
Lansonli 发表于 2021/09/29 00:51:24 2021/09/29
【摘要】 ​​​​​目录 ​BroadcastState BroadcastState介绍 需求-实现配置动态更新 编码步骤 1.env 2.source 3.transformation 4.sink 5.execute 参考实现 实现代码   BroadcastState BroadcastState介...

​​​​​目录

BroadcastState

BroadcastState介绍

需求-实现配置动态更新

编码步骤

1.env

2.source

3.transformation

4.sink

5.execute

参考实现

实现代码


 

BroadcastState

BroadcastState介绍

在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State。Broadcast State 是 Flink 1.5 引入的新特性。

下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。

  • 场景举例
  1. 动态更新计算规则: 如事件流需要根据最新的规则进行计算,则可将规则作为广播状态广播到下游Task中。
  2. 实时增加额外字段: 如事件流需要实时增加用户的基础信息,则可将用户的基础信息作为广播状态广播到下游Task中。

 

  • API介绍

首先创建一个Keyed 或Non-Keyed 的DataStream,

然后再创建一个BroadcastedStream,

最后通过DataStream来连接(调用connect 方法)到Broadcasted Stream 上,

这样实现将BroadcastState广播到Data Stream 下游的每个Task中。

 

1.如果DataStream是Keyed Stream ,则连接到Broadcasted Stream 后, 添加处理ProcessFunction 时需要使用KeyedBroadcastProcessFunction 来实现, 下面是KeyedBroadcastProcessFunction 的API,代码如下所示:


  
  1. public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
  2.     public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
  3.     public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
  4. }

l KS:表示Flink 程序从最上游的Source Operator 开始构建Stream,当调用keyBy 时所依赖的Key 的类型;上面泛型中的各个参数的含义,说明如下:

l IN1:表示非Broadcast 的Data Stream 中的数据记录的类型;

l IN2:表示Broadcast Stream 中的数据记录的类型;

l OUT:表示经过KeyedBroadcastProcessFunction 的processElement()和processBroadcastElement()方法处理后输出结果数据记录的类型。

 

2.如果Data Stream 是Non-Keyed Stream,则连接到Broadcasted Stream 后,添加处理ProcessFunction 时需要使用BroadcastProcessFunction 来实现, 下面是BroadcastProcessFunction 的API,代码如下所示:


  
  1. public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
  2. public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
  3. public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
  4. }

具体如何使用上面的BroadcastProcessFunction,接下来我们会在通过实际编程,来以使用KeyedBroadcastProcessFunction 为例进行详细说明。上面泛型中的各个参数的含义,与前面KeyedBroadcastProcessFunction 的泛型类型中的后3 个含义相同,只是没有调用keyBy 操作对原始Stream 进行分区操作,就不需要KS 泛型参数。

 

  • 注意事项

1) Broadcast State 是Map 类型,即K-V 类型。

2) Broadcast State 只有在广播的一侧, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processBroadcastElement 方法中可以修改。在非广播的一侧, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processElement 方法中只读

3) Broadcast State 中元素的顺序,在各Task 中可能不同。基于顺序的处理,需要注意。

4) Broadcast State 在Checkpoint 时,每个Task 都会Checkpoint 广播状态。

5) Broadcast State 在运行时保存在内存中,目前还不能保存在RocksDB State Backend 中。

 

需求-实现配置动态更新

 

 

实时过滤出配置中的用户,并在事件流中补全这批用户的基础信息。

事件流:表示用户在某个时刻浏览或点击了某个商品,格式如下。


  
  1. {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
  2. {"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}


配置数据: 表示用户的详细信息,在Mysql中,如下。


  
  1. DROP TABLE IF EXISTS `user_info`;
  2. CREATE TABLE `user_info`  (
  3.   `userID` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  4.   `userName` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  5.   `userAge` int(11) NULL DEFAULT NULL,
  6.   PRIMARY KEY (`userID`) USING BTREE
  7. ) ENGINE = MyISAM CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
  8. -- ----------------------------
  9. -- Records of user_info
  10. -- ----------------------------
  11. INSERT INTO `user_info` VALUES ('user_1', '张三', 10);
  12. INSERT INTO `user_info` VALUES ('user_2', '李四', 20);
  13. INSERT INTO `user_info` VALUES ('user_3', '王五', 30);
  14. INSERT INTO `user_info` VALUES ('user_4', '赵六', 40);
  15. SET FOREIGN_KEY_CHECKS = 1;

输出结果:

(user_3,2019-08-17 12:19:47,browse,1,王五,33)

(user_2,2019-08-17 12:19:48,click,1,李四,20)

 

 

 

编码步骤

1.env

2.source

  • -1.构建实时数据事件流-自定义随机

<userID, eventTime, eventType, productID>

  • -2.构建配置流-从MySQL

<用户id,<姓名,年龄>>

3.transformation

  • -1.定义状态描述器

MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =

new MapStateDescriptor<>("config",Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

  • -2.广播配置流

BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);

  • -3.将事件流和广播流进行连接

BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =eventDS.connect(broadcastDS);

  • -4.处理连接后的流-根据配置流补全事件流中的用户的信息

4.sink

5.execute

 

参考实现


  
  1. package cn.lanson.action;
  2. import org.apache.flink.api.common.state.BroadcastState;
  3. import org.apache.flink.api.common.state.MapStateDescriptor;
  4. import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
  5. import org.apache.flink.api.common.typeinfo.Types;
  6. import org.apache.flink.api.java.tuple.Tuple2;
  7. import org.apache.flink.api.java.tuple.Tuple4;
  8. import org.apache.flink.api.java.tuple.Tuple6;
  9. import org.apache.flink.configuration.Configuration;
  10. import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
  11. import org.apache.flink.streaming.api.datastream.BroadcastStream;
  12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  13. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  15. import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
  16. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
  17. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  18. import org.apache.flink.util.Collector;
  19. import java.sql.Connection;
  20. import java.sql.DriverManager;
  21. import java.sql.PreparedStatement;
  22. import java.sql.ResultSet;
  23. import java.text.SimpleDateFormat;
  24. import java.util.Date;
  25. import java.util.HashMap;
  26. import java.util.Map;
  27. import java.util.Random;
  28. /**
  29.  * Author Lansonli
  30.  * Desc
  31.  * 需求:
  32.  * 使用Flink的BroadcastState来完成
  33.  * 事件流和配置流(需要广播为State)的关联,并实现配置的动态更新!
  34.  */
  35. public class BroadcastStateConfigUpdate {
  36.     public static void main(String[] args) throws Exception{
  37.         //1.env
  38.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  39.         //2.source
  40.         //-1.构建实时的自定义随机数据事件流-数据源源不断产生,量会很大
  41.         //<userID, eventTime, eventType, productID>
  42.         DataStreamSource<Tuple4<String, String, String, Integer>> eventDS = env.addSource(new MySource());
  43.         //-2.构建配置流-从MySQL定期查询最新的,数据量较小
  44.         //<用户id,<姓名,年龄>>
  45.         DataStreamSource<Map<String, Tuple2<String, Integer>>> configDS = env.addSource(new MySQLSource());
  46.         //3.transformation
  47.         //-1.定义状态描述器-准备将配置流作为状态广播
  48.         MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
  49.                 new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
  50.         //-2.将配置流根据状态描述器广播出去,变成广播状态流
  51.         BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
  52.         //-3.将事件流和广播流进行连接
  53.         BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =eventDS.connect(broadcastDS);
  54.         //-4.处理连接后的流-根据配置流补全事件流中的用户的信息
  55.         SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = connectDS
  56.                 //BroadcastProcessFunction<IN1, IN2, OUT>
  57.                 .process(new BroadcastProcessFunction<
  58.                 //<userID, eventTime, eventType, productID> //事件流
  59.                 Tuple4<String, String, String, Integer>,
  60.                 //<用户id,<姓名,年龄>> //广播流
  61.                 Map<String, Tuple2<String, Integer>>,
  62.                 //<用户id,eventTime,eventType,productID,姓名,年龄> //需要收集的数据
  63.                 Tuple6<String, String, String, Integer, String, Integer>>() {
  64.             //处理事件流中的元素
  65.             @Override
  66.             public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
  67.                 //取出事件流中的userId
  68.                 String userId = value.f0;
  69.                 //根据状态描述器获取广播状态
  70.                 ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
  71.                 if (broadcastState != null) {
  72.                     //取出广播状态中的map<用户id,<姓名,年龄>>
  73.                     Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);
  74.                     if (map != null) {
  75.                         //通过userId取map中的<姓名,年龄>
  76.                         Tuple2<String, Integer> tuple2 = map.get(userId);
  77.                         //取出tuple2中的姓名和年龄
  78.                         String userName = tuple2.f0;
  79.                         Integer userAge = tuple2.f1;
  80.                         out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, userName, userAge));
  81.                     }
  82.                 }
  83.             }
  84.             //处理广播流中的元素
  85.             @Override
  86.             public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
  87.                 //value就是MySQLSource中每隔一段时间获取到的最新的map数据
  88.                 //先根据状态描述器获取历史的广播状态
  89.                 BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
  90.                 //再清空历史状态数据
  91.                 broadcastState.clear();
  92.                 //最后将最新的广播流数据放到state中(更新状态数据)
  93.                 broadcastState.put(null, value);
  94.             }
  95.         });
  96.         //4.sink
  97.         result.print();
  98.         //5.execute
  99.         env.execute();
  100.     }
  101.     /**
  102.      * <userID, eventTime, eventType, productID>
  103.      */
  104.     public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>>{
  105.         private boolean isRunning = true;
  106.         @Override
  107.         public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
  108.             Random random = new Random();
  109.             SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  110.             while (isRunning){
  111.                 int id = random.nextInt(4) + 1;
  112.                 String user_id = "user_" + id;
  113.                 String eventTime = df.format(new Date());
  114.                 String eventType = "type_" + random.nextInt(3);
  115.                 int productId = random.nextInt(4);
  116.                 ctx.collect(Tuple4.of(user_id,eventTime,eventType,productId));
  117.                 Thread.sleep(500);
  118.             }
  119.         }
  120.         @Override
  121.         public void cancel() {
  122.             isRunning = false;
  123.         }
  124.     }
  125.     /**
  126.      * <用户id,<姓名,年龄>>
  127.      */
  128.     public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
  129.         private boolean flag = true;
  130.         private Connection conn = null;
  131.         private PreparedStatement ps = null;
  132.         private ResultSet rs = null;
  133.         @Override
  134.         public void open(Configuration parameters) throws Exception {
  135.             conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
  136.             String sql = "select `userID`, `userName`, `userAge` from `user_info`";
  137.             ps = conn.prepareStatement(sql);
  138.         }
  139.         @Override
  140.         public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
  141.             while (flag){
  142.                 Map<String, Tuple2<String, Integer>> map = new HashMap<>();
  143.                 ResultSet rs = ps.executeQuery();
  144.                 while (rs.next()){
  145.                     String userID = rs.getString("userID");
  146.                     String userName = rs.getString("userName");
  147.                     int userAge = rs.getInt("userAge");
  148.                     //Map<String, Tuple2<String, Integer>>
  149.                     map.put(userID,Tuple2.of(userName,userAge));
  150.                 }
  151.                 ctx.collect(map);
  152.                 Thread.sleep(5000);//每隔5s更新一下用户的配置信息!
  153.             }
  154.         }
  155.         @Override
  156.         public void cancel() {
  157.             flag = false;
  158.         }
  159.         @Override
  160.         public void close() throws Exception {
  161.             if (conn != null) conn.close();
  162.             if (ps != null) ps.close();
  163.             if (rs != null) rs.close();
  164.         }
  165.     }
  166. }

实现代码


  
  1. package cn.lanson.action;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.state.BroadcastState;
  4. import org.apache.flink.api.common.state.MapStateDescriptor;
  5. import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
  6. import org.apache.flink.api.common.typeinfo.Types;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.api.java.tuple.Tuple4;
  9. import org.apache.flink.api.java.tuple.Tuple6;
  10. import org.apache.flink.configuration.Configuration;
  11. import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
  12. import org.apache.flink.streaming.api.datastream.BroadcastStream;
  13. import org.apache.flink.streaming.api.datastream.DataStream;
  14. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  15. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  16. import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
  17. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
  18. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  19. import org.apache.flink.util.Collector;
  20. import java.sql.Connection;
  21. import java.sql.DriverManager;
  22. import java.sql.PreparedStatement;
  23. import java.sql.ResultSet;
  24. import java.text.SimpleDateFormat;
  25. import java.util.Date;
  26. import java.util.HashMap;
  27. import java.util.Map;
  28. import java.util.Random;
  29. /**
  30. * Author Lansonli
  31. * Desc 需求:
  32. * 1.实时日志事件流:<userID, eventTime, eventType, productID> 哪个用户 在 什么时间,对 哪个商品 进行了 什么操作
  33. * 2.用户信息流(配置流/规则流): <用户id,<姓名,年龄>> 用户的详细信息
  34. * 3.将较小的信息流(配置流/规则流)作为状态广播到各个节点,便于对实时日志事件流中的用户信息进行补全!--其实就是做 状态广播 并要支持状态更新
  35. */
  36. public class BroadcastStateDemo {
  37. public static void main(String[] args) throws Exception {
  38. //TODO 1.env
  39. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  40. env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  41. //TODO 2.source
  42. DataStream<Tuple4<String, String, String, Integer>> logDS = env.addSource(new MySource());
  43. DataStream<Map<String, Tuple2<String, Integer>>> userInfoDS = env.addSource(new MySQLSource());
  44. //TODO 3.transformation
  45. //--1.定义状态描述器(要将userInfoDS作为状态进行广播,key可以指定其他值,也可以不需要key/void/null)
  46. //下面的数据结构较为复杂,可以简化,这里只是给大家演示一下复杂嵌套类型的声明而已
  47. MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor = new MapStateDescriptor<>("user", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
  48. //--2.根据状态描述器将userInfoDS作为状态进行广播
  49. BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = userInfoDS.broadcast(descriptor);
  50. //--3.将实时日志事件流和广播流进行连接
  51. BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS = logDS.connect(broadcastDS);
  52. //--4.处理连接流中的数据
  53. SingleOutputStreamOperator<Object> resultDS = connectDS.process(new BroadcastProcessFunction<
  54. Tuple4<String, String, String, Integer>, //实时日志事件流:<userID, eventTime, eventType, productID> 哪个用户 在 什么时间,对 哪个商品 进行了 什么操作
  55. Map<String, Tuple2<String, Integer>>, //用户信息流(配置流/规则流): <用户id,<姓名,年龄>> 用户的详细信息
  56. Object>() {
  57. //处理元素
  58. @Override
  59. public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Object> out) throws Exception {
  60. String userId = value.f0;
  61. //拿到状态
  62. ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
  63. Map<String, Tuple2<String, Integer>> userMap = broadcastState.get(null);
  64. if (userMap != null) {
  65. Tuple2<String, Integer> user = userMap.get(userId);
  66. String name = user.f0;
  67. Integer age = user.f1;
  68. out.collect(Tuple6.of(userId, name, age, value.f1, value.f3, value.f2));
  69. }
  70. }
  71. //处理广播状态
  72. @Override
  73. public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Object> out) throws Exception {
  74. BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
  75. //清理广播数据后再重新广播新数据
  76. broadcastState.clear();
  77. broadcastState.put(null, value);
  78. }
  79. });
  80. //TODO 4.sink
  81. resultDS.print();
  82. //TODO 5.execute
  83. env.execute();
  84. }
  85. /**
  86. * 1.实时日志事件流:<userID, eventTime, eventType, productID> 哪个用户 在 什么时间,对 哪个商品 进行了 什么操作
  87. * <userID, eventTime, eventType, productID>
  88. */
  89. public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
  90. private boolean isRunning = true;
  91. @Override
  92. public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
  93. Random random = new Random();
  94. SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  95. while (isRunning){
  96. int id = random.nextInt(4) + 1;
  97. String user_id = "user_" + id;
  98. String eventTime = df.format(new Date());
  99. String eventType = "type_" + random.nextInt(3);
  100. int productId = random.nextInt(4);
  101. ctx.collect(Tuple4.of(user_id,eventTime,eventType,productId));
  102. Thread.sleep(500);
  103. }
  104. }
  105. @Override
  106. public void cancel() {
  107. isRunning = false;
  108. }
  109. }
  110. /**
  111. * 2.用户信息流(配置流/规则流): <用户id,<姓名,年龄>> 用户的详细信息
  112. * <用户id,<姓名,年龄>>
  113. */
  114. public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
  115. private boolean flag = true;
  116. private Connection conn = null;
  117. private PreparedStatement ps = null;
  118. private ResultSet rs = null;
  119. @Override
  120. public void open(Configuration parameters) throws Exception {
  121. conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
  122. String sql = "select `userID`, `userName`, `userAge` from `user_info`";
  123. ps = conn.prepareStatement(sql);
  124. }
  125. @Override
  126. public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
  127. while (flag){
  128. Map<String, Tuple2<String, Integer>> map = new HashMap<>();
  129. ResultSet rs = ps.executeQuery();
  130. while (rs.next()){
  131. String userID = rs.getString("userID");
  132. String userName = rs.getString("userName");
  133. int userAge = rs.getInt("userAge");
  134. //Map<String, Tuple2<String, Integer>>
  135. map.put(userID, Tuple2.of(userName,userAge));
  136. }
  137. ctx.collect(map);
  138. Thread.sleep(5000);//每隔5s更新一下用户的配置信息!
  139. }
  140. }
  141. @Override
  142. public void cancel() {
  143. flag = false;
  144. }
  145. @Override
  146. public void close() throws Exception {
  147. if (conn != null) conn.close();
  148. if (ps != null) ps.close();
  149. if (rs != null) rs.close();
  150. }
  151. }
  152. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116381356

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。