python并发編程之多线程

举报
子都爱学习 发表于 2021/12/25 23:36:34 2021/12/25
【摘要】 并发与并行cup是顺序执行的,操作系统轮流让各个任务交替执行,每个任务在时间片内执行,时间片结束切换到下一个任务,这是并发。真正的并行执行多任务只能在 多核CPU 上实现,但是,由于任务数量远远多于 CPU 的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。并发性:是指两个或多个事件在同一时间间隔内发生。并行性:是指两个或多个事件在同一时刻发生。举个例子:我们有一个车间(...

并发与并行

cup是顺序执行的,操作系统轮流让各个任务交替执行,每个任务在时间片内执行,时间片结束切换到下一个任务,这是并发。
真正的并行执行多任务只能在 多核CPU 上实现,但是,由于任务数量远远多于 CPU 的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。

并发性:是指两个或多个事件在同一时间间隔内发生。
并行性:是指两个或多个事件在同一时刻发生。


举个例子:我们有一个车间(cpu),可以生产各种东西(任务),比如汽车,飞机,自行车。制造汽车时:车间每生产一个零件(计算操作)后,需要等它冷却,定型,组装好后才能生产下一个零件,在这个冷却,定型,组装的过程(IO操作)中车间是不工作的(程序霸占着cpu时间,效率低)。为了让我们车间效率高,我们让它在冷却,定型,组装的时间中生产其他的零件,等组装好后再继续生产汽车零件。某个确定的时间时做一个事情,但在一个时间段(时间片)内做多个事情,这就是并发。多个车间同时工作就是并行。


【拓展:时间片】
每个进程被分配一个时间段,称作它的时间片,即该进程允许运行的时间。如果在时间片结束时进程还在运行,则CPU将被剥夺并分配给另一个进程。如果进程在时间片结束前阻塞或结束,则CPU当即进行切换。调度程序所要做的就是维护一张就绪进程列表,当进程用完它的时间片后,它被移到队列的末尾。

从一个进程切换到另一个进程是需要一定时间的--保存和装入寄存器值及内存映像,更新各种表格和队列等。
时间片设得太短会导致过多的进程切换,降低了CPU效率;而设得太长又可能引起对短的交互请求的响应变差。将时间片设为100毫秒通常是一个比较合理的折中。


进程和线程

进程

进程(Process):是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基 本单位,是操作系统结构的基础。
程序是源代码编译后的文件,而这些文件存放在磁盘上。当程序被操作系统加载到 内存中,就是进程,进程中存放着指令和数据(资源)。一个程序的执行实例就是一个进程。它也是线 程的容器。

线程


线程 thread :线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所拥有的全部资源。一条线程指的是进程中一个单一顺序的控制流,一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行,每条线程并行执行不同的任务。

每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
每个线程都有他自己的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。
指令指针和堆栈指针寄存器是线程上下文中两个最重要的寄存器,线程总是在进程得到上下文中运行的,这些地址都用于标志拥有线程的进程地址空间中的内存。

  • 线程可以被抢占(中断)。
  • 在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) -- 这就是线程的退让。

线程可以分为:

  • 内核线程:由操作系统内核创建和撤销。
  • 用户线程:不需要内核支持而在用户程序中实现的线程。
线程的状态

就绪(Ready)

线程能够运行但在等待被调度可能线程刚刚创建启动或刚刚从阻塞中恢 或者被其他线程抢占

(Running)

线程正在运行

阻塞(Blocked)

线程等待外部事件发生而无法运行I/O操作

(Terminated)

线程完成或退或被取消


进程和程序的关系

Linux进程有父进程、子进程,Windows的进程是平等关系。
在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是
进程中的实际运作单位。
一个标准的线程由线程ID,当前指令指针(PC)、寄存器集合和堆、栈组成。
在许多系统中,创建一个线程比创建一个进程快10-100倍。

进程、线程的理解
现代操作系统提出进程的概念,每一个进程都认为自己独占所有的计算机硬件资源。
进程就是独立的王国,进程间不可以随便的共享数据。
线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。


Python中的进程和线程

运行程序会启动一个解释器进程线程共享一个解释器进程
进程靠线程执行代码,至少有一个主线程其它线程是工作线程
主线程是第一个启动的线程
父线程如果线程A中启动了一个线程B  A就是B的父线程。
子线程  B就是A的子线程。



