Python进阶必备:线程模块threading

举报
天元浪子 发表于 2021/07/31 11:21:54 2021/07/31
【摘要】 对于新手来说,首先要理解线程的概念,以及为什么需要线程编程。什么是线程呢?网上一般是这样定义的:线程(thread)是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。Python 提供了多个模块来支持多线程编程,包括 thread、 threading 和 Queue 模块等。

1. 戏说线程和进程

       对于新手来说,首先要理解线程的概念,以及为什么需要线程编程。什么是线程呢?网上一般是这样定义的:线程(thread)是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。哈哈,你听懂了吗?我觉得这样的定义纯粹是自说自话:新手看完了一脸懵逼,老鸟看完了不以为然。咱们还是用白话解释一下吧:

  • 假定你经营着一家物业管理公司。最初,业务量很小,事事都需要你亲力亲为,给老张家修完暖气管道,立马再去老李家换电灯泡——这叫单线程,所有的工作都得顺序执行。
  • 后来业务拓展了,你雇佣了几个工人,这样,你的物业公司就可以同时为多户人家提供服务了——这叫多线程,你是主线程。
  • 工人们使用的工具,是物业管理公司提供的,大家共享——这叫多线程资源共享。
  • 工人们在工作中都需要管钳,可是管钳只有一把——这叫冲突。解决冲突的办法有很多,比如排队、等同事用完后的微信通知等——这叫线程同步。
  • 业务不忙的时候,你就在办公室喝喝茶。下班时间一到,你群发微信,所有的工人不管手头的工作是否完成,都立马撂下工具,跟你走人。因此如果有必要,你得避免不要在工人正忙着的时候发下班的通知——这叫线程守护属性设置和管理。
  • 再后来,你的公司规模扩大了,同时为很多生活社区服务,你在每个生活社区设置了分公司,分公司由分公司经理管理,运营机制和你的总公司几乎一模一样——这叫多进程,总公司叫主进程,分公司叫子进程。
  • 总公司以及各个分公司之间,工具都是独立的,不能借用、混用——这叫进程间不能共享资源。各个分公司之间可以通过专线电话联系——这叫管道。各个分公司之间还可以通过公司公告栏交换信息——这叫共享内存。
  • 分公司可以跟着总公司一起下班,也可以把当天的工作全部做完之后再下班——这叫守护进程设置。

       Python 提供了多个模块来支持多线程编程,包括 thread、 threading 和 Queue 模块等。程序是可以使用 thread 和 threading 模块来创建与管理线程。 thread 模块提供了基本的线程和锁定支持;而 threading 模块提供了更高级别、功能更全面的线程管理。我们在这里只讨论 threading 模块。

2. 创建并使用线程

       使用 threading 模块的 Thread 类,可以快速创建并启动线程。当然,创建线程之前,你得先把交给线程去做的工作,写成一个函数,我们管这个函数叫线程函数。

       threading.Thread 类有以下方法和属性:

对象 描述
name 线程名(属性)
ident 线程标识符(属性)
daemon 线程是否是守护线程(属性)
start() 开启线程
join() 等待至线程中止,或超过参数指定的时间
setDaemon() 设置线程是否是守护线程
getName() 返回线程名
isAlive() 判断线程是否还在运行
isDaemon() 判断线程是否是守护线程
run() 定义线程功能的方法(通常在子类中被应用开发者重写)

       我们设计一个任务:你(主线程)启动3个子线程,名字分别是A、B、C。其中A线程启动后,你要先观察5秒钟,再启动其他线程。每个子线程的任务是每隔指定时间间隔就向你问好,并报上自己的名字,你呢,只管睡觉。20秒后,你醒了。你逐一检查了各个子线程的工作状态之后,结束运行。下面是实现代码:

import time
import threading

def hello(name, t):
    """线程函数"""
    
    for i in range(10):
        print('Hello, 我是小%s'%name)
        time.sleep(t)

def demo():
    A = threading.Thread(target=hello, args=('A',1), name='A')
    B = threading.Thread(target=hello, args=('B',2), name='B')
    C = threading.Thread(target=hello, args=('C',3), name='C')
    
    #C.setDaemon(True) # 设置子线程在主线程结束时是否无条件跟随主线程一起退出
    
    A.start()
    A.join(5) # 等待A线程结束,若5秒钟后未结束,则代码继续
    B.start()
    C.start()
    
    time.sleep(20)
    
    print('进程A%s'%('还在工作中' if A.isAlive() else '已经结束工作',))
    print('进程B%s'%('还在工作中' if B.isAlive() else '已经结束工作',))
    print('进程C%s'%('还在工作中' if C.isAlive() else '已经结束工作',))
    
    print('下班了。。。')

