【华为鸿蒙开发技术】仓颉编程语言中的同步机制指南
仓颉编程语言中的同步机制
在并发编程中,确保多个线程对共享资源的安全访问是至关重要的。仓颉编程语言为此提供了多种同步机制,以避免数据竞争和不一致的状态。本文将深入探讨仓颉语言中的三种主要同步机制:原子操作、可重入互斥锁和监视器。
原子操作
原子操作是一种基本的同步机制,确保在多线程环境下对数据的安全访问。仓颉语言支持对整数类型、布尔类型和引用类型的原子操作。对于整数类型,仓颉提供了以下操作:
操作 | 功能 |
---|---|
load |
读取变量的当前值 |
store(val) |
写入新值 |
swap(val) |
交换当前值并返回交换前的值 |
compareAndSwap(old, new) |
比较当前值是否等于old ,如果是则替换为new ,返回是否成功 |
fetchAdd(val) |
对当前值加上val ,返回加操作之前的值 |
fetchSub(val) |
对当前值减去val ,返回减操作之前的值 |
以 AtomicInt8
类型为例,其原子操作的实现如下:
class AtomicInt8 {
public func load(): Int8
public func store(val: Int8): Unit
public func swap(val: Int8): Int8
public func compareAndSwap(old: Int8, new: Int8): Bool
public func fetchAdd(val: Int8): Int8
public func fetchSub(val: Int8): Int8
// ... 其他操作
}
示例:计数器
以下是一个使用原子操作实现计数器的示例代码:
import std.sync.*
import std.time.*
import std.collection.*
let count = AtomicInt64(0)
main(): Int64 {
let list = ArrayList<Future<Int64>>()
// 创建1000个线程
for (i in 0..1000) {
let fut = spawn {
sleep(Duration.millisecond) // 暂停1毫秒
count.fetchAdd(1)
}
list.append(fut)
}
// 等待所有线程完成
for (f in list) {
f.get()
}
let val = count.load()
println("count = ${val}")
return 0
}
输出结果应为:
count = 1000
可重入互斥锁
可重入互斥锁 ReentrantMutex
允许同一线程在持有锁的情况下再次获取该锁,而不会造成死锁。这对于需要在多个函数中共享锁的情况非常有用。
主要成员函数
lock()
: 获取锁,如果锁不可用,则阻塞当前线程。unlock()
: 解锁,如果有其他线程在等待该锁,则唤醒其中一个。tryLock()
: 尝试获取锁,返回是否成功。
示例:计数器保护
以下是一个使用 ReentrantMutex
来保护全局计数器的示例:
import std.sync.*
import std.time.*
import std.collection.*
var count: Int64 = 0
let mtx = ReentrantMutex()
main(): Int64 {
let list = ArrayList<Future<Unit>>()
// 创建1000个线程
for (i in 0..1000) {
let fut = spawn {
sleep(Duration.millisecond)
mtx.lock()
count++
mtx.unlock()
}
list.append(fut)
}
// 等待所有线程完成
for (f in list) {
f.get()
}
println("count = ${count}")
return 0
}
输出结果应为:
count = 1000
监视器
监视器是一种高级同步结构,它将互斥锁和条件变量结合在一起,允许线程在某些条件下阻塞自己,等待其他线程的通知。仓颉语言的 Monitor
类提供了如下方法:
wait(timeout!)
: 阻塞当前线程,等待通知。notify()
: 唤醒一个在监视器上等待的线程。notifyAll()
: 唤醒所有在监视器上等待的线程。
示例:生产者-消费者
以下是一个使用监视器实现简单的生产者-消费者模型的示例:
import std.sync.*
import std.time.*
var mon = Monitor()
var buffer: Array<Int64> = Array<Int64>(10, { _ => 0 })
var count: Int64 = 0
func producer() {
while (true) {
synchronized(mon) {
while (count == 10) {
mon.wait() // 等待消费者消费
}
buffer[count] = count
count++
mon.notifyAll() // 通知消费者
}
}
}
func consumer() {
while (true) {
synchronized(mon) {
while (count == 0) {
mon.wait() // 等待生产者生产
}
count--
mon.notifyAll() // 通知生产者
}
}
}
main(): Int64 {
spawn { producer() }
spawn { consumer() }
return 0
}
仓颉语言的同步机制详解
原子操作(Atomic Operations)
原子操作是并发编程中至关重要的一部分。仓颉语言提供了多种原子操作,以确保对共享数据的安全访问。原子操作的特点是不可分割,即在执行过程中不会被其他线程打断,从而避免了数据竞争问题。
支持的原子操作
仓颉的原子操作主要针对整数、布尔和引用类型,具体包括:
- 整数类型:如
Int8
,Int16
,Int32
,Int64
,UInt8
,UInt16
,UInt32
,UInt64
。这些类型的原子操作支持基本的读、写、交换和算术运算。 - 布尔类型和引用类型的原子操作仅支持读、写和交换。
以下是原子操作的一些常用方法:
操作 | 功能 |
---|---|
load |
读取当前值 |
store(val) |
写入新值 |
swap(val) |
交换当前值并返回旧值 |
compareAndSwap(old, new) |
如果当前值等于 old ,则替换为 new ,返回 true 否则返回 false |
fetchAdd(val) |
进行加法运算并返回修改前的值 |
fetchSub(val) |
进行减法运算并返回修改前的值 |
fetchAnd(val) |
执行与运算并返回修改前的值 |
fetchOr(val) |
执行或运算并返回修改前的值 |
fetchXor(val) |
执行异或运算并返回修改前的值 |
示例代码
import std.sync.*
let count = AtomicInt64(0)
main(): Int64 {
let list = ArrayList<Future<Int64>>()
for (i in 0..1000) {
let fut = spawn {
sleep(Duration.millisecond) // sleep for 1ms.
count.fetchAdd(1)
}
list.append(fut)
}
for (f in list) {
f.get()
}
let val = count.load()
println("count = ${val}")
return 0
}
在这个示例中,多个线程同时对 count
进行加1操作,确保最终的结果为1000。
可重入互斥锁(ReentrantMutex)
可重入互斥锁是保护临界区的另一种有效机制。它允许一个线程多次获取同一个锁而不会导致死锁,确保在临界区的代码中,只有一个线程可以执行。
ReentrantMutex 的主要功能
- 加锁 (
lock
):阻塞当前线程,直到锁可用。 - 解锁 (
unlock
):释放锁,唤醒其他阻塞线程。 - 尝试加锁 (
tryLock
):尝试获取锁,如果无法获取则返回false
。
使用示例
import std.sync.*
var count: Int64 = 0
let mtx = ReentrantMutex()
main(): Int64 {
let list = ArrayList<Future<Unit>>()
for (i in 0..1000) {
let fut = spawn {
sleep(Duration.millisecond)
mtx.lock()
count++
mtx.unlock()
}
list.append(fut)
}
for (f in list) {
f.get()
}
println("count = ${count}")
return 0
}
这个例子中,多个线程对共享变量 count
进行加1操作,使用 ReentrantMutex
确保线程安全。
Monitor
Monitor 是一种结合了互斥锁和条件变量的同步机制,允许线程阻塞并等待其他线程的信号,以便安全地访问共享资源。
Monitor 的主要方法
- wait:阻塞当前线程并释放锁,等待通知。
- notify:唤醒一个等待线程。
- notifyAll:唤醒所有等待线程。
使用示例
import std.sync.*
var mon = Monitor()
var flag: Bool = true
main(): Int64 {
let fut = spawn {
mon.lock()
while (flag) {
println("New thread: before wait")
mon.wait()
println("New thread: after wait")
}
mon.unlock()
}
sleep(10 * Duration.millisecond)
mon.lock()
println("Main thread: set flag")
flag = false
mon.unlock()
mon.lock()
println("Main thread: notify")
mon.notifyAll()
mon.unlock()
fut.get()
return 0
}
在这个示例中,主线程和子线程通过 Monitor
进行同步,确保安全地共享和更新 flag
变量。
条件变量与 MultiConditionMonitor
MultiConditionMonitor
是一种更复杂的同步机制,允许在同一个监视器上创建多个条件变量,适合于复杂的线程间协作场景。
主要方法
- newCondition:创建新的条件变量。
- wait:在指定条件上等待。
- notify / notifyAll:唤醒等待在特定条件上的线程。
示例实现
import std.sync.*
class BoundedQueue {
let m: MultiConditionMonitor = MultiConditionMonitor()
var notFull: ConditionID
var notEmpty: ConditionID
var count: Int64 = 0
var head: Int64 = 0
var tail: Int64 = 0
let items: Array<Object> = Array<Object>(100, { i => Object() })
init() {
synchronized(m) {
notFull = m.newCondition()
notEmpty = m.newCondition()
}
}
public func put(x: Object) {
synchronized(m) {
while (count == 100) {
m.wait(notFull)
}
items[head] = x
head++
if (head == 100) head = 0
count++
m.notify(notEmpty)
}
}
public func get(): Object {
synchronized(m) {
while (count == 0) {
m.wait(notEmpty)
}
let x: Object = items[tail]
tail++
if (tail == 100) tail = 0
count--
m.notify(notFull)
return x
}
}
}
这个例子实现了一个有界的 FIFO 队列,保证了在多线程环境下的线程安全性。
线程局部变量(ThreadLocal)
线程局部变量允许每个线程都有独立的存储空间,以避免线程间的干扰。使用 ThreadLocal
类可以方便地创建和管理这些变量。
示例代码
import std.sync.*
main(): Int64 {
let tl = ThreadLocal<Int64>()
let fut1 = spawn {
tl.set(123)
println("tl in spawn1 = ${tl.get().getOrThrow()}")
}
let fut2 = spawn {
tl.set(456)
println("tl in spawn2 = ${tl.get().getOrThrow()}")
}
fut1.get()
fut2.get()
return 0
}
在这个示例中,每个线程设置并打印自己的局部变量,确保它们互不干扰。
通过以上的实现和示例,仓颉语言为并发编程提供了强大的同步工具和机制,能够有效解决数据竞争问题,确保多线程环境下的安全和效率。
- 点赞
- 收藏
- 关注作者
评论(0)