【Python从入门到精通】(二十二)Python线程池的正确使用姿势

举报
码农飞哥 发表于 2021/09/08 16:52:31 2021/09/08
【摘要】 线程池

您好,我是码农飞哥,感谢您阅读本文,欢迎一键三连哦
本文将详细介绍Python线程池的使用。
干货满满,建议收藏,需要用到时常看看。 小伙伴们如有问题及需要,欢迎踊跃留言哦~ ~ ~。

前言

我是学习Java出身的学习完Java线程池之后再来学习Python的线程池。感觉是那么的熟悉和亲切。Python的线程池与Java线程池基本原理和概念是共通的。我想最大的区别大概就是语言的区别吧。感兴趣的朋友可以看下我写的那篇Java线程池的文章第十六篇:面试必备的线程池知识-线程池的使用。言归正传,让我们正式进入Python的线程池学习之旅。

为什么要使用线程池呢?

前面几篇文章介绍的线程都是直接通过代码手动创建的线程。线程执行完任务之后就会被系统销毁,下次再执行任务的时候再进行创建。这种方式在逻辑上没有啥问题。但是系统启动一个新线程的成本是比较高,因为其中涉及与操作系统的交互,操作系统需要给新线程分配资源。打个比方吧!就像软件公司招聘员工干活一样。当有活干时,就招聘一个外包人员干活。当活干完之后就把这个人员辞退掉。你说在这过程中所耗费的时间成本和沟通成本是不是很大。那么公司一般的做法是:当项目立项时就确定需要几名开发人员,然后将这些人员配齐。然后这些人员就常驻在项目组,有活就干,没活就摸鱼。线程池也是同样的道理。线程池可以定义最大线程数,这些线程有任务就执行任务,没任务就进入线程池中歇着。

线程池怎么用呢?

线程池的基类是concurrent.futures模块中的Executor类,而Executor类提供了两个子类,即ThreadPoolExecutor类和ProcessPoolExecutor类。其中ThreadPoolExecutor用于创建线程池,而ProcessPoolExecutor用于创建进程池。本文将重点介绍ThreadPoolExecutor类的使用。首先,让我们来看看ThreadPoolExecutor类的构造函数。这里使用的Python版本是:3.6.7。

      def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

他的构造函数只有两个参数:一个是max_workers参数,用于指定线程池的最大线程数,如果不指定的话则默认是CPU核数的5倍。另一个参数是thread_name_prefix,它用来指定线程池中线程的名称前缀。其他参数:

  1. _shutdown初始值值为False,默认情况下线程池不销毁,即线程池的生命周期跟项目的生命周期一致。
  2. self._work_queue = queue.Queue()生成缓冲队列。
  3. _threads没有任务被提交时,线程的数量设置为0。
  4. _shutdown_lock 指定线程池的锁是Lock锁。
    说完了线程池的创建之后,接着来看看线程池中比较常用的几个方法吧。
  5. submit(self, fn, *args, **kwargs)
    该方法用提交任务,即将fn函数提交给线程池,*args代表传给fn函数的参数,**kwargs代表以关键字参数的形式为fn函数传入参数。
  6. shutdown(self, wait=True)
    关闭线程池
  7. map(func, *iterables, timeout=None, chunksize=1)
    该函数类似于全局函数map(func,*iterables),只是该函数将会启动多个线程,以异步方式立即对iterables执行map处理。

程序将task函数通过submit方法提交给线程池之后,线程池会返回一个Future对象,该对象的作用主要是用于获取线程任务函数的返回值。Future提供了如下几个方法。

  1. cancel():取消该Future代表的线程任务。如果该任务正在执行,不可取消,则该方法返回False;否则,程序会取消该任务,并返回True。
  2. result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  3. add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。
  4. done():如果该Future代表的线程任务被成功取消或执行完成,则该方法返回True。

来个简单的例子

该例中创建了一个最大线程数是2的线程池来执行async_add函数。

from concurrent.futures import ThreadPoolExecutor
import threading
import time


def async_add(max):
    sum = 0
    for i in range(max):
        sum = sum + i
    time.sleep(1)
    print(threading.current_thread().name + "执行求和操作求得的和是=" + str(sum))
    return sum


# 创建两个线程
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='测试线程')
# 向线程池提交一个task,20作为async_add()函数的参数
future1 = pool.submit(async_add, 20)
# 向线程池再提交一个task
future2 = pool.submit(async_add, 50)
# 判断future1代表的任务是否执行完
time.sleep(2)
print(future1.done())
print(future2.done())
# 查看future1代表的任务返回的结果
print('线程一的执行结果是=' + str(future1.result()))
# 查看future2代表的任务的返回结果
print('线程二的执行结果是=' + str(future2.result()))
print("----" + threading.current_thread().name + "----主线程执行结束-----")

运行结果是:

测试线程_0执行求和操作求得的和是=190
测试线程_1执行求和操作求得的和是=1225
True
True
线程一的执行结果是=190
线程二的执行结果是=1225
----MainThread----主线程执行结束-----

本例中定义了一个最大线程数是2的线程池,并向线程池中提交了两个任务,其中async_add函数就是要执行的任务。在async_add函数中添加 time.sleep(1) 休眠一秒是为了验证done()方法返回的结果。最后才打印主线程执行结束表明result()方法是阻塞的。如果将result()屏蔽掉。
改成如下形式:

# 创建两个线程
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='测试线程')
# 向线程池提交一个task,20作为async_add()函数的参数
future1 = pool.submit(async_add, 20)
# 向线程池再提交一个task
future2 = pool.submit(async_add, 50)
# 判断future1代表的任务是否执行完
print(future1.done())
print(future2.done())
print("----" + threading.current_thread().name + "----主线程执行结束-----")

则运行结果是:

False
False
----MainThread----主线程执行结束-----
测试线程_0执行求和操作求得的和是=190
测试线程_1执行求和操作求得的和是=1225

如何非阻塞的获取线程执行的结果

前面介绍的result()方法是通过阻塞的方式来获取线程的运行结果的。那么如果通过非阻塞的方法来获取线程任务最后的返回结果呢?这里就需要使用线程的回调函数来获取线程的返回结果。

from concurrent.futures import ThreadPoolExecutor
import threading
import time


def async_add(max):
    sum = 0
    for i in range(max):
        sum = sum + i
    time.sleep(1)
    print(threading.current_thread().name + "执行求和操作求得的和是=" + str(sum))
    return sum


with ThreadPoolExecutor(max_workers=2) as pool:
    # 向线程池提交一个task
    future1 = pool.submit(async_add, 20)
    future2 = pool.submit(async_add, 50)


    # 定义获取结果的函数
    def get_result(future):
        print(threading.current_thread().name + '运行结果:' + str(future.result()))


    # 查看future1代表的任务返回的结果
    future1.add_done_callback(get_result)
    # 查看future2代表的任务的返回结果
    future2.add_done_callback(get_result)
    print('------------主线程执行结束----')

运行结果是:

------------主线程执行结束----
ThreadPoolExecutor-0_1执行求和操作求得的和是=1225
ThreadPoolExecutor-0_1运行结果:1225
ThreadPoolExecutor-0_0执行求和操作求得的和是=190
ThreadPoolExecutor-0_0运行结果:190

从结果可以看出获取线程执行结果的方法完全没有阻塞到主线程的运行。这里通过add_done_callback函数向线程池中注册了一个获取线程执行结果的函数get_result。
由于线程池实现了上下文管理协议(Context Manage Protocol),因此程序可以使用with语句来管理线程池,这样即可避免手动关闭线程池。

线程池的运行策略

这里有必要介绍一下线程池的执行策略,也就是说当线程池中的任务数大于线程池的最大线程数时,线程池该如何处理这些任务呢?处理不了的任务是直接丢弃还是慢慢处理呢?再回答这个问题之前,让我们来看下下面这个例子:这里定义了一个最大线程数是4个线程池,然后向线程池中提交了100个task任务。

def async_add(max):
    sum = 0
    for i in range(max):
        sum = sum + i
    time.sleep(1)
    print(threading.current_thread().name + "执行求和操作求得的和是=" + str(sum))
    return sum


with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(100):
        pool.submit(async_add, i)
    print('------------主线程执行结束----')

运行结果是:

------------主线程执行结束----
ThreadPoolExecutor-0_1执行求和操作求得的和是=0
ThreadPoolExecutor-0_0执行求和操作求得的和是=0
ThreadPoolExecutor-0_3执行求和操作求得的和是=3
ThreadPoolExecutor-0_2执行求和操作求得的和是=1
...省略部分结果.....
ThreadPoolExecutor-0_1执行求和操作求得的和是=4656
ThreadPoolExecutor-0_2执行求和操作求得的和是=4753
ThreadPoolExecutor-0_0执行求和操作求得的和是=4560
ThreadPoolExecutor-0_3执行求和操作求得的和是=4851

从运行结果可以看出:一直都是相同的线程来执行这些任务,并且所有的任务都没有被丢弃。并且任务按照先来后到的顺序来执行。这里就需要说到线程池默认的缓冲队列了。self._work_queue = queue.Queue() 该语句会创建一个大小无限制的缓冲队列。该队列是一个 FIFO(先进先出)的常规队列。所以当任务数超过最大线程数时,任务会暂时放在缓冲队列queue中。当线程空闲之后会从缓冲队列中取出任务来执行。
该队列有个参数maxsize可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时,就会被阻塞,直到队列中的元素被消费。如果将maxsize的设置为0或者负数时,则该队列的大小就是无限制的。

总结

本文详细介绍了线程池的使用,希望对读者朋友们有所帮助。

我是码农飞哥,再次感谢您读完本文

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。