多线程

为什么要使用多线程?
    线程在程序中是独立的、并发的执行流。与分隔的进程相比,进程中线程之间的隔离程度要小,它们共享内存、文件句柄
    和其他进程应有的状态。
    因为线程的划分尺度小于进程,使得多线程程序的并发性高。进程在执行过程之中拥有独立的内存单元,而多个线程共享
    内存,从而极大的提升了程序的运行效率。
    线程比进程具有更高的性能,这是由于同一个进程中的线程都有共性,多个线程共享一个进程的虚拟空间。线程的共享环境
    包括进程代码段、进程的共有数据等,利用这些共享的数据,线程之间很容易实现通信。
    操作系统在创建进程时,必须为改进程分配独立的内存空间,并分配大量的相关资源,但创建线程则简单得多。因此,使用多线程
    来实现并发比使用多进程的性能高得要多。

总结起来,使用多线程编程具有如下几个优点:
    进程之间不能共享内存,但线程之间共享内存非常容易。
    操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小得多。因此使用多线程来实现多任务并发执行比使用多进程的效率高


python的多线程

python语言内置了多线程功能支持,而不是单纯地作为底层操作系统的调度方式,从而简化了python的多线程编程。
在 CPython 中,由于存在 全局解释器锁,同一时刻只有一个线程可以执行 Python 代码,如果你想让你的应用更好地利用多核心计算机的计算资源,推荐你使用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。 但是,如果你想要同时运行多个 I/O 密集型任务,则多线程仍然是一个合适的模型。

python实现多线程

  • _thread
  • threading(推荐使用)

thread 模块已被废弃。用户可以使用 threading 模块代替。所以,在 Python3 中不能再使用"thread" 模块。为了兼容性,Python3 将 thread 重命名为 "_thread"。
这里只讲threading。


threading模块

简单示例:

import threading

# 最简单的线程程序
def worker():
    print("I'm working")
    print('Fineshed')

t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() #  启动

threading.Thread参数解析

  • target    线程调用的对象,就是目标函数
  • name    为线程起个名字
  • args    为目标函数传递实参,元组
  • kwargs    为目标函数关键字传参,字典

通过threading.Thread创建一个线程对象,  target是目标函数,可以使用name为线程指定名称 但是线程没有启动,需要调用start方法
线程之所以执行函数是因为线程中就是要执行代码的而最简单的代码封装就是函数所以还是函数 调用
数执行完线程也就退出了

线程退出
Python没有提供线程退出的方法线程在下面情况时退出

  • 线程函数内语句执行完毕
  • 线程函数中抛出未处理的异常
# 程序抛异常线程退出
import threading
import time

def worker():
    for i in range(10):
        time.sleep(0.5)
        if i > 5:
        #break # 终止循环
        #return # 函数返回
            raise RuntimeError # 抛异常
    print('I am working')
    print('finished')

t = threading.Thread(target=worker, name='worker')
t.start()

print('=' * 30)


这个模块定义了以下函数:

  • threading.main_thread()  
    返回主 Thread 对象。一般情况下,主线程是Python解释器开始时创建的线程。
  • threading.current_thread():
    返回当前对应调用者的控制线程的 Thread 对象。如果调用者的控制线程不是利用 threading 创建,会返回一个功能受限的虚拟线程对象。
  • threading.enumerate():
    返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.active_count():
    返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
  • threading.get_ident():
    返回当前线程的 “线程标识符”。它是一个非零的整数。它的值没有直接含义,主要是用作 magic cookie,比如作为含有线程相关数据的字典的索引。线程标识符可能会在线程退出,新线程创建时被复用。
  • threading.excepthook(args, /)
    处理由 Thread.run() 引发的未捕获异常。
  • ...
import threading
import time

def showtreadinfo():
    # 打印当前线程对象,主线程对象,活跃线程数
    print('current thread = {}\nmain thread = {}\nactive count = {}' .format(
    threading.current_thread(), threading.main_thread(), threading.active_count()))
    # 打印正在运行的线程的list,线程标识符,
    print('threading.enumerate = {}\nthreading.get_ident = {}'.format(
    threading.enumerate(),threading.get_ident()))


