2 道直播系统设计题,建议收藏

举报
JavaSouth南哥 发表于 2024/12/19 18:09:04 2024/12/19
【摘要】 简单来看,一次送礼请求需要经过的步骤可以简化为:用户送礼 -> 礼物校验、资产校验 -> 用户扣费 -> 直播间礼物通知 -> 更新礼物排行榜、记录消费日志。

先赞后看,Java进阶一大半

geeksforgeeks 站点给出了一张如何设计 ESPN 直播视频流系统的架构图。

在这里插入图片描述

各位hao,我是南哥,相信对你通关面试、拿下Offer有所帮助。

⭐⭐⭐一份南哥编写的《Java学习/进阶/面试指南》:https://github/JavaSouth

1. 直播礼物系统设计

1.1 表结构设计

视频直播领域的企业,比如抖音、快手、虎牙直播、B站直播,企业赚钱的源头往往靠的是粉丝在直播间刷礼物。你是不是像南哥一样只刷免费的小心心呢?我看了下抖音的直播间,现在小心心还要充钱才能送!

在这里插入图片描述

赚钱的业务必须要重视起来,这必然不是一个小小模块,而是一个礼物系统设计。

特别用户送礼有个必要的用户需求,用户送礼是为了和主播互动,送了个嘉年华,主播半小时才反应过来,那我们直播平台得被用户喷si。这就要求直播送礼的实时性了,虽然送礼内部包含了众多逻辑,看起来不可能快。

先看看下礼物系统的表设计。

(1)礼物表

CREATE TABLE `gifts` (
  `gift_id` INT AUTO_INCREMENT PRIMARY KEY,
  `gift_name` VARCHAR(255) NOT NULL,
  `cost` INT NOT NULL,
  `image_url` VARCHAR(255)
);

(2)用户礼物库存表

CREATE TABLE `user_gifts` (
  `user_gift_id` INT AUTO_INCREMENT PRIMARY KEY,
  `user_id` INT NOT NULL,
  `gift_id` INT NOT NULL,
  `quantity` INT DEFAULT 1,
  `acquired_date` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '获得礼物日期'
);

(3)礼物消费记录表

