并发数据结构设计演练
QuestDB是一个时间序列数据库,提供快速的摄取速度、InfluxDB 线路协议和 PGWire 支持以及 SQL 查询语法。QuestDB 主要是用 Java 编写的,我们学到了很多困难而有趣的教训。我们很高兴与您分享。
研究数据结构
并发数据结构设计很难。该博客提供了有关构建非常有利于读者的专用并发地图的指导。本文不仅会介绍另一种现成的数据结构。相反,我将引导您完成设计过程,同时解决实际问题。我什至会介绍我一路上遇到的死胡同。对于对并发编程感兴趣的程序员来说,这是一个侦探故事。
在本文结束时,我们将拥有一个用于在本机内存中存储数据 blob 的并发映射。该映射在读取路径上是无锁的,并且在内存分配方面也非常保守。让我们开始吧!
本文假设您具备 Java 或类 Java 编程语言的基础知识。
问题
我需要一个并发映射,其中键是字符串,值是固定大小的 blob(公共加密密钥)。这听起来像是JDK 中普通的旧式 ConcurentHashMap 的工作,但有一点不同:blob 必须在 Java 堆之外可用。
为什么?这样调用者就可以获取指向 blob 的指针并通过 JNI 将其传递给 Rust 代码。然后 Rust 代码使用公钥来验证数字签名。
这是该界面的简化版本:
interface ConcurrentString2KeyMap {
void set(String username, long keyPtr);
long get(String username);
void remove(String username);
}
该set()
方法接收用户名和指向密钥的指针。映射的寿命比它接收到的指针的寿命长,因此它必须将接收到的指针下的内存复制到自己的缓冲区中。换句话说:该get()
方法必须返回一个指向该内部缓冲区的指针,而不是用于 的原始指针set()
。
我可以假设该get()
方法将在热路径上频繁使用,而突变方法将很少被调用,并且永远不会在热路径上被调用。
读者大致将如何使用它:
boolean verifySignature(CharSequence username, long challengePtr, int
challengeLen, long signaturePtr, int signatureLen) {
long keyPtr = map.get(username);
return AuthCrypto.verifySignature(keyPtr, PUBLIC_KEY_SIZE_BYTES, challengePtr, challengeLen, signaturePtr, signatureLen);
}
如果没有突变,我可以只实现一个预先填充的不可变查找目录,然后就到此为止了。然而,共享可变状态带来了两类挑战:
- 指针生命周期管理
- 地图内部的一致性
第一个问题归结为确保当 amap.get()
返回指针时,该指针必须保持有效,并且其后面的内存在需要的时间内不得更改。在我们的例子中,这意味着直到AuthCrypto.verifySignature()
返回。
第二个问题是关于并发数据结构设计的,我们稍后将更详细地讨论这一点。我们
先探讨第一个问题。
指针生命周期管理
如果我们的映射值只是 JVM 管理的常规对象,那么事情可能会很简单:map.get()
返回对对象的引用,然后它可能会忘记get()
曾经发生过这个调用。和方法只会删除映射对值对象的引用,并且永远不会更改已返回的对象remove()
。set()
简单的。但这不是我们的情况,我们正在使用堆外内存,并且必须自己管理它。
从根本上来说,有两种方法可以解决:
- 更改
get()
合约,使其不返回指针。相反,它从外部接收一个指针并将值复制到那里。 get()
仍然返回一个指针,但映射保证其背后的内存保持不变,直到调用者通知映射它已完成并且不再使用该指针。
选项 1:调用者拥有目标内存
第一个选项看起来很有趣。新合同可能如下所示:
interface ConcurrentString2KeyMap {
void set(String username, long srcKeyPtr);
void get(String username, long dstKeyPtr);
void remove(String username);
}
调用者将拥有该dstKeyPtr
指针,并且映射会将密钥从其内部复制到该指针,并忘记get()
曾经发生过此调用。
起初这听起来相当不错,直到我们意识到它只是把罐头踢了下去:它强制每个调用线程维护自己的缓冲区以传递给get()
. 如果调用者都是单线程的,这仍然很容易:每个调用对象都拥有一个要传递给 的缓冲区get()
。
但如果调用函数本身是并发的,那就会变得更加复杂。我们必须确保每个调用线程使用不同的缓冲区。
理想情况下,缓冲区应该在堆栈上分配,但这是 Java,所以这是不可能的。我们当然不希望
为每次调用在进程堆中分配/取消分配新的缓冲区。
那么还剩下什么呢?汇集?那很乱。
线程局部?更混乱且更难限制缓冲区的数量。
也许选项 1 并不像乍看起来那样有趣。
选项 2:生命周期通知
让我们探讨第二个选项。合同与原始提案中概述的相同:long get(String username)
。我们必须确保指针后面的内存保持不变,直到完成为止。
最简单的事情就是使用读写锁。
每个映射都会有一个关联的读写锁,然后读者在调用之前获取读锁get()
,并仅在从 返回后释放它AuthCrypto.verifySignature()
:
boolean verifySignature(CharSequence username, long challengePtr, int
challengeLen, long signaturePtr, int signatureLen) {
map.acquireReadLock();
try {
long keyPtr = map.get(username);
return AuthCrypto.verifySignature(keyPtr, PUBLIC_KEY_SIZE_BYTES, challengePtr, challengeLen, signaturePtr, signatureLen);
} finally {
map.releaseReadLock();
}
}
变异器只需在调用set()
or之前获取写锁remove()
。这种设计不仅推理简单,而且实现起来也很简单。
假设只set()
改变remove()
内部状态,我们可以采用单线程映射实现,它就会做到这一点。但有一个问题...它违反了我们最初的要求!
读者经常处于热路径上,我们希望他们保持无锁状态。所提出的设计会在地图更新时阻止读者,因此这是不行的。
我们可以做什么?我们可以将锁定模式更改为更细粒度 - 我们可以锁定特定条目,而不是锁定整个映射。虽然这会改善实际行为,但也会使地图设计复杂化,并且当更新相同的密钥时,读者仍然可能被阻止。
还有什么?我们可以使用乐观锁定模式,但这
会带来其自身的复杂性。
越来越明显的是,指针生命周期管理必须与内部映射实现协同工作。那么这次的演习就完全没有结果了吗?不完全的。
我们仍然可以重用一种设计思想:地图用户必须
明确通知他们不再使用指针。
让我们探索如何
设计地图内部!
为无锁读者设计地图
我认为自己是一位经验丰富的多面手。我对并发编程、分布式系统和各种其他领域有所了解,但我并不真正专注于任何特定主题。
是万事通却一事无成?大概。因此,当我在考虑合适的数据结构时,我做了每个通才在 2023 年都会做的事情:问 ChatGPT!
我很惊讶 GPT 意识到我的意思是写“单一作者”而不是“单一读者”,我认为这是 GPT 知道它在说什么的证据!🙂 所以我进一步阅读:我以前可能听说过 RCU,但我自己从未使用过它。我发现这个描述有点太模糊了,无法用作实施指南,而且无论如何,那是午餐时间。
写时复制插曲
当我走向一个吃午饭的地方时,我思考了更多,并有了一个想法。为什么不使用Copy-On-Write技术来实现持久映射?
这样,我就可以采用常规的单线程映射,并且变异器将克隆当前映射,执行其操作,然后以原子方式将这个新创建的映射设置为读者的映射。然后,读者将使用最新发布的地图。已发布的映射是不可变的,因此对于并发读取者来说始终是安全的,即使来自多个线程,也是无锁的。事实上,甚至无需等待。耶!
此外,当过时的(=不再是最新发布的地图)地图没有读者时,我们必须引入一种安全地重新分配地图内部缓冲区的机制。否则,我们就会泄漏内存。这是一个复杂的问题,但感觉只要有足够的奉献精神和原子引用计数器就可以轻松修复。
所以这一切听起来不错,但正如所预料的那样……仍然有一个问题。我们需要为每个突变的映射内容分配一块内存。我们说过突变很罕见,所以也许这没什么大不了的?也许不是,但 QuestDB 设计原则之一是对内存分配持保守态度,因为它们会消耗 CPU 周期、内存带宽,导致 CPU 缓存抖动,并且通常会引入不可预测的行为。
回到绘图板:地图回收
因此,我无法实现简单的“写时复制”映射,但我觉得我正走在实现目标的正确道路上:无锁读取器。在某些时候,我意识到,我可以只重复使用 2 个地图,而不是在发生更改时分配新地图:一个可供读者使用,另一个可供作者使用。
一旦地图向读者发布,只要至少有一个读者仍在访问它,它就保证是不可变的。
它看起来类似于:
class ConcurrentMap {
private InternalMap readerMap = new ...
private InternalMap writerMap = new ...
void set(String username, long keyPtr) {
getMapForWriters().set(username, keyPtr);
swapMaps();
}
long get(String username) {
return getMapForReaders().get(username);
}
void remove(String username) {
getMapForWriters().remove(username);
swapMaps();
}
}
这个想法看起来很简洁,但很明显,上面概述的代码存在许多问题和悬而未决的问题:
- 代码总是只改变一个映射,但我们显然需要保持两个映射同步。我们不能丢失更新。
- 多个变异线程可能会互相踩踏。
- 如果
swapMap()
无条件交换读取器和写入器映射,则执行两个连续突变的变异线程可以写入仍然有一些读取器的映射。这违反了我们的不变量:我们不能让读者和作者同时访问同一个内部映射。 - 如何实施
getMapForWriters()
和getMapForReaders()
?🙂
单人作家 FTW!
让我们从问题#2 开始——多个变异线程。
我们说过突变是罕见的,而且从来不会出现在热门路径上。因此,我们可以采取残酷的做法并使用简单的互斥体 - 以确保始终最多只有一个变异器。无论如何,单写入器原则可以简化并发算法的设计。
因此地图现在看起来像这样:
class ConcurrentMap {
private InternalMap readerMap = new ...
private InternalMap writerMap = new ...
private final Object writeMutex = new Object();
void set(String username, long keyPtr) {
synchronized(writeMutex) {
getMapForWriters().set(username, keyPtr);
swapMaps();
}
}
long get(String username) {
return getMapForReaders().get(username);
}
void remove(String username) {
synchronized(writeMutex) {
getMapForWriters().remove(username);
swapMaps();
}
}
}
那很简单。也许暴力,但很容易。
赛车线
让我们探讨一些更复杂的问题 - 问题#3 - 多个连续的写入操作。我这是什么意思?
考虑这种情况:
- 我们有 2 个实例
InternalMap
,我们称它们为m0
和m1
。 - 该字段
readerMap
引用地图m0
和writerMap
参考文献m1
。 - 读取器线程调用
map.get()
。从而getMapForReaders()
返回m0
。此时,读取器线程被操作系统暂停。 - 编写器线程调用
map.set()
. 从而getMapForWriters()
返回m1
。 - 作家修改
m1
并交换地图。 - 该字段
readerMap
现在引用地图m1
和writerMap
引用m0
。 - 另一位作家呼吁
map.set()
。这样就getMapForWriters()
返回了m0
,作者开始修改它。写入操作需要一段时间。 - 操作系统从 #3 恢复读取器线程,并开始读取
m0
(因为这是读取器在其线程暂停之前获得的地图!) - 此时,我们有一个读取器线程与写入器线程同时访问相同的内部地图实例 -> Boom 💥💥💥!
如果场景看起来太长且无聊并且您跳过了它,这里有一个简短的摘要:读者获得mapForReaders
并在下一刻这张地图变成writerMap
。所以曾经的 areaderMap
现在是 a writerMap
,因此下一个写操作可以随意改变它。除了陈旧的读者仍然认为同一张地图可以安全阅读。这是一个严重的并发错误!
我们如何才能防止上述的不良情况发生?我们已经在写入路径上使用了互斥体,这几乎是最糟糕的。几乎?!我们还能更恶心吗?我们当然可以!
每个内部映射都可以有一个读取器计数器,并且getMapForWriters()
在当前的读取器计数器mapForWriters
达到 0 之前不会返回。换句话说:写入器不会发生变异,writerMap
直到所有读取器表明他们不再使用该映射。
新来的读者怎么样?新读者根本不接触writerMap
,他们总是加载电流readerMap
,所以这不是问题。
说够了!让我们看一些代码:
class ConcurrentMap {
private InternalMap readerMap = new InternalMap();
private InternalMap writerMap = new InternalMap();
private final Object writeMutex = new Object();
void set(String username, long keyPtr) {
synchronized(writeMutex) {
getMapForWriters().set(username, keyPtr);
swapMaps();
}
}
Reader concurrentReader() {
InternalMap map;
for (;;) {
map = readerMap;
map.readerArrived();
if (map == readerMap) {
return map;
}
map.readerGone();
}
}
private InternalMap getMapForWriters() {
InternalMap map = writerMap;
while(map.hasReaders()) {
backoff();
}
return map;
}
void remove(String username) {
synchronized(writeMutex) {
getMapForWriters().remove(username);
swapMaps();
}
}
interface Reader {
long get(String username);
void readerGone();
}
static class InternalMap implement Reader {
private final AtomicInteger readerCounter;
public void readerGone() {
readerCounter.decrement();
}
public void readerArrived() {
readerCounter.increment();
}
public boolean hasReaders() {
return readerCounter.get() > 0;
}
// the rest of a single threaded map impl
}
}
上面的代码看起来比之前有问题的版本要复杂得多。
让我们简要回顾一下这些变化:
- 最明显的变化:方法
get()
不见了!相反,有一个新方法concurrentReader()
返回Reader
具有两个方法的接口:get()
和readerGone()
- 有一个骨架
InternalMap
。它不显示任何与映射相关的逻辑,因为它可以是类似映射结构的任何单线程实现。它仅表明每个内部地图都有自己的读者计数器。 - 我们第一次看到一个实现
getMapForWriters()
。除了等待所有陈旧的读者消失之外,它实际上没有做任何其他事情writerMap
。该backoff()
方法可以具有各种实现并使用诸如Thread#yield()
或 之类的原语LockSupport#parkNanos()
。
统计读者数量
让我们仔细看看每一个变化。
为什么我们要引入Reader
接口?这难道不是一种不必要的复杂化,也是 Java 文化中普遍存在的过度设计的一个例子吗?
好吧,也许吧,但它简化了读者通知映射他们将不再访问返回的指针的机制。
如何?每个内部地图都有自己的读者计数器。当读者不再需要先前返回的指针时,它必须在正确的内部映射上get()
调用。readerGone()
该Reader
接口正是这样做的——它知道正在使用哪个
实例。InternalMap
当线程调用 时reader.readerGone()
,它会减少该映射上的读取器计数器。
例如:
boolean verifySignature(CharSequence username, long challengePtr, int
challengeLen, long signaturePtr, int signatureLen) {
ConcurrentMap.Reader reader = map.concurrentReader();
try {
long keyPtr = map.get(username);
return AuthCrypto.verifySignature(keyPtr, [...]);
} finally {
reader.readerGone();
}
}
希望这能让我们更清楚为什么我们需要这个Reader
接口。
题外话:你还记得读写锁的设计思想吗?我决定不使用它,因为它可能会阻止读者。但锁的使用模式是这种通知机制的灵感来源。
避免先检查后行动的错误
我们重点关注concurrentReader()
方法的实现。
它看起来像这样:
Reader concurrentReader() {
InternalMap map;
for (;;) {
map = readerMap;
map.readerArrived();
if (map == readerMap) {
return map;
}
map.readerGone();
}
}
它加载 current readerMap
,增加其读取器计数器,并且当且仅当该readerMap
字段仍然指向同一InternalMap
实例时将其返回给调用者。否则,它会递减读取器计数器以撤消增量,并从头开始重试所有操作。
为什么这么复杂?为什么我们需要重试机制?这是为了保护我们免受我们已经讨论过的陈旧读者的类似问题的影响。
考虑这个更简单的实现concurrentReader()
:
Reader concurrentReader() { // buggy!
InternalMap map = readerMap;
map.readerArrived(); // increment the reader counter
return map;
}
// getMapForWriters() shown for reference only
private InternalMap getMapForWriters() {
InternalMap map = writerMap;
while (map.hasReaders()) {
backoff();
}
return map;
}
细分下来,我们看到:
- 有一个编写器线程调用
map.set()
和一个读取器线程调用map.concurrentReader()
。 - 读取器线程加载当前的
readerMap
,但操作系统在增加读取器计数器之前暂停它。 - 编写器线程加载当前的
writerMap
,进行突变,并交换映射。这意味着旧的readerMap
现在是新的writerMap
。 - 此时,读者已经
InternalMap
在writerMap
字段中设置了一个实例。 - 还有另一个写入操作。
getMapForWriters()
立即返回当前值writerMap
,因为读取器计数器仍然为零。编写器线程开始改变地图。 - 操作系统恢复读取线程。该线程具有对同一内部映射的引用,该内部映射当前正被该线程从前一点改变。
- 读取器线程会增加内部映射读取器计数器,但这是徒劳的,因为写入器线程已经在改变映射。
- 读取器线程
concurrentReader()
返回一个由写入器线程同时变异的映射 -> Boom 💥💥💥!
额外的签到concurrentReader()
是为了防止上述情况的发生。它保证它增加了地图上的读者计数器,该计数器仍然是当前的readerMap
:
Reader concurrentReader() {
InternalMap map;
for (;;) {
map = readerMap;
map.readerArrived(); // increment the reader counter
if (map == readerMap) {
return map;
}
map.readerGone();
}
}
读取器线程仍然有可能增加读取器计数器,将 . 返回Reader
给调用者,并且在下一微秒内写入器线程交换映射,因此返回给调用者的映射实例现在被设置为writerMap
. 这是完全可能的,但不会造成任何损害。在读取器计数器达到零之前,写入
器将无法访问。writerMap
剩下什么?
至此我们解决了并发算法最难的部分,但是还有一些问题没有解决:
- 地图仍在丢失更新!我们有 2 个内部映射,但每次更新只改变一个映射。
- 一些较小的位:
- 交换地图的作者和加载地图的读者之间不存在“先发生”关系。
- 该
close()
方法未实现,因此映射可能会泄漏本机内存等。
处理丢失的更新
我们可以解决第一个问题。交换地图后,我们可以等到当前writerMap
没有读者再更新。
所以变异操作看起来像这样:
void set(String username, long keyPtr) {
synchronized(writeMutex) {
getMapForWriters().set(username, keyPtr);
swapMaps();
getMapForWriters().set(username, keyPtr);
}
}
这是一个安全的实现,因为可以getMapForWriters()
保证返回的映射没有读取器,并且在下一次交换之前不会有新的读取器到达。
另一方面,它效率低下:当我们在写入后切换映射时,新映射writerMap
可能会有陈旧的读取器,从而导致延迟,直到它们被清除。
有更好的选择吗?事实证明是有的!
我们可以更改第一个地图,交换它们并记住包括所有参数在内的操作。在下一次突变期间,我们将重播对 的操作mapForWriters
,如果
突变足够罕见,那么当我们重播操作时, 已writerMap
不再有任何读者。
我们看一下代码:
void set(String username, long keyPtr) {
synchronized(writeMutex) {
InternalMap map = getMapForWriters();
replayLastOperationOn(map);
map.set(username, keyPtr);
swapMaps();
rememberSetOperation(username, keyPtr);
}
}
看起来map.remove()
像这样:
void remove(String username) {
synchronized(writeMutex) {
InternalMap map = getMapForWriters();
replayLastOperationOn(map);
map.remove(username);
swapMaps();
rememberRemoveOperation(username);
}
}
rememberSetOperation()
必须将指针下的内存复制到自己的缓冲区,但我们只需要记住单个操作。鉴于我们的 blob 是固定大小的,它允许我们继续重用相同的重播缓冲区。零分配。
遵循 Java 内存模型规则
现在让我们做最后一个重要的改变。
看起来是这样的ConcurrentMap
:
class ConcurrentMap {
private InternalMap readerMap = new InternalMap();
private InternalMap writerMap = new InternalMap();
private final Object writeMutex = new Object();
private final WriterOperation lastWriterOperation;
[...]
}
整个可变状态被封装在这 4 个对象中。字段writerMap
和lastWriterOperations
只能由持有互斥锁的编写器线程访问。但该readerMap
字段是由编写器线程设置,然后由读取器加载。
读取器是无锁的,在访问读取器映射之前它们不会获取任何互斥体。这是一场数据竞争,可能会导致可见性问题。
修复很简单,只需将其标记readerMap
为volatile
:
class ConcurrentMap {
private volatile InternalMap readerMap = new InternalMap();
private InternalMap writerMap = new InternalMap();
private final Object writeMutex = new Object();
private final WriterOperation lastWriterOperation;
[...]
编写器路径现在如下所示:
- 获取互斥锁
- 加载电流
writerMap
- 等到所有陈旧的读者都消失
- 重播上次操作
- 进行新的突变
- 交换
readerMap
和writerMap
- 记住该操作,以便在新突变期间可以在未触及的地图上重播该操作
现在被readerMap
标记为易失性,为我们提供了顺序一致性。
通俗地说,读者将看到编写器线程完成的最新地图交换。读者还可以保证看到在编写器线程将地图设置为 之前执行的所有更改mapForReaders
。就是这样!
概括
我们经历了设计读取路径上无锁的并发数据结构的过程。我们可以将我们应用的一些设计原则概括为以下规则:
- 单写入者规则:使用互斥体进行写入,确保任何时候只有一个写入者。
- 双地图:维护两张地图——一张给读者,另一张给作者。
- 指针交换机制:当写入器更新时,它会对两个映射进行操作
mapForWriters
,然后交换两个映射的角色。 - 读者计数器:每个地图都有一个原子
readerCounter
。当读者开始阅读时,计数器会在阅读完成后递增和递减。mapForWriters
这可以确保在上一次交换的所有活动读取器完成读取之前没有人访问。 - 更改跟踪:编写者在更新时
mapForWriters
记录他们的修改。这很重要,因为我们需要将这些更改复制到mapForReaders
,而某些读者可能仍在使用它。我们不会等待所有读者转向新地图,而是记录更改并将其应用到后续更新中。鉴于我们在更新后切换地图,到下一次更新时,mapForWriters
旧读者可能已经不再存在,从而可以立即进行更改应用。
下一步是什么?
我们有并发映射的有效实现,但尚未准备好投入生产。还有一些问题需要解决:
- 该
close()
方法未实现,因此映射可能会泄漏本机内存。解决这个问题很简单,我将其作为练习留给读者。 - 没有测试!有多种方法可以测试并发数据结构。您可以使用压力测试,在其中生成大量线程并让它们以随机方式改变映射,然后检查映射和一些不变量的一致性。您可以学习TLA+并编写地图的正式模型,然后对其进行验证。
- 性能优化。当前的实现在两个内部映射上使用相同的写入路径。这很可能不是最好的选择。例如,没有理由计算哈希码两次。没有理由两次定位桶。第一个
set()
可以记住使用了哪个桶,第二个set()
可以重复使用它。我们还可以向编写器路径添加批处理:当前实现在每次突变后交换映射。当编写者在紧密循环中改变映射时,这是低效的。
致谢
完成实施后,我非常兴奋,想与世界分享。我天真地以为我是第一个想出这个想法的人。我错了。
首先,我在令人惊叹的Concurrency Freaks博客中发现了双实例锁定模式。这种模式与我在这里描述的模式非常相似。它还使用两种内部结构,读者可以在它们之间交替使用。它使用读写锁来保护映射被改变。假设只有一个写入者,那么在任何给定时间都至少有一个内部映射可供读取。这为读者提供了锁定自由。
公平地说,双实例锁定
模式更容易推理。它更好地分解问题。但我仍然认为我的贡献是延迟重播上次操作的技巧 - 如果写入者足够稀有,那么写入者根本不会被阻止。
- 点赞
- 收藏
- 关注作者
评论(0)