def worker():
    showtreadinfo()
    for i in range(2):
        time.sleep(1)
        print('i am working')
        print('finished')

t = threading.Thread(target=worker, name='worker') # 线程对象
showtreadinfo()
time.sleep(1)
t.start() # 启动
print('===end===')

# current thread = <_MainThread(MainThread, started 17200)>
# main thread = <_MainThread(MainThread, started 17200)>
# active count = 1
# threading.enumerate = [<_MainThread(MainThread, started 17200)>]
# threading.get_ident = 17200
# current thread = <Thread(worker, started 10784)>
# main thread = <_MainThread(MainThread, started 17200)>
# active count = 2
# threading.enumerate = [<_MainThread(MainThread, started 17200)>, <Thread(worker, started 10784)>]
# threading.get_ident = 10784
# ===end===
# i am working
# finished
# i am working
# finished


线程对象

除了使用方法外,线程模块同样提供了Thread类来处理线程

threading.Thread()


1.png

创建一个daemon=True的守护线程:

import time
import threading

def foo():
    time.sleep(1)
    for i in range(2):
        print(i)

# 主线程是non-daemon线程
t = threading.Thread(target=foo, daemon=True)   # 主线程执行完直接退出,不会等待t结束
t.start()

print('Main Thread Exits')

# Main Thread Exits

主线程是non-daemon线程daemon = False

  • 线程具有一个daemon属性,可以手动设置为TrueFalse也可以不设置则取默认值None 如果不设置daemon,就取当前线程的daemon来设置它
  • 主线程是non-daemon线程daemon = False
  • 从主线程创建的所有线程的不设置daemon属性则默认都是daemon = False也就是non- daemon线程
  • Python程序在没有活着的non-daemon线程运行时程序退出也就是除主线程之外剩下的只能 都是daemon线程,主线程才能退出,否则主线程就只能等待

 


Thread类提供了以下属性和方法:

  • name    只是一个名字,只是个标识,名称可以重名。  getName()、  setName()获取、设置这 个名词
  • ident    线程ID,它是非0整数。线程启动后才会有ID,否则为None。线程退出,此ID依旧可 以访问。此ID可以重复使用
  • native_id    此线程的线程 ID (TID),由 OS (内核) 分配。 这是一个非负整数,或者如果线程还未启动则为 None。
  • daemon  一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用 start() 前设置好,不然会抛出 RuntimeError 。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False。当没有存活的非守护线程时,整个Python程序才会退出。




  • start():启动线程活动。start方法才能启动操作系统线程并运行run方法  run方法内部调用了目标函数
  • run(): 用以表示线程活动的方法。
  • join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
  • is_alive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。
  • isDaemon()    返回线程是否是守护线程
  • setDaemon() 设置daemon值


创建多个线程:

import threading
import time
import sys

def worker(f=sys.stdout):
    t = threading.current_thread()
    for i in range(2):
        time.sleep(1)
        print('i am working' , t.name, t.ident, file=f)
        print('finished', file=f)

t1 = threading.Thread(target=worker, name='worker1')
t2 = threading.Thread(target=worker, name='worker2', args=(sys.stderr,))
t1.start()
t2.start()

# i am working worker1 18412
# finished
# i am working worker2 1312
# finished
# i am working worker2 1312
# finished
# i am working worker1 18412
# finished

可以看到worker1work2交替执行

当使用start方法启动线程后进程内有多个活动的线程并行的工作就是多线程
一个进程中至少有一个线程并作为程序的入口这个线程就是主线程
一个进程至少有一个主线程其他线程称为工作线程

线程安

多线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的

多线程在运行过程中由于共享同一进程中的数据多线程并发使用同一个数据那么数据就有可能被 相互修改从而导致某些时刻无法确定这个数据的值最终随着多线程运行运行结果不可预期这就 是线程不安全

import threading
import time
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class A:
    def __init__(self):
        self.x = 0

# 全局对象
global_data = A()

def worker():
    global_data .x = 0
    for i in range(100):
        time.sleep(0.0001)
        global_data .x += 1
        logging.info(global_data.x)


for i in range(10):
     threading.Thread(target=worker, name='t-{}'.format(i)).start()

# 每次执行global_data .x的值都是不确定的


threading.local

