2 道直播系统设计题,建议收藏
先赞后看,Java进阶一大半
geeksforgeeks
站点给出了一张如何设计 ESPN
直播视频流系统的架构图。
各位hao,我是南哥,相信对你通关面试、拿下Offer有所帮助。
⭐⭐⭐一份南哥编写的《Java学习/进阶/面试指南》:https://github/JavaSouth
1. 直播礼物系统设计
1.1 表结构设计
视频直播领域的企业,比如抖音、快手、虎牙直播、B站直播,企业赚钱的源头往往靠的是粉丝在直播间刷礼物。你是不是像南哥一样只刷免费的小心心
呢?我看了下抖音的直播间,现在小心心还要充钱才能送!
赚钱的业务必须要重视起来,这必然不是一个小小模块,而是一个礼物系统设计。
特别用户送礼有个必要的用户需求,用户送礼是为了和主播互动,送了个嘉年华
,主播半小时才反应过来,那我们直播平台得被用户喷si。这就要求直播送礼的实时性了,虽然送礼内部包含了众多逻辑,看起来不可能快。
先看看下礼物系统的表设计。
(1)礼物表
CREATE TABLE `gifts` (
`gift_id` INT AUTO_INCREMENT PRIMARY KEY,
`gift_name` VARCHAR(255) NOT NULL,
`cost` INT NOT NULL,
`image_url` VARCHAR(255)
);
(2)用户礼物库存表
CREATE TABLE `user_gifts` (
`user_gift_id` INT AUTO_INCREMENT PRIMARY KEY,
`user_id` INT NOT NULL,
`gift_id` INT NOT NULL,
`quantity` INT DEFAULT 1,
`acquired_date` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '获得礼物日期'
);
(3)礼物消费记录表
CREATE TABLE `gift_consumption_records` (
`record_id` INT AUTO_INCREMENT PRIMARY KEY,
`user_id` INT NOT NULL,
`gift_id` INT NOT NULL,
`anchor_id` INT NOT NULL COMMENT '主播id',
`quantity` INT NOT NULL,
`consumed_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
1.2 送礼流程设计
简单来看,一次送礼请求需要经过的步骤可以简化为:
用户送礼 -> 礼物校验、资产校验 -> 用户扣费 -> 直播间礼物通知 -> 更新礼物排行榜、记录消费日志。
上文我有说了送礼要快/准/恨,这么长的业务链条,实时性要怎么保障?
(1)校验接口
用户点击送礼,App端先调用校验接口,校验用户的余额是否充足。这一点很重要,余额不够的则不走下面的流程,减少了大量无效的送礼请求。
// 校验接口
public boolean validateGiftAndBalance(int userId, int giftId, int quantity) {
// 查询用户余额
int userBalance = getUserBalance(userId);
// 查询礼物价格
int giftCost = getGiftCost(giftId);
// 校验用户余额是否充足
if (userBalance < giftCost * quantity) {
return false;
}
return true;
}
(2)消息队列
如果余额校验成功,App端将送礼请求发送到后端服务,后端服务把所有送礼请求都统一转发到消息队列Kafka上,同时返回成功
给客户端,但客户端仍然不进行礼物展示。
通过消息队列把送礼请求任务化,大大减少了送礼高峰对服务器资源的冲击。而用户送礼成功后的直播间礼物显示留在下一步中。
(3)异步处理
监听Kafka任务的后端服务会处理送礼请求,完成礼物校验、资产校验后,进行实际的用户扣费。
当扣费成功后,后续的流程还有:直播间礼物通知 -> 更新礼物排行榜、记录消费日志,甚至更多杂七杂八新增的业务逻辑。
但要保障实时性,扣费成功后的后续步骤完全可以异步化,异步进行直播间礼物通知、异步更新礼物排行榜、记录消费日志。
// 异步处理
public void processGiftAsync(int userId, int giftId, int quantity, int anchorId) {
CompletableFuture.runAsync(() -> {
// 直播间礼物通知
notifyLiveRoom(userId, giftId, quantity, anchorId);
// 更新礼物排行榜
updateGiftRanking(userId, giftId, quantity);
// 记录消费日志
recordGiftConsumption(userId, giftId, quantity, anchorId);
});
}
(4)多实例负载均衡
保证处理送礼请求的后端服务资源充足,根据实际送礼流量增加消费实例进行负载均衡。
1.3 直播间礼物通知
欸,用消息队列处理送礼请求,前面在送礼请求接口都返回成功给客户端了,直播间礼物还没有显示出来那什么时候才显示出来?
这里我们用到的技术是服务器主动推送技术,例如现如今很火的WebSocket实时推送
。WebSocket的创始人叫Michael Carter
,听说现在每天全球有超过 20 亿台设备在使用WebSocket。
Michael designed the initial WebSocket protocol for HTML5, a technology that is used on more than 2 billion devices across the world every day.
推送直播间礼物显示前,我们得先知道推送给谁,直播间所有用户、主播、送礼的粉丝都是推送的对象。
这些在直播间的用户和直播间是一对多的关系,不可能把这个关系存储到MySQL数据库,毕竟我们要快。业界一般把它存储在内存数据库:Redis。
# 用户、直播间是一对多关系的数据结构
live_room_users:room_id : [user_id, user_id]
# 例如
live_room_users:000 : [001, 002, 003]
live_room_users:111 : [004, 005, 005]
知道了推送对象,我们就可以异步进行推送通知。
// WebSocket通知房间里所有用户
public void notifyLiveRoom(int userId, int giftId, int quantity, int roomId) {
// 获取房间中所有用户
Set<Integer> users = getUsersInLiveRoom(roomId);
String message = String.format("User %d sent %d of gift %d", userId, quantity, giftId);
// 推送消息给所有用户
for (Integer user : users) {
webSocketService.sendMessageToUser(user, message);
}
}
1.4 送礼连击功能
用户在直播间送礼往往有一个习惯,第一节提到的免费小心心
礼物,用户会疯狂连击。一次送礼点击就作为一次送礼请求,很明显对我们的服务器资源很不友好。
在客户端设置一个时间窗口,只要用户在时间段内连续点击送礼按钮,客户端统计出点击次数,作为一次送礼请求。
// 批量送礼接口
public void batchSendGift(int userId, int giftId, int totalQuantity, int roomId, int anchorId) {
// 客户端统计点击次数,作为一次送礼请求
processGiftAsync(userId, giftId, totalQuantity, anchorId);
}
1.5 事务控制
礼物校验、资产校验、用户扣费,这些涉及资金的业务最好加上严格的事务控制,只要有一丁点出错,所有的操作进行回滚。
// 事务控制
@Transactional
public boolean processGiftTransaction(int userId, int giftId, int quantity, int anchorId) {
try {
// 礼物校验、资产校验
if (!validateGiftAndBalance(userId, giftId, quantity)) {
return false;
}
// 扣费
deductUserBalance(userId, giftId, quantity);
// 其他异步业务逻辑
processGiftAsync(userId, giftId, quantity, anchorId)
return true;
} catch (Exception e) {
throw new RuntimeException("Gift processing failed, transaction rolled back", e);
}
}
2. 直播弹幕设计
2.1 底层数据结构支持
南友们看看右下角的弹幕列表,这个弹幕列表就是我们要的攻克的对象,至于中间视频直播的走马灯弹幕
,它其实也是根据弹幕列表的数据来进行滚动。
南哥观察了下这个直播间,现在有41.5万
人在观看!
计算机世界实际上是现实世界的抽象,那弹幕列表我们要用什么数据结构支持。
我希望用的是Redis,Redis官方写着这么霸气的宣传语。
Get the world’s fastest in-memory database from the ones who built it
从构建者那里获取世界上最快的内存数据库
而底层数据结构我们使用Redis五大基本数据类型之一:Zset。Zset是一种有序集合类型,它有一个score值,score值用来存储用户发送弹幕的时间戳,那整个列表就会根据时间戳来进行排序。
而Zset元素的值就作为用户弹幕,例如上图的:剧本演了又演
。
有了底层数据结构支持,我们来说说这个弹幕列表有什么功能限制。大家有没注意到我们进入直播间,直播间是不会把所有的弹幕内容都
显示出来的,往往只是显示前10条。
那我们也给弹幕列表加上这个特性,在Redis的Zset结构设置只保留前10条的属性。
// 创建Zset数据结构并设置10条的限制
public class DanmakuService {
private Jedis jedis = new Jedis("localhost");
public void addDanmaku(String roomId, String danmaku, long timestamp) {
String key = "room:" + roomId + ":danmaku";
jedis.zadd(key, timestamp, danmaku);
// 只保留最新的10条弹幕
jedis.zremrangeByRank(key, 0, -(11));
}
}
2.2 弹幕列表查询
那用户进入直播间,弹幕列表是怎么查询出来的?
我们按最简单高效来,用户进入直播间,客户端调用API接口去查询出Redis里的弹幕列表。
有南友会问:这只是最近的前10条聊天记录,后面的呢?
别急,有两种方案。
(1)轮询查询
客户端轮询查询API接口,不断抓取出用户发出的新弹幕。具体细节的话,客户端第一次查询出的弹幕列表的数据结构是:[(时间戳1: 弹幕1), (时间戳2: 弹幕2)]
。
后续查询客户端继续轮询调用API接口,同时携带当前弹幕列表的最大时间戳入参。而后端服务就会根据该时间戳返回比该时间戳大的数据,用户发送的新的弹幕也就会显示出来。
// 轮询API接口
public class DanmakuService {
private Jedis jedis = new Jedis("localhost");
public Set<String> getRecentDanmaku(String roomId, long lastTimestamp) {
String key = "room:" + roomId + ":danmaku";
return jedis.zrangeByScore(key, lastTimestamp + 1, Long.MAX_VALUE);
}
}
轮询API我们要设置多少时间轮询一次呢?我们先设置 3 秒
轮询更新一次弹幕列表,后续再根据用户反馈、服务器资源进行优化调整。
(2)WebSocket技术
视频直播间有个特点,主播和观众是无时不刻在进行互动聊天的,这就要求音视频要实时同步了。那直播弹幕就更应该实时,使用第一种轮询API的方法,可能会有 3 秒延迟
的情况发生。
要实时推送新的弹幕,我们可以使用WebSocket技术,客户端和WebSocket服务器保存长连接,用户只要发送新的弹幕消息,WebSocket服务器便会实时推送到客户端上。
第一种方法虽然粗暴,如果轮询查询会空,那本次查询就是一次资源浪费,对服务器资源不友好。但他简单高效,出错情况也少。
如果老板要你半个月上线这个弹幕列表功能,那第一种方法也未尝不可。后续我们再来根据实际情况作出升级调整的策略,例如升级为WebSocket技术。
2.3 系统流程
南哥画下整个系统的流程。
用户通过客户端发送弹幕,通过后端服务把弹幕消息发送到Kafka。使用Kafka消息我们就可以进行流量削峰,弹幕消息有时是成千上万 / 秒,把存储弹幕消息的任务通过消息队列缓存起来,一个个执行,减少服务器瞬时的高压力。
发送到Kafka后,负责监听弹幕Kafka消息的后端服务会把弹幕消息写入Redis。
// 监听写入Redis
public class DanmakuListener {
private Jedis jedis = new Jedis("localhost");
@KafkaListener(topics = "danmaku", groupId = "group_id")
public void listen(ConsumerRecord<String, String> record) {
String roomId = record.key();
String message = record.value();
long timestamp = System.currentTimeMillis();
String key = "room:" + roomId + ":danmaku";
// 将弹幕消息写入Redis Zset
jedis.zadd(key, timestamp, message);
// 保留最近10条
jedis.zremrangeByRank(key, 0, -11);
}
}
2.4 消息丢失问题
有这么一种情况,用户A发送了(1726406132, 弹幕A)
,用户B发送了(1726406150, 弹幕B)
,用户A先发送了弹幕,用户B再发送弹幕。
如果用户B的弹幕先写入到了Redis的Zset列表,其他用户进入直播间查询了第一个弹幕列表。那即使用户A后面成功写入了弹幕,其他用户也不会获取到用户A的弹幕。
因为客户端只会更新比弹幕B时间戳更大的弹幕消息。这怎么处理?
我们要解决的是弹幕B先于弹幕A成功写入的问题,不考虑其他特殊情况,可以给写入Redis的方法加上分布式锁的功能,保证先获取锁的弹幕消息写入的过程中,不会有其他弹幕消息写入的干扰。
// 在写入弹幕时获取分布式锁
public class DanmakuService {
private RedisLockUtil redisLock = new RedisLockUtil();
private Jedis jedis = new Jedis("localhost");
public void addDanmakuWithLock(String roomId, String danmaku, long timestamp) {
String lockKey = "lock:" + roomId;
try {
if (redisLock.acquireLock(lockKey)) {
String key = "room:" + roomId + ":danmaku";
jedis.zadd(key, timestamp, danmaku);
jedis.zremrangeByRank(key, 0, -11);
}
} finally {
redisLock.releaseLock(lockKey);
}
}
}
public class RedisLockUtil {
private Jedis jedis = new Jedis("localhost");
private static final int EXPIRE_TIME = 5000; // 5秒
// 获取锁
public boolean acquireLock(String lockKey) {
long currentTime = System.currentTimeMillis();
String result = jedis.set(lockKey, String.valueOf(currentTime), "NX", "PX", EXPIRE_TIME);
return "OK".equals(result);
}
// 释放锁
public void releaseLock(String lockKey) {
jedis.del(lockKey);
}
}
我是南哥,南就南在Get到你的点赞点赞点赞。
看了就赞,Java进阶一大半。点赞 | 收藏 | 关注,各位的支持就是我创作的最大动力❤️
- 点赞
- 收藏
- 关注作者
评论(0)