Python3,如何实现CPU的并行计算,那还不简单,5种方式,这篇就搞定。

举报
Carl_奕然 发表于 2023/07/05 16:03:01 2023/07/05
【摘要】 网友:果然,今天是跟并行计算杠上了。

1、引言


小云:鱼哥,你上一篇《Python3,掌握这几种并行处理,轻轻松松提升for循环速度》写的很赞, 我还想继续深入并行计算。
小鱼:那你的意思,哪一篇写的不深呗?
小云:我可没有那个意思。
小鱼:我也没说你是哪个意思。
小云:我就是想着你能不能再讲一讲,关于如何实现CPU并行计算。
小鱼:我屮艸芔茻…你这是有啥心思?
小云:我… 我没有。

在这里插入图片描述
小鱼:如实说来,或许,我还能讲一讲。
小云:当真?
小鱼:当真…
小云:就是,我要在公司分享一些技术,就涉及到 如何实现CPU并行计算方面的姿势 . 知识。
小鱼:这样啊, 那你直接说就好了。 我又不是特别喜欢黑桃A。
小云:… 整,整,整吧。

2、实战


2.1 多进程模式


这里,我们主要以Python中的额多进程模式进行讲解。

2.1.1 定义


在Python中,多进程模式是指同时运行多个进程来执行任务的一种编程模式。
Python提供了多个模块来支持多进程编程,其中最常用的是multiprocessing模块。


2.1.2 multiprocessing


使用multiprocessing模块可以创建和管理多个进程,每个进程都有自己独立的内存空间和执行环境。
通过将任务分配给不同的进程,可以实现并行执行,提高程序的运行效率。


2.1.3 模式


在多进程模式下,每个进程都有自己的主程序流程,可以独立执行任务。
进程之间可以通过进程间通信(IPC)机制来进行数据交换和同步操作。


2.1.4 适用场景


多进程模式适用于需要充分利用多核处理器或执行耗时任务的场景。
通过将任务分配给多个进程,可以充分利用系统资源,提高程序的运行效率。


2.1.4 代码示例

代码示例



# -*- coding:utf-8 -*-
# @Time   : 2023-07-01
# @Author : Carl_DJ


import multiprocessing

def worker(num):
    """子进程的任务函数"""
    print(f'Worker {num} started')
    # 执行一些任务
    print(f'Worker {num} finished')

if __name__ == '__main__':
    # 创建多个子进程
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    # 等待所有子进程结束
    for p in processes:
        p.join()

    print('All workers finished')


2.2 执行方法


2.2.1 多线程并发执行


多进程模式下,将任务分配给多个进程并行执行,从而利用多核CPU的优势。
这可以说作为一名码农,必备的知识点。

这里,我们同样使用multiprocessing 来实现一个并发执行任务的示例。

代码示例

# -*- coding:utf-8 -*-
# @Time   : 2023-07-01
# @Author : Carl_DJ

import multiprocessing

def task(name):
    print(f"Running task {name}")

if __name__ == "__main__":
    # 创建进程池,最大进程数为4
    pool = multiprocessing.Pool(processes=4)

    # 提交任务到进程池
    for i in range(10):
        pool.apply_async(task, args=(i,))

    # 关闭进程池,不再接受新的任务
    pool.close()

    # 等待所有任务完成
    pool.join()

    print("All tasks completed")


2.2.2 进程池


对于大量重复的任务,
使用进程池来维护一定数量的进程,每个进程执行一个任务后返回结果,然后再由进程池分配下一个任务。
这样的好处就是:避免频繁地创建和销毁进程,从而提高效率。

我们使用 multiprocessing模块的Pool类来实现进程池。

代码示例

# -*- coding:utf-8 -*-
# @Time   : 2023-07-01
# @Author : Carl_DJ

import multiprocessing

def worker(num):
    print('Worker', num)

if __name__ == '__main__':
    # 创建一个进程池,最大进程数为3
    pool = multiprocessing.Pool(processes=3)

    # 使用进程池执行任务
    for i in range(5):
        pool.apply_async(worker, (i,))

    # 关闭进程池,不再接受新的任务
    pool.close()

    # 等待所有任务完成
    pool.join()


2.2.3 消息队列


在多进程模式下,不同的进程之间需要进行通信,可以利用消息队列来实现进程间通信。

我们使用Queue模块来实现消息队列。

代码示例

# -*- coding:utf-8 -*-
# @Time   : 2023-07-01
# @Author : Carl_DJ

from queue import Queue
import time

# 创建一个消息队列
message_queue = Queue()

# 生产者函数,向消息队列中添加消息
def producer():
    for i in range(5):
        message = f"Message {i+1}"
        message_queue.put(message)
        print(f"Produced: {message}")
        time.sleep(1)

# 消费者函数,从消息队列中获取消息并处理
def consumer():
    while True:
        message = message_queue.get()
        print(f"Consumed: {message}")
        time.sleep(2)
        message_queue.task_done()

# 创建并启动生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

# 等待生产者和消费者线程结束
producer_thread.join()
consumer_thread.join()


2.2.4 内存共享


对于需要多个进程共享的数据,可以使用共享内存来避免数据拷贝和进程间通信的开销。

我们使用multiprocessing模块的Value和Array类来实现共享内存

代码示例

# -*- coding:utf-8 -*-
# @Time   : 2023-07-01
# @Author : Carl_DJ

from multiprocessing import Process, Value, Array

# 定义一个共享变量
shared_value = Value('i', 0)

# 定义一个共享数组
shared_array = Array('d', [0.0, 1.0, 2.0, 3.0, 4.0])

# 定义一个函数,用于修改共享变量和数组的值
def modify_shared_data(value, array):
    value.value = 10
    for i in range(len(array)):
        array[i] = i * 2

# 创建一个子进程,传入共享变量和数组
p = Process(target=modify_shared_data, args=(shared_value, shared_array))
p.start()
p.join()

# 打印共享变量和数组的值
print("Shared value:", shared_value.value)
print("Shared array:", shared_array[:])

2.2.5 异步IO


对于I/O密集型任务,可以使用异步IO来提高效率。

我们使用asyncio模块来实现异步IO。

代码示例

# -*- coding:utf-8 -*-
# @Time   : 2023-07-01
# @Author : Carl_DJ


import asyncio

async def fetch_data(url):
    print(f"正在请求URL:{url}")
    await asyncio.sleep(2)  # 模拟网络请求延迟
    print(f"请求URL:{url}完成")
    return f"从{url}获取的数据"

async def main():
    urls = [
        "https://www.example.com",
        "https://www.google.com",
        "https://www.baidu.com"
    ]
    tasks = [fetch_data(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(results)

if __name__ == "__main__":
    asyncio.run(main())



3、总结


看到这里,今天的分享差不多就到这里了。
今天主要针对在Python中, 使用多进程模式来实现CPU的并行计算。如:

  • 多线程并发执行
  • 进程池
  • 消息队列
  • 内存共享
  • 异步IO


在实际的项目中,很多地方都会用到并行计算, 这不仅提高的代码执行效率, 也提高了用户的满意度。

我是小鱼:

  • CSDN 博客专家;
  • 阿里云 专家博主;
  • 51CTO 博客专家;
  • 51认证讲师;
  • 认证金牌面试官;
  • 职场培训规划师;
  • 多个国内主流技术社区的认证专家博主;
  • 多款主流产品(阿里云等)测评一、二等奖获得者;


关注我,带你学习更多更专业更前言的Python技术。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。