python提供 threading.local 将这个类实例化得到一个全局对象但是不同的线程使用这个对象存储的数据其他线程看不见

import threading
import time
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
# 全局对象
global_data = threading.local()

def worker():
    global_data .x = 0
    for i in range(100):
        time.sleep(0.0001)
        global_data .x += 1
        logging.info(global_data.x)

for i in range(10):
    threading.Thread(target=worker, name='t-{}'.format(i)).start()

结果显示和使用局部变量的效果一样

# 当前线程创建的threading.local()的属性被其他线程访问时会报错

import logging
import threading
import time

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
# 全局对象
X = 'abc'
global_data = threading.local()
global_data .x = 100
print(global_data , global_data .x)
print('~' * 30)
time.sleep(2)

def worker():
    logging.info(X)
    logging.info(global_data)
    logging.info(global_data.x)

worker() # 普通函数调用
print('=' * 30)
time.sleep(2)
threading.Thread(target=worker, name='worker').start() # 启动一个线程

# AttributeError: '_thread._local' object has no attribute 'x'

threading.local类构建了一个大字典存放所有线程相关的字典定义如下

{ id(Thread) -> (ref(Thread), thread-local dict) } 


每一线程实例的idkey元组为value
value2分为线程对象引用每个线程自己的字典


运行时  threading.local实例处在不同的线程中就从大字典中找到当前线程相关键值对中的字 典,覆盖threading.local实例的__dict__ 
这样就可以在不同的线程中安全地使用线程独有的数据做到了线程间数据隔离如同本地变量 一样安全


线程同步

线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。


Event 

实现事件对象的类。事件对象管理一个内部标识,调用 set() 方法可将其设置为true。调用 clear() 方法可将其设置为 false 。调用 wait() 方法将进入阻塞直到标识为true。这个标识初始时为 false 。

set()

将内部标识设置为 true 。所有正在等待这个事件的线程将被唤醒。当标识为 true 时,调用 wait() 方法的线程不会被被阻塞。

clear()

将内部标识设置为 false 。之后调用 wait() 方法的线程将会被阻塞,直到调用 set() 方法将内部标识再次设置为 true 。

is_set()

当且仅当内部标识为 true 时返回 True 。

 wait(timeout=None)



# 老板监视工人生产10个杯子

from threading import Event, Thread
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

def boss(event:Event):
    logging.info("I'm boss, waiting for U")
    event.wait() # 阻塞等待
    logging.info('Good Job.')

def worker(event:Event, count=10):
    logging.info('I am working for U')
    cups = []
    while not event.wait(0.1):
        logging.info('make 1 cup')
        time.sleep(0.1)
        cups.append(1)
        if len(cups) >= count:
            event.set()
    logging.info('I finished my job. cups={}' .format(cups))

event = Event()
b = Thread(target=boss, name='boss', args=(event,))
w = Thread(target=worker, name='worker', args=(event,))
b.start()
w.start()

生产完成even事件为True,老板监视进程阻塞结束,打印good job

总结
需要使用同一个Event对象的标记flag
wait就是等到flag变为True或等到超时返回False
不限制等待者的个数知所有等待者


锁对象

原始锁处于 "锁定" 或者 "非锁定" 两种状态之一。它被创建时为非锁定状态。它有两个基本方法, acquire() 和 release() 。当状态为非锁定时, acquire() 将状态改为 锁定 并立即返回。当状态是锁定时, acquire() 将阻塞至其他线程调用 release() 将其改为非锁定状态,然后 acquire() 调用重置其为锁定状态并返回。 release() 只在锁定状态下调用; 它将状态改为非锁定并立即返回。如果尝试释放一个非锁定的锁,则会引发 RuntimeError  异常。

锁同样支持 上下文管理协议。

当多个线程在 acquire() 等待状态转变为未锁定被阻塞,然后 release() 重置状态为未锁定时,只有一个线程能继续执行;至于哪个等待线程继续执行没有定义,并且会根据实现而不同。

所有方法的执行都是原子性的。


threading.Lock

Lock实现原始锁对象的类,是mutex互斥锁。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它;一旦一个线程获得锁其它试图获取锁的线程将被阻塞只到拥有锁的线程释放锁

凡是存在共享资源争抢的地方都可以使用锁从而保证只有一个使用者可以完全使用这个资源

