Kafka也能实现消息延时了?

HuaweiCloudDeveloper 发表于 2022/06/27 16:42:16 2022/06/27
【摘要】 Kafka作为一个中间件组件,具有与其他中间件组件通用的功能 (异步处理、系统解耦、流量削峰、日志处理), 但在某些特殊的功能方面,每个中间件拥有其独特的特性,其中Kafka作为一个具有高吞吐、高性能的中间件, 它也有其不足的地方,在某些应用场景下面要求中间件实现消息延时的功能,但Kafka本身是不具备这种能力的。 此DEMO项目实现了每条消息实现自定义的延时,详细内容可阅读文章进行了解。

《目录》

  1. 背景
  2. 开发环境
  3. 云服务介绍
  4. 方案设计
    1. 方案简述
    2. 方案架构图
    3. 时序图
  5. 代码参数指南
  6. 代码实现
  7. 结果反馈

1、背景

Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka就不具备这种能力,因此希望能在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

2、开发环境

开发环境.png

3、云服务介绍

分布式消息服务Kafka版: 华为云分布式消息服务Kafka版是一款基于开源社区版Kafka提供的消息队列服务,向用户提供计算、存储和带宽资源独占式的Kafka专享实例。使用华为云分布式消息服务Kafka版,资源按需申请,按需配置Topic的分区与副本数量,即买即用,您将有更多精力专注于业务快速开发,不用考虑部署和运维。

4、方案设计

i、方案简述

此方案实现,需要借助两个Topic来进行实现,一个Topic用于及时接收生产者们所产生的消息,另一个Topic则用于消费者拉取消息进行消费。另外在这两个Topic之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的Topic中。

ii、方案架构图

Kafka消息延时方案架构图

Kafka消息延时架构图.png
Kafka消息延时实现思路

  1. 生产者将生产消息存入topic_delay主题中进行存储。
  2. 将topic_delay主题中的所有消息拉取至ConcurrentLinkedQueue队列中。
  3. 取值判断是否满足延时要求。
    a. 如果满足延时要求,则将消息生产至topic_out主题中,并将queue队列中的值移除。
    b. 如果不满足延时要求,则等待自定义时间后重试判断。
  4. 消费者最终从topic_out主题中拉取消息进行消费。

iii、方案时序图

Kafka消息延时方案时序图

Kafka消息延时时序图.png

5、代码参数指南

本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息, 如需使用官方测试的使用的生产消费代码相关配置介绍可以参考https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。 如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。

Delay.java参数详情

  1. delay:自定义延时时间,单位ms。
  2. topic_delay变量:用于临时存储消息的topic名称。
  3. topic_out变量:用于消费者拉取消息消费的topic名称。
  4. 关于消费者和生产者配置可按需配置,可参考Kafka官方文档:https://kafka.apache.org/documentation/#producerconfigs

6、代码实现

实现代码可参考Kafka消息延时

package com.dms.delay;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hello world!
 *
 */
public class Delay {

    //缓存队列
    public static ConcurrentLinkedQueue<ConsumerRecord<String, String>> link = new ConcurrentLinkedQueue();
    //延迟时间(20秒),可根据需要设置延迟大小
    public static long delay = 20000L;

    /**
     *入口
     * @param args
     */
    public static void main( String[] args )
    {
        //延时主题(用于控制延时缓冲)
        String topic_delay = "topic_delay";
        //输出主题(直接供消费者消费)
        String topic_out = "topic_out";
        /*
        消费线程
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                //消费者配置。请根据需要自行设置Kafka配置
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
                props.setProperty("group.id", "test");
                props.setProperty("enable.auto.commit", "true");
                props.setProperty("auto.commit.interval.ms", "1000");
                props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                //创建消费者
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                //指定消费主题
                consumer.subscribe(Arrays.asList(topic_delay));
                while (true) {
                    //轮询消费
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
                    //遍历当前轮询批次拉取到的消息
                    for (ConsumerRecord<String, String> record : records){
                        System.out.println(record);
                        //将消息添加到缓存队列
                        link.add(record);
                    }
                }
            }
        }).start();
        /*
        生产线程
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                //生产者配置(请根据需求自行配置)
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
                props.put("linger.ms", 1);
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                //创建生产者
                Producer<String, String> producer = new KafkaProducer<>(props);
                //持续从缓存队列中获取消息
                while(true){
                    //如果缓存队列为空则放缓取值速度
                    if(link.isEmpty()){
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        continue;
                    }
                    //获取缓存队列栈顶消息
                    ConsumerRecord<String, String> record = link.peek();
                   //获取该消息时间戳
                    long timestamp = record.timestamp();
                    Date now = new Date();
                    long nowTime = now.getTime();
                    if(timestamp+ Delay.delay <nowTime){
                        //获取消息值
                        String value = record.value();
                        //生产者发送消息到输出主题
                        producer.send(new ProducerRecord<String, String>(topic_out, "",value));
                        //从缓存队列中移除该消息
                        link.poll();
                    }else {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }).start();
    }
}

7、结果反馈

延时结果.png

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。