【Python从入门到精通】(二十一)Python并发编程互斥锁的运用以及线程通信
您好,我是码农飞哥,感谢您阅读本文,欢迎一键三连哦。
本文主要介绍利用互斥锁处理多线程安全的处理以及线程通信。
干货满满,建议收藏,需要用到时常看看。 小伙伴们如有问题及需要,欢迎踊跃留言哦~ ~ ~。
前言
上一篇文章文章简单的介绍了并发编程的基本概念,了解了线程的基本运用。【Python从入门到精通】(二十)Python并发编程的基本概念-线程的使用以及生命周期。这篇文章将重点介绍如何运用互斥锁处理并发问题以及如果进行线程通信。
线程安全的问题
多线程用起来挺爽的,就是一不小心就可能会出现并发问题。比如:两个小孩争抢同一个糖果,两个线程对同一个共享资源进行访问。举个栗子吧!现在某个账户中有2000块钱。甲乙丙(三个线程)同时从该账户中取钱,每个人线程分别取800块。按照期望的总有一个线程取钱失败。因为账户的余额不能为负数。但是在并发情况下极有可能三个人都取钱成功,最终的账户余额为负数。
import threading
class Account:
# 定义构造函数
def __init__(self, account_no, balance):
"""
:param account_no: 账户
:param balance: 余额
"""
self.account_no = account_no
self._balance = balance
def draw(self, draw_amount):
"""
:param draw_amount: 需要取的钱
:return:
"""
if self._balance > draw_amount:
print(threading.current_thread().getName() + '从' + self.account_no + " 取钱成功,账户取出金额是:" + str(
draw_amount) + "\n")
self._balance = self._balance - draw_amount
print('账户余额是', self._balance)
else:
print(threading.current_thread().getName() + '从' + self.account_no + " 取钱失败\n")
# 两个线程并发取钱
account = Account('账户一', 2000)
threading.Thread(target=account.draw, name='线程一', args=(800,)).start()
threading.Thread(target=account.draw, name='线程二', args=(800,)).start()
threading.Thread(target=account.draw, name='线程三', args=(800,)).start()
极有可能出现如下运行结果:
线程一从账户一 取钱成功,账户取出金额是:800
线程二从账户一 取钱成功,账户取出金额是:800
账户余额是 1200
线程三从账户一 取钱成功,账户取出金额是:800
账户余额是 400
账户余额是 -400
这就是对共享资源不加控制引发的线程安全问题。线程安全问题出现一般有如下两种情况:
- 可见性问题: 由于CPU缓存导致的可见性问题,导致线程一对共享变量A的操作不能立即被线程二看到,这就导致线程二取到的还是旧值。
- 原子性问题: 比如:
self._balance = self._balance - draw_amount
这个语句并不是一个原子的CPU执行命令。这个语句的执行共有三条CPU命令,分别是:- 指令一:将
self._balance
的值从内存加载到CPU的寄存器中。 - 指令二:在寄存器中执行
self._balance - draw_amount
操作。 - 指令三:将新值写入到内存中(由于存在缓存机制,写入的可能是CPU缓存而不是堆内存)。
一般认为单条CPU指令是原子的,但是在执行多条CPU指令时可能随时会发生线程切换。切换之后的线程取到的值还是老值。
- 指令一:将
互斥锁
那么多线程并发情况导致的问题,我们该如何处理呢?就像一群小孩争抢一盒糖果,我们通过可以通过让小孩排队的方式来分发糖果。应用到从银行取钱上:同一个账户同一时间内只能有一个人进行取钱操作。只有上一个人取完钱了,下一个人才能开始取钱。说白了就是给账户加个锁。Python中有互斥锁来保护共享资源。Python的threading模块提供了Lock和RLock两个类,他们都提供了加锁和释放锁的方法。
加锁:acquire(blocking=Ture,time=-1):
请求对Lock或RLock加锁,其中timeout参数指定加锁多少秒。
释放锁:release()
。
其中Lock:是一个基本的锁对象,每次只能锁定一次,其余的锁请求,需等待锁释放后才能获取。
RLock: 它代表可重入锁(Reentrant Lock)。对于可重入锁,在同一个线程中可以对它进行多次锁定,也可以多次释放。如果使用RLock,那么accquire()和release()方法必须成对出现。如果调用了n次accquire()加锁,则必须调用n次release()才能释放锁。
在实际开发中我们需要控制锁的粒度,之中需要加锁的地方加锁。这句话该如何理解呢?
那就是能对代码块加锁就不要直接对方法加锁。还是举个栗子吧!以上面取钱为例。
import threading
class Account:
# 定义构造函数
def __init__(self, account_no, balance):
"""
:param account_no: 账户
:param balance: 余额
"""
self.account_no = account_no
self._balance = balance
self.lock = threading.RLock()
def draw(self, draw_amount):
"""
:param draw_amount: 需要取的钱
:return:
"""
# 加锁
self.lock.acquire()
try:
if self._balance > draw_amount:
self._balance = self._balance - draw_amount
print(threading.current_thread().getName() + '从' + self.account_no + " 取钱成功,账户余额是:" + str(
self._balance) + "\n")
else:
print(threading.current_thread().getName() + '从' + self.account_no + " 取钱失败\n")
finally:
# 释放锁
self.lock.release()
# 两个线程并发取钱
account = Account('账户一', 2000)
threading.Thread(target=account.draw, name='线程一', args=(800,)).start()
threading.Thread(target=account.draw, name='线程二', args=(800,)).start()
threading.Thread(target=account.draw, name='线程三', args=(800,)).start()
运行结果是:
线程一从账户一 取钱成功,账户余额是:1200
线程二从账户一 取钱成功,账户余额是:400
线程三从账户一 取钱失败
线程通信
Python使用Condition对象来协调锁。Condition对象与Lock对象组合使用,可以为每个对象提供多个等待集(wait-set),因此,
Condition对象总是需要有对应的Lock对象。创建Condition对象是需要传入绑定的Lock对象,如果不指定lock对象,在创建Condition时它会自动创建一个与之绑定的RLock对象。
Condition类提供了如下几个方法:
- acquire([timeout])/release():调用Condition关联的Lock的acquire()或release()方法。
- wait([timeout]):导致当前线程进入Condition的等待池等待通知并释放锁,直到其他线程调用该Condition的notify()或者notify_all()方法来唤醒该线程。在调用该wait()方法时可以传入一个timeout参数,指定该线程最多等待多少秒。
- notify(): 唤醒在该Condition等待池中的单个线程并通知它,收到通知的线程会自动调用accquire()方法尝试加锁。如果所有线程都在该Condition等待池中等待,则会选择唤醒其中一个线程,选择是任意性的。
- notify_all():唤醒在该Condition等待池中等待的所有线程并通知它们。
举个例子吧:下面这个Account类中定义了一个存款方法deposit,一个取款方法draw。只有有人存款之后才能有人取款。并且取款的金额不能为负数。存款者不能连续两次存款,取款者也不能连续两次取款。这里通过wait方法让当前线程进入等待池,通过notify_all方法唤醒等待池中的所有线程。
import threading
class Account:
# 定义构造函数
def __init__(self, account_no, balance):
self.account_no = account_no
self._balance = balance
self.condition = threading.Condition()
# 定义代表是否已经存钱的标识
self.__deposit_flag = False
# 取钱
def draw(self, draw_amount):
# 加锁
self.condition.acquire()
try:
# 还没存钱
if not self.__deposit_flag:
self.condition.wait()
else:
if self._balance >= draw_amount:
self._balance = self._balance - draw_amount
print(threading.current_thread().getName() + " 取钱成功,账户余额是:" + str(self._balance) + "\n")
else:
print(threading.current_thread().getName() + " 取钱失败\n")
# 将标识账户已有存款的标识改成False
self.__deposit_flag = False
# 唤醒其他等待现车线程
self.condition.notify_all()
finally:
# 释放锁
self.condition.release()
# 存钱
def deposit(self, deposit_amount):
# 加锁
self.condition.acquire()
try:
# 如果已经存款了,则等待取款
if self.__deposit_flag:
self.condition.wait()
else:
self._balance = self._balance + deposit_amount
print(threading.current_thread().getName() + " 存款成功,存款金额是:" + str(deposit_amount) + "\n")
# 将存款标识改成已存款
self.__deposit_flag = True
# 唤醒其他线程
self.condition.notify_all()
finally:
# 释放锁
self.condition.release()
def draw_many(account, draw_amount, max):
for i in range(max):
account.draw(draw_amount)
def deposit_many(account, deposit_amount, max):
for i in range(max):
account.deposit(deposit_amount)
# 创建一个账户
account = Account("账户一", 0)
# 创建并启动取钱线程
draw_1 = threading.Thread(name='取钱者一', target=draw_many, args=(account, 200, 50))
draw_1.start()
draw_2 = threading.Thread(name='取钱者二', target=draw_many, args=(account, 200, 50))
draw_2.start()
# 创建并启动存钱线程
deposit_1 = threading.Thread(name='存钱者一', target=deposit_many, args=(account, 200, 50))
deposit_1.start()
deposit_2 = threading.Thread(name='存钱者二', target=deposit_many, args=(account, 200, 50))
deposit_2.start()
draw_1.join()
draw_2.join()
deposit_1.join()
deposit_2.join()
运行结果是:
这里关键的控制点是__deposit_flag 标识,用它来标记是否有存款,默认值为False。如果已经存款了则该值为True。如果取款之后则将该值改成False。通过它来控制取款线程和存款线程是进入等待池还是执行业务操作。
总结
本文详细介绍了互斥锁的运用以及线程通信。
我是码农飞哥,再次感谢您读完本文。
- 点赞
- 收藏
- 关注作者
评论(0)