写给Python社群的第11课:Python线程,进程,协程,3个毫无关系的兄弟

举报
梦想橡皮擦 发表于 2023/02/21 17:39:24 2023/02/21
【摘要】 ⛳️ 线程、进程与协程线程、进程、协程 这三个名称相似的概念,都是为了让程序处理多个任务,从而加快任务处理效率,本篇博客就带大家掌握这三个知识点,让我们先从线程和进程的概念讲起。 🔥 进程与线程简介进程是在计算机内存中运行的一个软件实例,它可以包含线程,在 Windows 电脑桌面任务栏右键选择任务管理器即可查看电脑进程,如下所示。线程是程序执行流程的最小单元,它是进程的一部分,一个进程...

⛳️ 线程、进程与协程

线程、进程、协程 这三个名称相似的概念,都是为了让程序处理多个任务,从而加快任务处理效率,本篇博客就带大家掌握这三个知识点,让我们先从线程和进程的概念讲起。

🔥 进程与线程简介

进程是在计算机内存中运行的一个软件实例,它可以包含线程,在 Windows 电脑桌面任务栏右键选择任务管理器即可查看电脑进程,如下所示。
image.png

线程是程序执行流程的最小单元,它是进程的一部分,一个进程可以包含若干线程

所以在后续的学习中,一定要牢记,进程包含线程 这一大小关系。

下面我们先从多线程开始学习。

⛳️ Python 多线程模块

在 Python3.0 之后的版本中多线程模块常用的有 2 个,其一是 threading,其二是 queue模块,依次为大家介绍。

🔥 threading 模块

学习任何一个模块,都要掌握其提供的函数与类,当然有的模块还会提供常量,下面是 threading 模块提供的函数。

  • active_count():获取当前活动线程对象的数量;
  • current_thread():获取当前线程对象;
  • main_thread():获取主线程对象,主线程一般是 Python 解释器启动的线程;
  • get_ident():返回当前线程的 ID;
  • enumerate():获取当前所有活动的线程对象;

函数我们先稍微搁置,稍后查看使用代码,下面从 threading 模块提供的类开始学习。

threading 模块提供的类有 ThreadLockRLockConditionSemaphore

Thread 类
Thread 类是 threading 模块中最常用也最先学习的类,它通过调用用户自定义函数,生成一个独立的活动线程,这里用户自定义函数有两种形式,分别如下。

  • 在使用 Thread 创建实例对象时,将自定义函数以参数形式传递给构造函数;
  • 通过继承 Thread 类,然后重新 run() 方法,调用用户自定义函数,需要注意在 Thread 类的派生类中,只允许重写 __init__() 构造方法和 run() 方法。

Thread 类的构造函数如下所示:

__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)

参数表描述如下。

  • group:无用占位参数,未来获取扩展功能使用;
  • target:创建的线程要调度的目标方法/函数;
  • name:线程名称,不指定默认生成规则是 Thread-N
  • args:如果自定义函数有参数时,通过该参数以元组形式传递
  • kwargs:同上,只是参数以 字典形式 传递;
  • daemon:创建的线程是否为守护线程,守护线程可以简单理解为当主线程退出时,守护线程会同步终止。

在初学阶段,重点掌握 targetargskwargs 三个参数即可。

Thread 类的主要方法如下

  • start():线程启动状态,该方法要求在 run() 方法前被调用;
  • run():运行线程;
  • join():阻塞线程,等待调用该方法的线程对象运行完毕;
  • name:线程名称;
  • ident:线程 ID;
  • is_alive():判定线程是否活动;
  • daemon:查看线程是否为守护线程。

上述各方法执行的简单顺序为,线程对象创建之后,首先调用 start() 方法,进行启动,然后调用 run() 运行线程绑定的函数,join() 方法可以阻塞当前运行线程。

Lock 类和 RLock 类
Lock 类是原始锁,RLock 类是课重复锁,原始锁可以提供锁定解锁两种状态,一旦线程获得锁之后,相应请求就会被阻塞,直到它被释放,建立一个锁的方法是 acquire(),释放锁的方法是 release()。RLock 类对象可以被一个线程多次获取,防止出现 Lock 锁的死锁问题,同样提供两个方法,分别是 acquire()release()

Condition 类和 Semaphore 类
Condition 类是条件变量对象,它提供对复杂线程同步问题的解决方案,除了与 Lock 类对象一样的 acquire()release() 方法外,还提供了 wait()notify() 方法。

Semaphore 信号量用于管理内部数据其,每次当线程使用 acquire() 方法后,计数器+1,使用 release() 方法后计数器-1。

🔥 threading 模块实践

本小节使用一个案例实现多线程应用,场景模拟书店售书系统,假设书店库存如下所示。

书籍名称 库存数量
《滚雪球学 Python》 20
《爬虫 100 例》 10
《Python 爬虫 120》 100
《写给 Python 社群的课》 5
《数据结构》 15
《运筹学》 0