acquire(blocking=True,timeout=-1)

可以阻塞或非阻塞地获得锁。默认阻塞,阻塞可以设置超时时间非阻塞时  timeout禁止 设置

成功获取锁,返回True否则返回False

 release()

释放锁。可以从任何线程调用释放已上锁的锁会被重置为unlocked,未上锁的锁上调用RuntimeError异常如果其他线程正在等待这个锁解锁而被阻塞,只允许其中一个允许。

locked()

如果获得了锁则返回真值。

示例:订单要求生产1000个杯子,组织10个工人生产

import threading
from threading import Thread, Lock
import time
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
lock = Lock() # 锁

def worker(count=100):
    logging.info("I'm working")
    while True:
        # 生产杯子期间加锁(改变cups列表)
        lock.acquire() # 获取锁

        if len(cups) >= count:
            lock.release()
            break

        time.sleep(0.0001) # 为了看出线程切换效果,模拟杯子制作时间
        cups.append(1)
        lock.release()
    logging.info('I finished my job. cups = {}' .format(len(cups)))

for i in range(1, 11):
    t = Thread(target=worker, name="w{}".format(i), args=(1000,))
    t.start()

上下文支持

锁是典型必须释放的  Python提供了上下文支持查看Lock类的上下文方法  __enter__方法返回bool 表示是否获得锁  __exit__方法中释放锁。
由此上例可以修改

import threading
from threading import Thread, Lock
import time
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
lock = Lock() # 锁

def worker(count=100):
    logging.info("I'm working")
    while True:
        with lock: # 获取锁,离开with释放锁
            if len(cups) >= count:
                logging.info('leaving')
                break
            time.sleep(0.0001) # 为了看出线程切换效果,模拟杯子制作时间
            cups.append(1)
            logging.info(lock.locked())

    logging.info('I finished my job. cups = {}' .format(len(cups)))

for i in range(1, 11):
    t = Thread(target=worker, name="w{}".format(i), args=(1000,))
    t.start()


threading.RLock

重入锁是一个可以被同一个线程多次获取的同步基元组件。在内部,它在基元锁的锁定/非锁定状态上附加了 "所属线程" 和 "递归等级" 的概念。在锁定状态下,某些线程拥有锁 ; 在非锁定状态下, 没有线程拥有它。

若要锁定锁,线程调用其 acquire() 方法;一旦线程拥有了锁,方法将返回。若要解锁,线程调用 release() 方法。 acquire()/release() 对可以嵌套;只有最终 release() (最外面一对的 release() ) 将锁解开,才能让其他线程继续处理 acquire() 阻塞。

递归锁也支持 上下文管理协议。

若要锁定锁,线程调用其 acquire() 方法;一旦线程拥有了锁,方法将返回。若要解锁,线程调用 release() 方法。 acquire()/release() 对可以嵌套;只有最终 release() (最外面一对的 release() ) 将锁解开,才能让其他线程继续处理 acquire() 阻塞。

递归锁也支持 上下文管理协议。

acquire(blocking=True,

timeout=-1)

默认阻塞,阻塞可以设置超时时间非阻塞时  timeout禁止 设置

当无参数调用时: 如果这个线程已经拥有锁,递归级别增加一,并立即返回。否则,如果其他线程拥有该锁,则阻塞至该锁解锁。一旦锁被解锁(不属于任何线程),则抢夺所有权,设置递归等级为一,并返回。如果多个线程被阻塞,等待锁被解锁,一次只有一个线程能抢到锁的所有权。在这种情况下,没有返回值。

 

release()

释放锁,自减递归等级。如果减到零,则将锁重置为非锁定状态(不被任何线程拥有),并且,如果其他线程正被阻塞着等待锁被解锁,则仅允许其中一个线程继续。如果自减后,递归等级仍然不是零,则锁保持锁定,仍由调用线程拥有。

只有当前线程拥有锁才能调用这个方法。如果锁被释放后调用这个方法,会引起 RuntimeError 异常。

没有返回值。

# 递归锁:RLcok类的用法和Lock类一模一样,但它支持嵌套,在多个锁没有释放的时候一般会使用RLock类
import threading
import time

def func(lock):
    global gl_num
    lock.acquire()
    gl_num += 1
    time.sleep(1)
    print(gl_num)
    lock.release()


