Python实现进程同步和通信

举报
风吹稻花香 发表于 2021/06/05 00:06:26 2021/06/05
【摘要】 Python实现进程同步和通信 如之前创建多进程的例子 # -*- coding:utf-8 -*-from multiprocessing import Process,Poolimport os,time def run_proc(name): ##定义一个函数用于进程调用 for i in range(5): time.sleep(0.2) #休眠0.2秒...

Python实现进程同步和通信

如之前创建多进程的例子


    
  1. # -*- coding:utf-8 -*-
  2. from multiprocessing import Process,Pool
  3. import os,time
  4. def run_proc(name): ##定义一个函数用于进程调用
  5. for i in range(5):
  6. time.sleep(0.2) #休眠0.2秒
  7. print 'Run child process %s (%s)' % (name, os.getpid())
  8. #执行一次该函数共需1秒的时间
  9. if __name__ =='__main__': #执行主进程
  10. print 'Run the main process (%s).' % (os.getpid())
  11. mainStart = time.time() #记录主进程开始的时间
  12. p = Pool(8) #开辟进程池
  13. for i in range(16): #开辟14个进程
  14. p.apply_async(run_proc,args=('Process'+str(i),))#每个进程都调用run_proc函数,
  15. #args表示给该函数传递的参数。
  16. print 'Waiting for all subprocesses done ...'
  17. p.close() #关闭进程池
  18. p.join() #等待开辟的所有进程执行完后,主进程才继续往下执行
  19. print 'All subprocesses done'
  20. mainEnd = time.time() #记录主进程结束时间
  21. print 'All process ran %0.2f seconds.' % (mainEnd-mainStart) #主进程执行时间
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

运行结果:

Run the main process (36652). 
Waiting for all subprocesses done … 
Run child process Process0 (36708)Run child process Process1 (36748)

Run child process Process3 (36736) 
Run child process Process2 (36716) 
Run child process Process4 (36768)

如第3行的输出,偶尔会出现这样不如意的输入格式,为什么呢? 
原因是多个进程争用打印输出资源的结果。前一个进程为来得急输出换行符,该资源就切换给了另一个进程使用,致使两个进程输出在同一行上,而前一个进程的换行符在下一次获得资源时才打印输出。

Lock

为了避免这种情况,需在进程进入临界区(使进程进入临界资源的那段代码,称为临界区)时加锁。 
可以向如下这样添加锁后看看执行效果:


    
  1. # -*- coding:utf-8 -*-
  2. lock = Lock() #申明一个全局的lock对象
  3. def run_proc(name):
  4. global lock #引用全局锁
  5. for i in range(5):
  6. time.sleep(0.2)
  7. lock.acquire() #申请锁
  8. print 'Run child process %s (%s)' % (name, os.getpid())
  9. lock.release() #释放锁
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

Semaphore

Semaphore为信号量机制。当共享的资源拥有多个时,可用Semaphore来实现进程同步。其用法和Lock差不多,s = Semaphore(N),每执行一次s.acquire(),该资源的可用个数将减少1,当资源个数已为0时,就进入阻塞;每执行一次s.release(),占用的资源被释放,该资源的可用个数增加1。

多进程的通信(信息交互)

不同进程之间进行数据交互,可能不少刚开始接触多进程的同学会想到共享全局变量的方式,这样通过向全局变量写入和读取信息便能实现信息交互。但是很遗憾,并不能这样实现。具体原因,看这篇文章。

下面通过例子,加深对那篇文章的理解:


    
  1. # -*- coding:utf-8 -*-
  2. from multiprocessing import Process, Pool
  3. import os
  4. import time
  5. L1 = [1, 2, 3]
  6. def add(a, b):
  7. global L1
  8. L1 += range(a, b)
  9. print L1
  10. if __name__ == '__main__':
  11. p1 = Process(target=add, args=(20, 30))
  12. p2 = Process(target=add, args=(30, 40))
  13. p1.start()
  14. p2.start()
  15. p1.join()
  16. p2.join()
  17. print L1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

输出结果:

[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] 
[1, 2, 3, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39] 
[1, 2, 3]

该程序的原本目的是想将两个子进程生成的列表加到全局变量L1中,但用该方法并不能达到想要的效果。既然不能通过全局变量来实现不同进程间的信息交互,那有什么办法呢。 
mutiprocessing为我们可以通过Queue和Pipe来实现进程间的通信。

Queue