if __name__ == '__main__':
    demo()

但是,运行这段代码,你会发,当你喊下班的时候,小C并没有立刻撂下手头的活儿跟你走人,而是做完了工作之后才跟你走人——或者说,是你在等他做完工作后一起走人。这里容易产生误会,以为主线程结束后,子线程还会工作到任务完成。这是错误的理解。真相是,主线程不忍心打断正在忙碌的子线程(active),一旦该子线程休眠(inactive),不管任务是否结束,都会被主线程直接带走。

那么如何令子线程在主线程结束时无条件跟随主线程一起走人呢?很简单,在线程 start() 之前,使用 setDaemon(True) 设置该线程为守护线程就可以了。子线程的 daemon 属性默认为 False。

3. 线程同步

3.1 线程锁 Lock

       前几天,我想在一个几百人的微信群里统计喜欢吃苹果的人数。有人说,大家从1开始报数吧,并敲了起始数字1,立马有人敲了数字2,3。但是统计很快就进行不下去了,因为大家发现,有好几个人敲4,有更多的人敲5。

       这就是典型的资源竞争冲突:统计用的计数器就是唯一的资源,很多人(子线程)都想取得写计数器的资格。怎么办呢?Lock(互斥锁)就是一个很好的解决方案。Lock只能有一个线程获取,获取该锁的线程才能执行,否则阻塞;执行完任务后,必须释放锁。

       请看演示代码:

# -*- encoding: utf8 -*-

import time
import threading

lock = threading.Lock() # 创建互斥锁
counter = 0 # 计数器

def hello():
    """线程函数"""
    
    global counter
    
    if lock.acquire(): # 请求互斥锁,如果被占用,则阻塞,直至获取到锁
        time.sleep(0.2) # 假装思考、敲键盘需要0.2秒钟
        counter += 1
        print('我是第%d个'%counter)
    
    lock.release() # 千万不要忘记释放互斥锁,否则后果很严重

def demo():
    threads = list()
    for i in range(30): # 假设群里有30人,都喜欢吃苹果
        threads.append(threading.Thread(target=hello))
        threads[-1].start()
    
    for t in threads:
        t.join()
    
    print('统计完毕,共有%d人'%counter)

if __name__ == '__main__':
    demo()

       除了互斥锁,线程锁还有另一种形式,叫做递归锁(RLock),又称可重入锁。已经获得递归锁的线程可以继续多次获得该锁,而不会被阻塞,释放的次数必须和获取的次数相同才会真正释放该锁。欲了解详情,同学们可以自行检索资料。

3.2 信号量 Semaphore

       上面的例子中,统计用的计数器是唯一的资源,因此使用了只能被一个线程获取的互斥锁。假如共享的资源有多个,多线程竞争时一般使用信号量(Semaphore)同步。信号量有一个初始值,表示当前可用的资源数,多线程执行过程中会通过 acquire() 和 release() 操作,动态的加减信号量。比如,有30个工人都需要电锤,但是电锤总共只有5把。使用信号量(Semaphore)解决竞争的代码如下:

# -*- encoding: utf8 -*-

import time
import threading

S = threading.Semaphore(5) # 有5把电锤可供使用

def us_hammer(id):
    """线程函数"""
    
    S.acquire() # P操作,阻塞式请求电锤,
    time.sleep(0.2)
    print('%d号刚刚用完电锤'%id)
    S.release() # V操作,释放资源(信号量加1)

def demo():
    threads = list()
    for i in range(30): # 有30名工人要求使用电锤
        threads.append(threading.Thread(target=us_hammer, args=(i,)))
        threads[-1].start()
    
    for t in threads:
        t.join()
    
    print('所有线程工作结束')

if __name__ == '__main__':
    demo()

3.3 事件Event

       想象我们每天早上上班的场景:为了不迟到,总得提前几分钟(我一般都会提前30分钟)到办公室,打卡之后,一看表,还不到工作时间,大家就看看新闻、聊聊天啥的;工作时间一到,立马开工。如果有人迟到了呢,自然就不能看新闻聊天了,得立即投入工作中。

       这个场景中,每个人代表一个线程,工作时间到,表示事件(Event)发生。事件发生前,线程会调用 wait() 方法阻塞自己(对应看新闻聊天),一旦事件发生,会唤醒所有调用 wait() 而进入阻塞状态的线程。

