在 Python 中使用 Pool 进行多处理

举报
宇宙之一粟 发表于 2022/04/15 00:23:36 2022/04/15
【摘要】 为什么要引入线程池如果在程序中经常要用到线程,频繁的创建和销毁线程会浪费很多硬件资源,所以需要把线程和任务分离。线程可以反复利用,省去了重复创建的麻烦。在 Process 类中,我们必须显式地创建流程。但是,Pool 类更方便,您不必手动管理它。创建池对象的语法是 multiprocessing.Pool(processes, initializer, initargs, maxtasks...

为什么要引入线程池

如果在程序中经常要用到线程,频繁的创建和销毁线程会浪费很多硬件资源,

所以需要把线程和任务分离。线程可以反复利用,省去了重复创建的麻烦。

在 Process 类中,我们必须显式地创建流程。但是,Pool 类更方便,您不必手动管理它。创建池对象的语法是 multiprocessing.Pool(processes, initializer, initargs, maxtasksperchild, context) 。所有参数都是可选的。

  • processes 表示您要创建的工作进程的数量。默认值通过 os.cpu_count() 获取。
  • initializer第二个初始化器参数是一个用于初始化的函数。
  • initargs 是传递给它的参数。
  • maxtasksperchild 表示分配给每个子进程的任务数。在完成该数量的任务之后,该进程将被一个新的工作进程替换。指定它的好处是任何未使用的资源都将被释放。如果未提供任何内容,则只要池存在,进程就会存在。
import time
from multiprocessing import Pool


def square(x):
    print(f"start process:{x}")
    square = x * x
    print(f"square {x}:{square}")
    time.sleep(1)
    print(f"end process:{x}")


if __name__ == "__main__":
    starttime = time.time()
    pool = Pool()
    pool.map(square, range(0, 5))
    pool.close()
    endtime = time.time()
    print(f"Time taken {endtime-starttime} seconds")

结果为:

start process:0
start process:1
square 1:1
square 0:0
end process:1
start process:2
end process:0
start process:3
square 2:4
square 3:9
end process:3
end process:2
start process:4
square 4:16
end process:4
Time taken 3.0474610328674316 seconds

在这里,我们从多处理模块中导入 Pool 类。在主函数中,我们创建了一个 Pool 类的对象。 pool.map() 将我们想要并行化的函数和一个可迭代的函数作为参数。它在可迭代的每个项目上运行给定的函数。它还接受一个可选的 chunksize 参数,它将可迭代对象拆分为等于给定大小的块,并将每个块作为单独的任务传递。 pool.close() 用于拒绝新任务。

我们可以看到花费的时间大约是 3 秒。
pool.imap()pool.map() 方法几乎相同。不同的是,每个项目的结果都是在准备好后立即收到的,而不是等待所有项目都完成。此外, map() 方法将可迭代对象转换为列表(如果不是)。但是, imap() 方法没有。

来看下一个例子:

import time
from multiprocessing import Pool


def square(x):
    print(f"start process {x}")
    square = x * x
    time.sleep(1)
    print(f"end process {x}")
    return square


if __name__ == "__main__":
    pool = Pool()
    a = pool.map(square, range(0, 5))
    print(a)

运行结果:

start process 0
start process 1
end process 0
start process 2
end process 1
start process 3
end process 2
start process 4
end process 3
end process 4
[0, 1, 4, 9, 16]
from concurrent.futures import ThreadPoolExecutor

def say_hello():
  	print("Hello")
    
executor = ThreadPoolExecutor(50)
for i in range(0, 10):
   executor.submit(say_hello)

练习

利用 Python 多线程模拟商品秒杀过程,不可以出现超买和超卖的情况。假设A商品有50件参与秒杀活动,10分钟秒杀自动结束。

  • kill_total 商品总数

  • kill_num 成功抢购数

  • kill_flag 有效标志位

  • kill_user 成功抢购的用户ID

from redis_db import pool
import redis
import random
from concurrent.futures import ThreadPoolExecutor

s = set()
while True:
  	if len(s) == 1000:
      	break
    num = random.randint(10000, 100000)
    s.add(num)
print(s)

con = redis.Redis(
		connection_pool=pool
)

try:
    con.delete("kill_total", "kill_num", "kill_flag", "kill_user")
    con.set("kill_total", 50)
    con.set("kill_num", 0)
    con.set("kill_flag", 1)
    con.expire("kill_flag", 600)
    
except Exception as e:
  	print(e)
finally:
  	del con
    
executor = ThreadPoolExecutor(200)
def buy():
  	connection = redis.Redis(
    		connection_pool=pool
    )
    pipline = connection.pipline()
    try:
      	if connection.exists("kill_flag") == 1:
      			pipline.watch("kill_num", "kill_user")
            total = pipline.get("kill_total")
            num = int(pipline.get("kill_num").decode("utf-8"))
            if num < total:
              	pipline.multi()
                pipline.incr("kill_num")
                user_id = s.pop()
                pipline.rpush("kill_user", user_id)
                
                pipline.execute()
    except Exception as e:
      	print(e)
    finally:
      	if "pipline" in dir():
          	pipline.reset()
      	del connection
 
for i in range(0, 1000):
		executor.submit(buy)
print("秒杀活动已经结束")
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。