按上面的例子通过Queue来实现:


    
  1. # -*- coding:utf-8 -*-
  2. from multiprocessing import Process, Queue, Lock
  3. L = [1, 2, 3]
  4. def add(q, lock, a, b):
  5. lock.acquire() # 加锁避免写入时出现不可预知的错误
  6. L1 = range(a, b)
  7. lock.release()
  8. q.put(L1)
  9. print L1
  10. if __name__ == '__main__':
  11. q = Queue()
  12. lock = Lock()
  13. p1 = Process(target=add, args=(q, lock, 20, 30))
  14. p2 = Process(target=add, args=(q, lock, 30, 40))
  15. p1.start()
  16. p2.start()
  17. p1.join()
  18. p2.join()
  19. L += q.get() + q.get()
  20. print L
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

执行结果:

[20, 21, 22, 23, 24, 25, 26, 27, 28, 29] 
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39] 
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]

下面介绍Queue的常用方法:

  • 定义时可用q = Queue(maxsize = 10)来指定队列的长度,默认时或maxsize值小于1时队列为无限长度。
  • q.put(item)方法向队列放入元素,其还有一个可选参数block,默认为True,此时若队列已满则会阻塞等待,直到有空闲位置。而当black值为 False,在该情况下就会抛出Full异 常
  • Queue是不可迭代的对象,不能通过for循环取值,取值时每次调用q.get()方法。同样也有可选参数block,默认为True,若此时队列为空则会阻塞等待。而black值为False时,在该情况下就会抛出Empty异常
  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.get([block[, timeout]]) 获取队列,timeout等待时间Queue.get_nowait() 相当Queue.get(False) 非阻塞 Queue.put(item) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当Queue.put(item, False)

Pipe

Pipe管道,可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。双向Pipe允许两端的进即可以发送又可以接受;单向的Pipe只允许前面的端口用于接收,后面的端口用于发送。 
下面给出例子:


    
  1. # -*- coding:utf-8 -*-
  2. from multiprocessing import Process, Pipe
  3. def proc1(pipe):
  4. s = 'Hello,This is proc1'
  5. pipe.send(s)
  6. def proc2(pipe):
  7. while True:
  8. print "proc2 recieve:", pipe.recv()
  9. if __name__ == "__main__":
  10. pipe = Pipe()
  11. p1 = Process(target=proc1, args=(pipe[0],))
  12. p2 = Process(target=proc2, args=(pipe[1],))
  13. p1.start()
  14. p2.start()
  15. p1.join()
  16. p2.join(2) #限制执行时间最多为2秒
  17. print '\nend all processes.'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

执行结果如下:

proc2 recieve: Hello,This is proc1 
proc2 recieve: 
end all processes.

当第二行输出后,因为管道中没有数据传来,Proc2处于阻塞状态,2秒后被强制结束。 
以下是单向管道的例子,注意pipe[0],pipe[1]的分配。


    
  1. # -*- coding:utf-8 -*-
  2. from multiprocessing import Process, Pipe
  3. def proc1(pipe):
  4. s = 'Hello,This is proc1'
  5. pipe.send(s)
  6. def proc2(pipe):
  7. while True:
  8. print "proc2 recieve:", pipe.recv()
  9. if __name__ == "__main__":
  10. pipe = Pipe(duplex=False)
  11. p1 = Process(target=proc1, args=(pipe[1],)) #pipe[1]为发送端
  12. p2 = Process(target=proc2, args=(pipe[0],)) #pipe[0]为接收端
  13. p1.start()
  14. p2.start()
  15. p1.join()
  16. p2.join(2) # 限制执行时间最多为2秒
  17. print '\nend all processes.'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

执行结果同上。

强大的Manage

Queue和Pipe实现的数据共享方式只支持两种结构 Value 和 Array。Python中提供了强大的Manage专门用来做数据共享,其支持的类型非常多,包括: Value,Array,list, dict,Queue, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event等 
其用法如下:


    
  1. from multiprocessing import Process, Manager
  2. def func(dt, lt):
  3. for i in range(10):
  4. key = 'arg' + str(i)
  5. dt[key] = i * i
  6. lt += range(11, 16)
  7. if __name__ == "__main__":
  8. manager = Manager()
  9. dt = manager.dict()
  10. lt = manager.list()
  11. p = Process(target=func, args=(dt, lt))
  12. p.start()
  13. p.join()
  14. print dt, '\n', lt
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

执行结果:

{‘arg8’: 64, ‘arg9’: 81, ‘arg0’: 0, ‘arg1’: 1, ‘arg2’: 4, ‘arg3’: 9, ‘arg4’: 16, ‘arg5’: 25, ‘arg6’: 36, ‘arg7’: 49} 
[11, 12, 13, 14, 15]

文章来源: blog.csdn.net,作者:网奇,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/jacke121/article/details/79281846

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200