【技术分享】FlinkSQL消费Kafka写Redis

举报
南派三叔 发表于 2021/09/30 17:08:54 2021/09/30
【摘要】 前提条件 MRS集群中包含kafka\flink\redis且各个服务运行正常 数据流向&业务逻辑 本样例是将kafka读的数据通过Flink解析后,统计不同account账号总消费金额,然后将结果缓存进redis,通过Redis查询最终的消费统计结果。 样例数据 {"account": "A6","costs":"35"} {"account": "A7","costs":"36"...

  • 前提条件

MRS集群中包含kafka\flink\redis且各个服务运行正常

  • 数据流向&业务逻辑

本样例是将kafka读的数据通过Flink解析后,统计不同account账号总消费金额,然后将结果缓存进redis,通过Redis查询最终的消费统计结果。

  • 样例数据

{"account": "A6","costs":"35"}

{"account": "A7","costs":"36"}

{"account": "A8","costs":"37"}

{"account": "A9","costs":"38"}

{"account": "A6","costs":"35"}

{"account": "A7","costs":"36"}

  • 操作步骤

1、登录FIM,创建Redis集群

2、设置Redis参数REDIS_SECURITY_ENABLED值为false,保存并重启redis服务


3、设置kafka的参数allow.everyone.if.no.acl.found值为true,保存并重启kafka服务

4、给本次实验的集群用户,绑定具有FlinkServer管理操作权限


5、登录FlinkServer webui界面,创建FlinkSQL任务

6、编写FlinkSQL,校验语法格式

CREATE TABLE kafka_source (
  account varchar(10),
  costs int,
  ts AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_test',
  'properties.bootstrap.servers' = 'x.x.x.x:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);CREATE SINK STREAM redis_sink(
  account varchar,
  costs int,
  PRIMARY KEY(account)
) WITH (
  'isSafeMode' = 'false',
  'clusterAddress' = 'x:22400',
  'redistype' = 'String',
  'type' = 'Redis',
  'isSSLMode' = 'false',
  'ttlDurationSecond' = '86400',
  'keyPrefix' = 'mykey'
);
INSERT INTO
  redis_sink
select
  account,
  SUM(costs)
FROM
  kafka_source
GROUP BY
  TUMBLE(ts, INTERVAL '10' SECOND),
  account;

7、启动任务

8、生成数据

 kafka-console-producer.sh --broker-list x.x.x.x:21007 --topic topic_test --producer.config /opt/client/Kafka/kafka/config/producer.properties


  • 结果验证

上述样例数据总共生成A7账户3次,每次消费金额36,经过Flink计算后总共108,查看Redis A7账户结果如下:


 


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。