来了 3 名学生购买图书,详情如下所示。

  • A 同学,购买《滚雪球学 Python》 3 本;
  • B 同学,购买 《写给 Python 社群的课》 2 本;
  • C 同学,购买 《数据结构》5 本。

他们同时将书给了 3 名售卖员(这里假设有多个结算通道,不排队)。

下面实现相应的代码,注意代码不用多线程技术实现。

import time
from datetime import datetime

books = [
    ['《滚雪球学Python》', 20],
    ['《爬虫100例》', 10],
    ['《Python爬虫120》', 100],
    ['《爬虫100例》', 10],
    ['《写给Python社群的课》', 5],
    ['《数据结构》', 15],
    ['《运筹学》', 0]
]


def buy_book(name, num):
    # 每次购书,模拟消耗时间 1 秒
    time.sleep(1)

    for index, item in enumerate(books):

        if item[0] == name:
            if item[1] >= num:
                books[index][1] = item[1] - num
                return num
            else:
                print(f"{name} 库存不足,无法购买")
                return -1


if __name__ == '__main__':
    print("开始购买", datetime.now())
    # A 同学开始购书
    ret = buy_book('《滚雪球学Python》', 3)
    if ret > 0:
        print("A 购买成功")

    # B 同学开始购书
    ret = buy_book('《写给Python社群的课》', 2)
    if ret > 0:
        print("B 购买成功")

    # C 同学开始购书
    ret = buy_book('《数据结构》', 5)
    if ret > 0:
        print("C 购买成功")

    print("三人结束购买", datetime.now())

    # 图书剩余数量
    for b in books:
        print(b)

运行代码,可以看到输出如下,附带模拟消耗时间输出,累计消耗 4 秒左右。

开始购买 2022-12-19 11:54:33.993924
A 购买成功
B 购买成功
C 购买成功
三人结束购买 2022-12-19 11:54:37.034426
['《滚雪球学Python》', 17]
['《爬虫100例》', 10]
['《Python爬虫120》', 100]
['《爬虫100例》', 10]
['《写给Python社群的课》', 3]
['《数据结构》', 10]
['《运筹学》', 0]

上述代码三名同学依次购买,完全模拟单人排队场景,显然这与我们的目标不一致,毕竟刚刚前文安排了 3 名售卖员,下面对代码进行修改,使用 threading 函数实现多线程案例。

多线程代码实现如下所示

import time
from datetime import datetime
import threading

books = [
    ['《滚雪球学Python》', 20],
    ['《爬虫100例》', 10],
    ['《Python爬虫120》', 100],
    ['《爬虫100例》', 10],
    ['《写给Python社群的课》', 5],
    ['《数据结构》', 15],
    ['《运筹学》', 0]
]


def buy_book(name, num):
    # 每次购书,模拟消耗时间 1 秒
    time.sleep(1)

    for index, item in enumerate(books):

        if item[0] == name:
            if item[1] >= num:
                books[index][1] = item[1] - num
                return num
            else:
                print(f"{name} 库存不足,无法购买")
                return -1


if __name__ == '__main__':
    print("开始购买", datetime.now())

    t1 = threading.Thread(target=buy_book, args=('《滚雪球学Python》', 3))  # A 同学开始购书
    t2 = threading.Thread(target=buy_book, args=('《写给Python社群的课》', 2))  # B 同学开始购书
    t3 = threading.Thread(target=buy_book, args=('《数据结构》', 5))  # C 同学开始购书
    t1.start()  # 线程启动
    t2.start()
    t3.start()

    t1.join()  # 等待线程结束
    t2.join()
    t3.join()

    print("三人结束购买", datetime.now())

    # 图书剩余数量
    for b in books:
        print(b)

运行上述代码,能看到购书时间大幅度缩短,用了 1 秒左右的时间。

开始购买 2022-12-19 12:02:57.963400
三人结束购买 2022-12-19 12:02:58.973167
['《滚雪球学Python》', 17]
['《爬虫100例》', 10]
['《Python爬虫120》', 100]
['《爬虫100例》', 10]
['《写给Python社群的课》', 3]
['《数据结构》', 10]
['《运筹学》', 0]

在上述代码中,我们通过 3 个 start() 方法启动了 3 个进程运行,然后使用 3 个 join() 等待进程运行完毕,并且可以明显看到运行时间变快,这里要注意不同的电脑得到的结果可能不一致,这是计算机本身问题,所以多线程应用开发完毕,需要尽可能多的进行测试。

除了上述写法外,还可以使用 threading 类实现。

import time
from datetime import datetime
import threading

books = [
    ['《滚雪球学Python》', 20],
    ['《爬虫100例》', 10],
    ['《Python爬虫120》', 100],
    ['《爬虫100例》', 10],
    ['《写给Python社群的课》', 5],
    ['《数据结构》', 15],
    ['《运筹学》', 0]
]


