[Tech Share] FlinkSQL: Consuming Kafka and Writing to Redis
- Prerequisites
 
The MRS cluster includes Kafka, Flink, and Redis, and all three services are running normally.
- Data Flow & Business Logic
 
In this sample, data read from Kafka is parsed by Flink, which computes the total consumption amount per account; the results are cached in Redis, and the final statistics are then queried from Redis.
- Sample Data
 
{"account": "A6","costs":"35"}
{"account": "A7","costs":"36"}
{"account": "A8","costs":"37"}
{"account": "A9","costs":"38"}
{"account": "A6","costs":"35"}
{"account": "A7","costs":"36"}
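The business logic — summing `costs` per `account` — can be sketched in plain Python over the six sample records above (illustration only; the actual aggregation is performed by Flink):

```python
import json
from collections import defaultdict

# The six sample records, one JSON object per line as they arrive in the topic
sample_lines = [
    '{"account": "A6","costs":"35"}',
    '{"account": "A7","costs":"36"}',
    '{"account": "A8","costs":"37"}',
    '{"account": "A9","costs":"38"}',
    '{"account": "A6","costs":"35"}',
    '{"account": "A7","costs":"36"}',
]

totals = defaultdict(int)
for line in sample_lines:
    record = json.loads(line)
    # costs arrives as a JSON string, so it must be cast to an integer
    totals[record["account"]] += int(record["costs"])

print(dict(totals))  # {'A6': 70, 'A7': 72, 'A8': 37, 'A9': 38}
```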
- Procedure
 
1. Log in to FusionInsight Manager (FIM) and create a Redis cluster.
2. Set the Redis parameter REDIS_SECURITY_ENABLED to false, then save and restart the Redis service.

3. Set the Kafka parameter allow.everyone.if.no.acl.found to true, then save and restart the Kafka service.
4. Bind a role with FlinkServer administration and operation permissions to the cluster user used in this experiment.

5. Log in to the FlinkServer web UI and create a FlinkSQL job.
6. Write the FlinkSQL and validate its syntax:
CREATE TABLE kafka_source (
  account varchar(10),
  costs int,
  ts AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_test',
  'properties.bootstrap.servers' = 'x.x.x.x:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE SINK STREAM redis_sink(
  account varchar,
  costs int,
  PRIMARY KEY(account)
) WITH (
  'isSafeMode' = 'false',
  'clusterAddress' = 'x:22400',
  'redistype' = 'String',
  'type' = 'Redis',
  'isSSLMode' = 'false',
  'ttlDurationSecond' = '86400',
  'keyPrefix' = 'mykey'
);
INSERT INTO
  redis_sink
SELECT
  account,
  SUM(costs)
FROM
  kafka_source
GROUP BY
  TUMBLE(ts, INTERVAL '10' SECOND),
  account;
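The `GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)` clause buckets rows into non-overlapping 10-second windows on processing time and emits one sum per account per window. A minimal Python sketch of this windowing, with made-up event times for illustration:

```python
from collections import defaultdict

# (processing_time_seconds, account, costs) - hypothetical event times
events = [
    (0, "A7", 36),   # falls in window [0, 10)
    (4, "A7", 36),   # same window
    (12, "A7", 36),  # next window [10, 20)
]

# key: (window_start, account) -> summed costs, mirroring
# GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), account
window_sums = defaultdict(int)
for ts, account, costs in events:
    window_start = (ts // 10) * 10  # assign to its 10-second tumbling window
    window_sums[(window_start, account)] += costs

print(dict(window_sums))  # {(0, 'A7'): 72, (10, 'A7'): 36}
```

Note that because the sink declares `PRIMARY KEY(account)`, records for the same account that fall into different windows produce separate per-window sums; Redis holds the value written by the most recent window, not a running total across windows.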

7. Start the job.

8. Produce data:
kafka-console-producer.sh --broker-list x.x.x.x:21007 --topic topic_test --producer.config /opt/client/Kafka/kafka/config/producer.properties
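Since the source table declares `'format' = 'json'`, each message must be a single-line JSON object. A small helper (the file path is hypothetical) to prepare the sample records for the console producer:

```python
import json

# The sample records from the "Sample Data" section
records = [
    {"account": "A6", "costs": "35"},
    {"account": "A7", "costs": "36"},
    {"account": "A8", "costs": "37"},
    {"account": "A9", "costs": "38"},
    {"account": "A6", "costs": "35"},
    {"account": "A7", "costs": "36"},
]

# One JSON object per line - the shape kafka-console-producer.sh reads on stdin
with open("/tmp/topic_test_input.json", "w") as f:
    for r in records:
        f.write(json.dumps(r) + "\n")
```

The file can then be fed to the producer command above by appending `< /tmp/topic_test_input.json`.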

- Result Verification
 
In this test, three records for account A7 were produced, each with a cost of 36. After the Flink aggregation the total is 108, which can be confirmed by querying the A7 entry in Redis.