CREATE TABLE `gift_consumption_records` (
  `record_id` INT AUTO_INCREMENT PRIMARY KEY,
  `user_id` INT NOT NULL,
  `gift_id` INT NOT NULL,
  `anchor_id` INT NOT NULL COMMENT '主播id',
  `quantity` INT NOT NULL,
  `consumed_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

1.2 送礼流程设计

简单来看,一次送礼请求需要经过的步骤可以简化为:

用户送礼 -> 礼物校验、资产校验 -> 用户扣费 -> 直播间礼物通知 -> 更新礼物排行榜、记录消费日志。

上文我有说了送礼要快/准/恨,这么长的业务链条,实时性要怎么保障?

(1)校验接口

用户点击送礼,App端先调用校验接口,校验用户的余额是否充足。这一点很重要,余额不够的则不走下面的流程,减少了大量无效的送礼请求。

// 校验接口
public boolean validateGiftAndBalance(int userId, int giftId, int quantity) {
    // 查询用户余额
    int userBalance = getUserBalance(userId);
    // 查询礼物价格
    int giftCost = getGiftCost(giftId);
    
    // 校验用户余额是否充足
    if (userBalance < giftCost * quantity) {
        return false;
    }
    return true;
}

(2)消息队列

如果余额校验成功,App端将送礼请求发送到后端服务,后端服务把所有送礼请求都统一转发到消息队列Kafka上,同时返回成功给客户端,但客户端仍然不进行礼物展示。

通过消息队列把送礼请求任务化,大大减少了送礼高峰对服务器资源的冲击。而用户送礼成功后的直播间礼物显示留在下一步中。

(3)异步处理

监听Kafka任务的后端服务会处理送礼请求,完成礼物校验、资产校验后,进行实际的用户扣费。

当扣费成功后,后续的流程还有:直播间礼物通知 -> 更新礼物排行榜、记录消费日志,甚至更多杂七杂八新增的业务逻辑。

但要保障实时性,扣费成功后的后续步骤完全可以异步化,异步进行直播间礼物通知、异步更新礼物排行榜、记录消费日志。

// 异步处理
public void processGiftAsync(int userId, int giftId, int quantity, int anchorId) {
    CompletableFuture.runAsync(() -> {
        // 直播间礼物通知
        notifyLiveRoom(userId, giftId, quantity, anchorId);
        // 更新礼物排行榜
        updateGiftRanking(userId, giftId, quantity);
        // 记录消费日志
        recordGiftConsumption(userId, giftId, quantity, anchorId);
    });
}

(4)多实例负载均衡

保证处理送礼请求的后端服务资源充足,根据实际送礼流量增加消费实例进行负载均衡。

1.3 直播间礼物通知

欸,用消息队列处理送礼请求,前面在送礼请求接口都返回成功给客户端了,直播间礼物还没有显示出来那什么时候才显示出来?

这里我们用到的技术是服务器主动推送技术,例如现如今很火的WebSocket实时推送。WebSocket的创始人叫Michael Carter,听说现在每天全球有超过 20 亿台设备在使用WebSocket。

Michael designed the initial WebSocket protocol for HTML5, a technology that is used on more than 2 billion devices across the world every day.

推送直播间礼物显示前,我们得先知道推送给谁,直播间所有用户、主播、送礼的粉丝都是推送的对象。

这些在直播间的用户和直播间是一对多的关系,不可能把这个关系存储到MySQL数据库,毕竟我们要快。业界一般把它存储在内存数据库:Redis。

# 用户、直播间是一对多关系的数据结构
live_room_users:room_id : [user_id, user_id]
# 例如
live_room_users:000 : [001, 002, 003]
live_room_users:111 : [004, 005, 005]

知道了推送对象,我们就可以异步进行推送通知。

// WebSocket通知房间里所有用户
public void notifyLiveRoom(int userId, int giftId, int quantity, int roomId) {
    // 获取房间中所有用户
    Set<Integer> users = getUsersInLiveRoom(roomId);
    String message = String.format("User %d sent %d of gift %d", userId, quantity, giftId);

    // 推送消息给所有用户
    for (Integer user : users) {
        webSocketService.sendMessageToUser(user, message);
    }
}

1.4 送礼连击功能

用户在直播间送礼往往有一个习惯,第一节提到的免费小心心礼物,用户会疯狂连击。一次送礼点击就作为一次送礼请求,很明显对我们的服务器资源很不友好

在客户端设置一个时间窗口,只要用户在时间段内连续点击送礼按钮,客户端统计出点击次数,作为一次送礼请求。

在这里插入图片描述

// 批量送礼接口
public void batchSendGift(int userId, int giftId, int totalQuantity, int roomId, int anchorId) {
    // 客户端统计点击次数,作为一次送礼请求
    processGiftAsync(userId, giftId, totalQuantity, anchorId);
}

1.5 事务控制

礼物校验、资产校验、用户扣费,这些涉及资金的业务最好加上严格的事务控制,只要有一丁点出错,所有的操作进行回滚。

// 事务控制
@Transactional
public boolean processGiftTransaction(int userId, int giftId, int quantity, int anchorId) {
    try {
        // 礼物校验、资产校验
        if (!validateGiftAndBalance(userId, giftId, quantity)) {
            return false;
        }
        // 扣费
        deductUserBalance(userId, giftId, quantity);
        
        // 其他异步业务逻辑
        processGiftAsync(userId, giftId, quantity, anchorId)
        return true;
    } catch (Exception e) {
        throw new RuntimeException("Gift processing failed, transaction rolled back", e);
    }
}

2. 直播弹幕设计

2.1 底层数据结构支持

南友们看看右下角的弹幕列表,这个弹幕列表就是我们要的攻克的对象,至于中间视频直播的走马灯弹幕,它其实也是根据弹幕列表的数据来进行滚动。

南哥观察了下这个直播间,现在有41.5万人在观看!

在这里插入图片描述

计算机世界实际上是现实世界的抽象,那弹幕列表我们要用什么数据结构支持。

我希望用的是Redis,Redis官方写着这么霸气的宣传语。

Get the world’s fastest in-memory database from the ones who built it

从构建者那里获取世界上最快的内存数据库

而底层数据结构我们使用Redis五大基本数据类型之一:Zset。Zset是一种有序集合类型,它有一个score值,score值用来存储用户发送弹幕的时间戳,那整个列表就会根据时间戳来进行排序。

而Zset元素的值就作为用户弹幕,例如上图的:剧本演了又演

有了底层数据结构支持,我们来说说这个弹幕列表有什么功能限制。大家有没注意到我们进入直播间,直播间是不会把所有的弹幕内容都
显示出来的,往往只是显示前10条。

那我们也给弹幕列表加上这个特性,在Redis的Zset结构设置只保留前10条的属性。

// 创建Zset数据结构并设置10条的限制
public class DanmakuService {

    private Jedis jedis = new Jedis("localhost");

    public void addDanmaku(String roomId, String danmaku, long timestamp) {
        String key = "room:" + roomId + ":danmaku";
        jedis.zadd(key, timestamp, danmaku);
        // 只保留最新的10条弹幕
        jedis.zremrangeByRank(key, 0, -(11));
    }
}

2.2 弹幕列表查询

那用户进入直播间,弹幕列表是怎么查询出来的?

我们按最简单高效来,用户进入直播间,客户端调用API接口去查询出Redis里的弹幕列表。

有南友会问:这只是最近的前10条聊天记录,后面的呢?

别急,有两种方案。

(1)轮询查询

客户端轮询查询API接口,不断抓取出用户发出的新弹幕。具体细节的话,客户端第一次查询出的弹幕列表的数据结构是:[(时间戳1: 弹幕1), (时间戳2: 弹幕2)]

后续查询客户端继续轮询调用API接口,同时携带当前弹幕列表的最大时间戳入参。而后端服务就会根据该时间戳返回比该时间戳大的数据,用户发送的新的弹幕也就会显示出来。

// 轮询API接口
public class DanmakuService {

    private Jedis jedis = new Jedis("localhost");

    public Set<String> getRecentDanmaku(String roomId, long lastTimestamp) {
        String key = "room:" + roomId + ":danmaku";
        return jedis.zrangeByScore(key, lastTimestamp + 1, Long.MAX_VALUE);
    }
}

轮询API我们要设置多少时间轮询一次呢?我们先设置 3 秒轮询更新一次弹幕列表,后续再根据用户反馈、服务器资源进行优化调整。

(2)WebSocket技术

视频直播间有个特点,主播和观众是无时不刻在进行互动聊天的,这就要求音视频要实时同步了。那直播弹幕就更应该实时,使用第一种轮询API的方法,可能会有 3 秒延迟的情况发生。

要实时推送新的弹幕,我们可以使用WebSocket技术,客户端和WebSocket服务器保存长连接,用户只要发送新的弹幕消息,WebSocket服务器便会实时推送到客户端上。

第一种方法虽然粗暴,如果轮询查询会空,那本次查询就是一次资源浪费,对服务器资源不友好。但他简单高效,出错情况也少。

如果老板要你半个月上线这个弹幕列表功能,那第一种方法也未尝不可。后续我们再来根据实际情况作出升级调整的策略,例如升级为WebSocket技术。

2.3 系统流程

南哥画下整个系统的流程。

在这里插入图片描述

用户通过客户端发送弹幕,通过后端服务把弹幕消息发送到Kafka。使用Kafka消息我们就可以进行流量削峰,弹幕消息有时是成千上万 / 秒,把存储弹幕消息的任务通过消息队列缓存起来,一个个执行,减少服务器瞬时的高压力。

发送到Kafka后,负责监听弹幕Kafka消息的后端服务会把弹幕消息写入Redis。

// 监听写入Redis
public class DanmakuListener {

    private Jedis jedis = new Jedis("localhost");

    @KafkaListener(topics = "danmaku", groupId = "group_id")
    public void listen(ConsumerRecord<String, String> record) {
        String roomId = record.key();
        String message = record.value();
        long timestamp = System.currentTimeMillis();
        
        String key = "room:" + roomId + ":danmaku";
        // 将弹幕消息写入Redis Zset
        jedis.zadd(key, timestamp, message);
        // 保留最近10条
        jedis.zremrangeByRank(key, 0, -11);
    }
}

2.4 消息丢失问题

有这么一种情况,用户A发送了(1726406132, 弹幕A),用户B发送了(1726406150, 弹幕B),用户A先发送了弹幕,用户B再发送弹幕。

如果用户B的弹幕先写入到了Redis的Zset列表,其他用户进入直播间查询了第一个弹幕列表。那即使用户A后面成功写入了弹幕,其他用户也不会获取到用户A的弹幕。

因为客户端只会更新比弹幕B时间戳更大的弹幕消息。这怎么处理?

我们要解决的是弹幕B先于弹幕A成功写入的问题,不考虑其他特殊情况,可以给写入Redis的方法加上分布式锁的功能,保证先获取锁的弹幕消息写入的过程中,不会有其他弹幕消息写入的干扰。

// 在写入弹幕时获取分布式锁
public class DanmakuService {

    private RedisLockUtil redisLock = new RedisLockUtil();
    
    private Jedis jedis = new Jedis("localhost");

    public void addDanmakuWithLock(String roomId, String danmaku, long timestamp) {
        String lockKey = "lock:" + roomId;
        try {
            if (redisLock.acquireLock(lockKey)) {
                String key = "room:" + roomId + ":danmaku";
                jedis.zadd(key, timestamp, danmaku);
                jedis.zremrangeByRank(key, 0, -11);
            }
        } finally {
            redisLock.releaseLock(lockKey);
        }
    }
}
public class RedisLockUtil {

    private Jedis jedis = new Jedis("localhost");
    
    private static final int EXPIRE_TIME = 5000; // 5秒

    // 获取锁
    public boolean acquireLock(String lockKey) {
        long currentTime = System.currentTimeMillis();
        String result = jedis.set(lockKey, String.valueOf(currentTime), "NX", "PX", EXPIRE_TIME);
        return "OK".equals(result);
    }

    // 释放锁
    public void releaseLock(String lockKey) {
        jedis.del(lockKey);
    }
}

我是南哥,南就南在Get到你的点赞点赞点赞。

在这里插入图片描述

看了就赞,Java进阶一大半。点赞 | 收藏 | 关注,各位的支持就是我创作的最大动力❤️

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。