def buy_book(name, num):
    # 每次购书,模拟消耗时间 1 秒
    time.sleep(1)

    for index, item in enumerate(books):

        if item[0] == name:
            if item[1] >= num:
                books[index][1] = item[1] - num
                return num
            else:
                print(f"{name} 库存不足,无法购买")
                return -1


# 定义一个类,并让其继承自 Thread
class BookThread(threading.Thread):
    def __init__(self, target, args):
        threading.Thread.__init__(self)
        self.target = target
        self.args = args

    # 重写 run 方法
    def run(self):
        self.target(*self.args)


if __name__ == '__main__':
    print("开始购买", datetime.now())

    # 三人购买数据
    purchasers = [
        ('《滚雪球学Python》', 3),
        ('《写给Python社群的课》', 2),
        ('《数据结构》', 5)
    ]

    # 待指定的任务清单
    do_list = []

    # 将进程类添加到列表中
    for one in purchasers:
        item = BookThread(target=buy_book, args=one)
        do_list.append(item)

    for i in range(len(do_list)):
        do_list[i].start()  # 启动线程

    for i in range(len(do_list)):
        do_list[i].join()  # 等待线程运行完毕

    print("三人结束购买", datetime.now())

    # 图书剩余数量
    for b in books:
        print(b)

上述代码的实现结果与前文一致,仅写法上与前文有所差异,即使用面向对象写法实现,学习的时候可以反复临摹几遍。

⛳️ Python 并发进程模块

在 Python 中 multiprocessing 是一个支持多进程的软件包(模块),它可以在 UNIX 和 Windows 上运行,多进程更适合处理 CPU 密集型任务,如果是 I/O 密集型任务,依旧建议使用多线程处理。

🔥 Process 创建多进程

mulitprocessing 模块最主要的对象是 Process 类,可以使用它来创建进程。其构造函数如下所示:

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

其中:

  • group 参数:可以用于指定进程组,但是通常不使用。
  • target 参数:是一个可调用对象,它指定新进程要执行的函数。
  • name参数:是一个字符串,可以用于为新进程命名。
  • args:是一个元组,包含传递给函数的参数。
  • kwargs:是一个字典,包含传递给函数的关键字参数。

下面是一个简单的例子,展示如何使用 Process 类创建新的进程:

import multiprocessing

def process_function(arg1, arg2):
    print(arg1, arg2)


if __name__ == '__main__':

    p = multiprocessing.Process(target=process_function, args=(10, 20))
    p.start()
    p.join()

在这个例子中,我们创建了一个新的进程,并将 process_function 函数作为目标传递给了进程。这个函数接受两个参数,分别是 1020。我们调用 start() 方法来启动进程,然后调用 join() 方法来等待进程完成。

请注意,在 Python 3.8 及更高版本中,multiprocessing 模块的默认导入名称已经更改为 multiprocessing,而不是 iprocessing

Process 类所支持的方法和属性

  • start() 方法:用于启动新进程。
  • join([timeout]) 方法:用于等待新进程完成,并返回其退出状态。如果提供了 timeout 参数,则等待指定的时间(以秒为单位),然后返回。
  • is_alive() 方法:用于检查新进程是否在运行。
  • pid 属性:是一个整数,表示新进程的进程 ID。
  • name 属性:是一个字符串,表示新进程的名称。
  • daemon 属性:是一个布尔值,表示新进程是否是守护进程。

下面是一个简单的例子,展示了如何使用 Process 类的几个方法和属性:

import multiprocessing
import time


def process_function(arg1, arg2):
    print(arg1, arg2)
    time.sleep(5)


if __name__ == '__main__':
    p = multiprocessing.Process(target=process_function, args=(10, 20))
    print("Process ID:", p.pid)
    print("Process Name:", p.name)
    print("Process Daemon:", p.daemon)

    p.start()
    print("Process is alive:", p.is_alive())

    p.join(1)
    print("Process is alive:", p.is_alive())

    p.join()
    print("Process is alive:", p.is_alive())

在这个例子中,我们创建了一个新的进程,并输出了它的进程 ID、名称和是否是守护进程。然后,我们启动进程并使用 is_alive() 方法检查它是否正在运行。接下来,我们调用 join() 方法两次,第一次等待 1 秒,第二次等待直到进程完成。最后,我们再次使用 is_alive() 方法检查进程是否正在运行。

运行这个例子的输出如下所示:

Process ID: None
Process Name: Process-1
Process Daemon: False
Process is alive: True
10 20

📢📢📢📢📢📢
💗 你正在阅读 【梦想橡皮擦】 的博客
👍 阅读完毕,可以点点小手赞一下
🌻 发现错误,直接评论区中指正吧
📆 橡皮擦的第 792 篇原创博客

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。