【技术分享】FlinkSQL消费kafka数据写入Redis
【摘要】 前提条件MRS集群中包含kafka\flink\redis且各个服务运行正常 数据流向&业务逻辑本样例是将kafka读的数据通过Flink解析后,统计不同account账号总消费金额,然后将结果缓存进redis,通过Redis查询最终的消费统计结果。 样例数据{"account": "A6","costs":"35"}{"account": "A7","costs":"36"}{"acco...
- 前提条件
MRS集群中包含kafka\flink\redis且各个服务运行正常
- 数据流向&业务逻辑
本样例是将kafka读的数据通过Flink解析后,统计不同account账号总消费金额,然后将结果缓存进redis,通过Redis查询最终的消费统计结果。
- 样例数据
{"account": "A6","costs":"35"}
{"account": "A7","costs":"36"}
{"account": "A8","costs":"37"}
{"account": "A9","costs":"38"}
{"account": "A6","costs":"35"}
{"account": "A7","costs":"36"}
- 操作步骤
1、登录FIM,创建Redis集群
2、设置Redis参数REDIS_SECURITY_ENABLED值为false,保存并重启redis服务
3、设置kafka的参数allow.everyone.if.no.acl.found值为true,保存并重启kafka服务
4、给本次实验的集群用户,绑定具有FlinkServer管理操作权限
5、登录FlinkServer webui界面,创建FlinkSQL任务
6、编写FlinkSQL,校验语法格式
CREATE TABLE kafka_source (
account varchar(10),
costs int,
ts AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'topic_test',
'properties.bootstrap.servers' = 'x.x.x.x:21005',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);CREATE SINK STREAM redis_sink(
account varchar,
costs int,
PRIMARY KEY(account)
) WITH (
'isSafeMode' = 'false',
'clusterAddress' = 'x:22400',
'redistype' = 'String',
'type' = 'Redis',
'isSSLMode' = 'false',
'ttlDurationSecond' = '86400',
'keyPrefix' = 'mykey'
);
INSERT INTO
redis_sink
select
account,
SUM(costs)
FROM
kafka_source
GROUP BY
TUMBLE(ts, INTERVAL '10' SECOND),
account;
7、启动任务
8、生成数据
kafka-console-producer.sh --broker-list x.x.x.x:21007 --topic topic_test --producer.config /opt/client/Kafka/kafka/config/producer.properties
- 结果验证
上述样例数据总共生成A7账户3次,每次消费金额36,经过Flink计算后总共108,查看Redis A7账户结果如下:
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)