# -*- encoding: utf8 -*-

import time
import threading

E = threading.Event() # 创建事件

def work(id):
    """线程函数"""
    
    print('<%d号员工>上班打卡'%id)
    if E.is_set(): # 已经到点了
        print('<%d号员工>迟到了'%id)
    else: # 还不到点
        print('<%d号员工>浏览新闻中...'%id)
        E.wait() # 等上班铃声
    
    print('<%d号员工>开始工作了...'%id)
    time.sleep(10) # 工作10秒后下班
    print('<%d号员工>下班了'%id)

def demo():
    E.clear() # 设置为“未到上班时间”
    threads = list()
    
    for i in range(3): # 3人提前来到公司打卡
        threads.append(threading.Thread(target=work, args=(i,)))
        threads[-1].start()
    
    time.sleep(5) # 5秒钟后上班时间到
    E.set()
    
    time.sleep(5) # 5秒钟后,大佬(9号)到
    threads.append(threading.Thread(target=work, args=(9,)))
    threads[-1].start()
    
    for t in threads:
        t.join()
    
    print('都下班了,关灯关门走人')

if __name__ == '__main__':
    demo()

3.4 条件 Condition

       两位小朋友,Hider 和 Seeker,打算玩一个捉迷藏的游戏,规则是这样的:Seeker 先找个眼罩把眼蒙住,喊一声“我已经蒙上眼了”;听到消息后,Hider 就找地方藏起来,藏好以后,也要喊一声“我藏好了,你来找我吧”;Seeker 听到后,也要回应一声“我来了”,捉迷藏正式开始。各自随机等了一段时间后,两位小朋友都憋住了跑了出来。谁先跑出来,就算谁输。

# -*- encoding: utf8 -*-

import time
import threading
import random

cond = threading.Condition() # 创建条件对象
draw_Seeker = False # Seeker小朋友认输
draw_Hidwer = False # Hider小朋友认输

def seeker():
    """Seeker小朋友的线程函数"""
    
    global draw_Seeker, draw_Hidwer
    
    time.sleep(1) # 确保Hider小朋友已经进入消息等待状态
    cond.acquire() # 阻塞时请求资源
    time.sleep(random.random()) # 假装蒙眼需要花费时间
    print('Seeker: 我已经蒙上眼了')
    cond.notify() # 把消息通知到Hider小朋友
    cond.wait() # 释放资源并等待Hider小朋友已经藏好的消息
    
    print('Seeker: 我来了') # 收到Hider小朋友已经藏好的消息后
    cond.notify() # 把消息通知到Hider小朋友
    cond.release() # 不要再听消息了,彻底释放资源
    time.sleep(random.randint(3,10)) # Seeker小朋友的耐心只有3-10秒钟
    
    if draw_Hidwer:
        print('Seeker: 哈哈,我找到你了,我赢了')
    else:
        draw_Seeker = True
        print('Seeker: 算了,我找不到你,我认输啦')

def hider():
    """Hider小朋友的线程函数"""
    
    global draw_Seeker, draw_Hidwer
    
    cond.acquire() # 阻塞时请求资源
    cond.wait() # 如果先于Seeker小朋友请求到资源,则立刻释放并等待
    time.sleep(random.random()) # 假装找地方躲藏需要花费时间
    print('Hider: 我藏好了,你来找我吧')
    cond.notify() # 把消息通知到Seeker小朋友
    cond.wait() # 释放资源并等待Seeker小朋友开始找人的消息
    
    cond.release() # 不要再听消息了,彻底释放资源
    time.sleep(random.randint(3,10)) # Hider小朋友的耐心只有3-10秒钟
    
    if draw_Seeker:
        print('Hider: 哈哈,你没找到我,我赢了')
    else:
        draw_Hidwer = True
        print('Hider: 算了,这里太闷了,我认输,自己出来吧')

def demo():
    th_seeker = threading.Thread(target=seeker)
    th_hider = threading.Thread(target=hider)
    th_seeker.start()
    th_hider.start()
    
    th_seeker.join()
    th_hider.join()

if __name__ == '__main__':
    demo()
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。