if __name__ == '__main__':
    gl_num = 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=func,args=(lock,))
        t.start()


GIL VS Lock

Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?
首先我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据
然后,我们可以得出结论:保护不同的数据就应该加不同的锁。
最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock

锁的应用场景
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
如果全部都是读取同一个共享资源需要锁吗?
不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁 使用锁的注意事项:
少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争 抢执行
  举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以
在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦 出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
加锁时间越短越好,不需要就立即释放锁
一定要避免死锁


Semaphore信号量

信号量通常用于保护数量有限的资源,例如数据库服务器。在资源数量固定的任何情况下,都应该使用有界信号量。在生成任何工作线程前,应该在主线程中初始化信号量。互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

工作线程生成后,当需要连接服务器时,这些线程将调用信号量的 acquire 和 release 方法:

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

举例:

from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()


threading.Condition(lock=None)

实现条件变量对象的类。一个条件变量对象允许一个或多个线程在被其它线程所通知之前进行等待。如果给出了非 None 的 lock 参数,则它必须为 Lock 或者 RLock 对象,并且它将被用作底层锁。否则,将会创建新的 RLock 对象,并将其用作底层锁。

  • acquire(*args)
    请求底层锁。此方法调用底层锁的相应方法,返回值是底层锁相应方法的返回值。
  • release()
    释放底层锁。此方法调用底层锁的相应方法。没有返回值。
  • wait(timeout=None)
    等待直到被通知或发生超时。这个方法释放底层锁,然后阻塞,直到在另外一个线程中调用同一个条件变量的 notify()  notify_all() 唤醒它,或者直到可选的超时发生。一旦被唤醒或者超时,它重新获得锁并返回。
  • wait_for(predicate, timeout=None)
    等待,直到条件计算为真。 predicate 应该是一个可调用对象而且它的返回值可被解释为一个布尔值。可以提供 timeout 参数给出最大等待时间。这个实用方法会重复地调用 wait() 直到满足判断式或者发生超时。返回值是判断式最后一个返回值,而且如果方法发生超时会返回 False 。
  • notify(n=1)
    默认唤醒一个等待这个条件的线程。如果调用线程在没有获得锁的情况下调用这个方法,会引发 RuntimeError 异常。
    这个方法唤醒最多 n 个正在等待这个条件变量的线程;如果没有线程在等待,这是一个空操作。
    当前实现中,如果至少有 n 个线程正在等待,准确唤醒 n 个线程。但是依赖这个行为并不安全。未来,优化的实现有时会唤醒超过 n 个线程。
    注意:被唤醒的线程并没有真正恢复到它调用的 wait() ,直到它可以重新获得锁。 因为 notify() 不释放锁,其调用者才应该这样做。
  • notify_all()
    唤醒所有正在等待这个条件的线程。这个方法行为与 notify() 相似,但并不只唤醒单一线程,而是唤醒所有等待线程。如果调用线程在调用这个方法时没有获得锁,会引发 RuntimeError 异常。

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

python多线程总结 

python针对不同类型的代码执行效率也是不同的

1、CPU密集型代码(各种循环处理、计算等),在这种情况下,由于计算工作多,ticks技术很快就会达到阀值,然后出发GIL的释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python下的多线程对CPU密集型代码并不友好。
2、IO密集型代码(文件处理、网络爬虫等设计文件读写操作),多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序的执行效率)。所以python的多线程对IO密集型代码比较友好。

主要要看任务的类型,我们把任务分为I/O密集型和计算密集型,而多线程在切换中又分为I/O切换和时间切换。

如果任务属于是I/O密集型,若不采用多线程,我们在进行I/O操作时,势必要等待前面一个I/O任务完成后面的I/O任务才能进行,在这个等待的过程中,CPU处于等待状态,这时如果采用多线程的话,刚好可以切换到进行另一个I/O任务。这样就刚好可以充分利用CPU避免CPU处于闲置状态,提高效率。但是
如果多线程任务都是计算型,CPU会一直在进行工作,直到一定的时间后采取多线程时间切换的方式进行切换线程,此时CPU一直处于工作状态,
此种情况下并不能提高性能,相反在切换多线程任务时,可能还会造成时间和资源的浪费,导致效能下降。




【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。