threading库:Python线程锁与释放锁

举报
择城终老 发表于 2021/07/27 01:25:27 2021/07/27
【摘要】 目录 控制资源访问判断是否有另一个线程请求锁with lock同步线程Condition屏障(barrier) 有限资源的并发访问隐藏资源 控制资源访问 前文提到threading库在多线程时,对同一资源的访问容易导致破坏与丢失数据。为了保证安全的访问一个资源对象,我们需要创建锁。 示例如下: import threading import ti...

控制资源访问

前文提到threading库在多线程时,对同一资源的访问容易导致破坏与丢失数据。为了保证安全的访问一个资源对象,我们需要创建锁。

示例如下:

import threading
import time

class AddThread(): def __init__(self, start=0): self.lock = threading.Lock() self.value = start def increment(self): print("Wait Lock") self.lock.acquire() try: print("Acquire Lock") self.value += 1 print(self.value) finally: self.lock.release()

def worker(a): time.sleep(1) a.increment()

addThread = AddThread()
for i in range(3): t = threading.Thread(target=worker, args=(addThread,)) t.start()


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

运行之后,效果如下:
解锁

acquire()会通过锁进行阻塞其他线程执行中间段,release()释放锁,可以看到,基本都是获得锁之后才执行。避免了多个线程同时改变其资源对象,不会造成混乱。

判断是否有另一个线程请求锁

要确定是否有另一个线程请求锁而不影响当前的线程,可以设置acquire()的参数blocking=False。

示例如下:

import threading
import time

def worker2(lock): print("worker2 Wait Lock") while True: lock.acquire() try: print("Holding") time.sleep(0.5) finally: print("not Holding") lock.release() time.sleep(0.5)

def worker1(lock): print("worker1 Wait Lock") num_acquire = 0 value = 0 while num_acquire < 3: time.sleep(0.5) have_it = lock.acquire(blocking=False) try: value += 1 print(value) print("Acquire Lock") if have_it: num_acquire += 1 finally: print("release Lock") if have_it: lock.release()

lock = threading.Lock()
word2Thread = threading.Thread( target=worker2, name='work2', args=(lock,)
)
word2Thread.start()
word1Thread = threading.Thread( target=worker1, name='work1', args=(lock,)
)
word1Thread.start()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

运行之后,效果如下:
8次

这里,我们需要迭代很多次,work1才能获取3次锁。但是尝试了很8次。

with lock

前文,我们通过lock.acquire()与lock.release()实现了锁的获取与释放,但其实我们Python还给我们提供了一个更简单的语法,通过with lock来获取与释放锁。

示例如下:

import threading
import time

class AddThread(): def __init__(self, start=0): self.lock = threading.Lock() self.value = start def increment(self): print("Wait Lock") with self.lock: print("lock acquire") self.value += 1 print(self.value) print("lock release")

def worker(a): time.sleep(1) a.increment()

addThread = AddThread()
for i in range(3): t = threading.Thread(target=worker, args=(addThread,)) t.start()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

这里,我们只是将最上面的例子改变了一下。效果如下:
效果

需要注意的是,正常的Lock对象不能请求多次,即使是由同一个线程请求也不例外。如果同一个调用链中的多个函数访问一个锁,则会发生意外。如果期望在同一个线程的不同代码需要重新获得锁,那么这种情况下使用RLock。

同步线程

Condition

在实际的操作中,我们还可以使用Condition对象来同步线程。由于Condition使用了一个Lock,所以它可以绑定到一个共享资源,允许多个线程等待资源的更新。

示例如下:

import threading
import time

def consumer(cond): print("waitCon") with cond: cond.wait() print('获取更新的资源')

def producer(cond): print("worker") with cond: print('更新资源') cond.notifyAll()

cond = threading.Condition()
t1 = threading.Thread(name='t1', target=consumer, args=(cond,))
t2 = threading.Thread(name='t2', target=consumer, args=(cond,))
t3 = threading.Thread(name='t3', target=producer, args=(cond,))
t1.start()
time.sleep(0.2)
t2.start()
time.sleep(0.2)
t3.start()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

运行之后,效果如下:
资源

这里,我们通过producer线程处理完成之后调用notifyAll(),consumer等线程等到了它的更新,可以类比为观察者模式。这里是,当一个线程用完资源之后时,则会自动通知依赖它的所有线程。

屏障(barrier)

屏障是另一种线程的同步机制。barrier会建立一个控制点,所有参与的线程会在这里阻塞,直到所有这些参与方都到达这一点。采用这种方法,线程可以单独启动然后暂停,直到所有线程都准备好了才可以继续。

示例如下:

import threading
import time

def worker(barrier): print(threading.current_thread().getName(), "worker") worker_id = barrier.wait() print(threading.current_thread().getName(), worker_id)

threads = []
barrier = threading.Barrier(3)
for i in range(3): threads.append( threading.Thread( name="t" + str(i), target=worker, args=(barrier,) ) )
for t in threads: print(t.name, 'starting') t.start() time.sleep(0.1)

for t in threads: t.join()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

运行之后,效果如下:
屏障

从控制台的输出会发发现,barrier.wait()会阻塞线程,直到所有线程被创建后,才同时释放越过这个控制点继续执行。wait()的返回值指示了释放的参与线程数,可以用来限制一些线程做清理资源等动作。

当然屏障Barrier还有一个abort()方法,该方法可以使所有等待线程接收一个BroKenBarrierError。如果线程在wait()上被阻塞而停止处理,会产生这个异常,通过except可以完成清理工作。

有限资源的并发访问

除了多线程可能访问同一个资源之外,有时候为了性能,我们也会限制多线程访问同一个资源的数量。例如,线程池支持同时连接,但数据可能是固定的,或者一个网络APP提供的并发下载数支持固定数目。这些连接就可以使用Semaphore来管理。

示例如下:

import threading
import time

class WorkerThread(threading.Thread): def __init__(self): super(WorkerThread, self).__init__() self.lock = threading.Lock() self.value = 0 def increment(self): with self.lock: self.value += 1 print(self.value)

def worker(s, pool): with s: print(threading.current_thread().getName()) pool.increment() time.sleep(1) pool.increment()

pool = WorkerThread()
s = threading.Semaphore(2)
for i in range(5): t = threading.Thread( name="t" + str(i), target=worker, args=(s, pool,) ) t.start()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

运行之后,效果如下:
控制台

从图片虽然能看所有输出,但无法看到其停顿的事件。读者自己运行会发现,每次顶多只有两个线程在工作,是因为我们设置了threading.Semaphore(2)。

隐藏资源

在实际的项目中,有些资源需要锁定以便于多个线程使用,而另外一些资源则需要保护,以使它们对并非使这些资源的所有者的线程隐藏。

local()函数会创建一个对象,它能够隐藏值,使其在不同的线程中无法被看到。示例如下:

import threading
import random

def show_data(data): try: result = data.value except AttributeError: print(threading.current_thread().getName(), "No value") else: print(threading.current_thread().getName(), "value=", result)

def worker(data): show_data(data) data.value = random.randint(1, 100) show_data(data)

local_data = threading.local()
show_data(local_data)
local_data.value = 1000
show_data(local_data)

for i in range(2): t = threading.Thread( name="t" + str(i), target=worker, args=(local_data,) ) t.start()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

运行之后,效果如下:
输出

这里local_data.value对所有线程都不可见,除非在某个线程中设置了这个属性,这个线程才能看到它。

文章来源: liyuanjinglyj.blog.csdn.net,作者:李元静,版权归原作者所有,如需转载,请联系作者。

原文链接:liyuanjinglyj.blog.csdn.net/article/details/116665247

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。