进阶Python之线程进程篇

举报
王小王-123 发表于 2021/04/20 00:15:33 2021/04/20
【摘要】 线程与进程 线程是程序运行的基本执行单元,在系统里面至少会创建一个进程,而在一个进程里面也必须要创建一个线程。如果这样理解的话,进程包含线程,而一个进程中至少要有一个线程。 要注意的是进程之间是不可以共享内存的,比如我们电脑的系统,它不会共享的,但是线程之间是可以共享进程的CPU的,它们我们可以理解为是我们运行的程序,当我们运行了很多的程序,CPU不够,这个时候电脑...

线程与进程

线程是程序运行的基本执行单元,在系统里面至少会创建一个进程,而在一个进程里面也必须要创建一个线程。如果这样理解的话,进程包含线程,而一个进程中至少要有一个线程。

要注意的是进程之间是不可以共享内存的,比如我们电脑的系统,它不会共享的,但是线程之间是可以共享进程的CPU的,它们我们可以理解为是我们运行的程序,当我们运行了很多的程序,CPU不够,这个时候电脑就比较卡顿,那我们就可以很好地理解,这是一个线程之间共享进程的资源了。

在Python程序中,我们要学会运用进程与线程,尽可能的调用我们的电脑CPU,提高我们的效率,保证程序可以快速的跑起来!
在这里插入图片描述

Python中的线程处理

t1 = threading.Thread(target=你写的函数名,args=(传入变量(如果只有一个变量就必须在后加上逗号),),name=随便取一个线程名):把一个线程实例化给t1,这个线程负责执行target=你写的函数名
t1.start():负责执行启动这个线程
t1.join():必须要等待你的子线程执行完成后再执行主线程
t1.setDeamon(True):当你的主线程执行完毕后,不管子线程有没有执行完成都退出主程序,注意不能和t1.join()一起使用。
threading.current_thread().name:打印出线程名


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

_thread模块

这个模块是为了兼容Python3的一个合并模块,把之前的thread程序给命名为_thread。我们用的比较少,但是我们还是了解一下:

_thread.start__new__thread(function,age)
function 是一个函数,我们自己封装好的
age 是函数里面的参数,必须是元组类型

  
 
  • 1
  • 2
  • 3

threading模块(常用)

上面的那个模块只是提供了比较低级的功能,我们常用的还是threading模块来完成一些高级的操作。

import threading
threading.currentThread()#返回当前线程变量
threading.enumerate()#返回一个正在运行的线程列表
threading.activeCount()#返回正在运行的线程数量

  
 
  • 1
  • 2
  • 3
  • 4

除了上述的一些方法,我们还有一些其他的方法,比如run(),start(),jion(),isAlive(),getName(),setName()。

使用

def zhiyun(x,y): for i in range(x,y): print(str(i*i)+";")
ta=threading.Thread(target=zhiyun,args=(1,6))
tb=threading.Thread(target=zhiyun,args=(16,21))
ta.start()
tb.start()
print(tb.getName())

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述
通过类的继承来创建

class myThread(threading.Thread):
#定义继承类threading.Thread的子类myThread def __init__(self,mynum): super().__init__()#处理父类与子类的关系 self.myunm=mynum def run(self): for i in range(self.myunm,self.myunm+5): print(str(i*i)+";")
ma=myThread(1)
mb=myThread(16)
ma.start()
mb.start()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在这里插入图片描述通过类的基础一样达到效果

线程等待

在Python程序中,我们需要等待某一个线程完成后才可以继续,这个时候我们用到join()实现线程等待

import time
def zhiyun(x,y,thr=None):#包含一个线程实例 if thr: thr.join() else: time.sleep(2) for i in range(x,y): print(str(i * i) + ";")
tb=threading.Thread(target=zhiyun,args=(16,21))
ta=threading.Thread(target=zhiyun,args=(1,6,tb))
tb.start()
ta.start()
# 这里调用了join方法,等到tb执行完后才可以执行ta.

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

在这里插入图片描述
线程同步

如果是多个线程对某一个数据进行修改的话,那么就会出现很多不可以预料的事情。所以我们要同步线程,在Python中我们使用Thread对象的属性lock和属性Rlock可以简单的实现线程同步的功能。

对于那些每次只允许一个线程操作的数据,我们可以将其操作放在acquire和release方法之间。多线程的优势就是,可以同时运行多个任务,但是我们在共享数据的时候,就可能存在数据不同步的问题。

比如有这样一个实例,一个列表里面的元素都是0,线程1从后面向前面把所有的数据都修改为1,而线程2负责打印,从前往后读取输出,那么现在就可能存在一个问题了,输出的时候有一半是0,一半是1,就造成了数据的不同步。

那么我们为了避免这种情况,我们就引入了“锁”的概念。锁有两种概念,分别是锁定和未锁定,当一个线程想要共享数据的时候,首先必须要获得锁定,如果已经有了另外一个线程提前获得了锁定,那么我们这个线程就只能等待了暂停,也就是“同步阻塞”等到之前那个线程访问完毕,释放锁之后,在让之前的运行。

比如我们之前的那个问题,我们就可以先让修改的那个线程获得锁定,让它执行完毕之后,释放锁定在让打印这个线程去执行,这样就实现了数据同步了。

import time
class mt(threading.Thread):#继承类的子类 def run(self):#定义重载函数run global x lock.acquire()#在操作x变量之前锁定的资源,进行下面的那一个变量叠加,如果不去锁定资源,就会出现很多东西 for i in range(5):#遍历操作 x+=10 time.sleep(1)#休眠1秒 print(x) lock.release()#释放锁资源
x=0
lock=threading.RLock()#实例化可重入锁类
def main(): thrs=[] for i in range(10): thrs.append(mt())#实例化线程类 for item in thrs: item.start()#启动线程
if __name__=="__main__": main()

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

在这里插入图片描述
在上述代码里面,自定义了一个带锁访问全局变量X的线程类mt,在主函数main()初始化了10个线程修改X,在同一个时刻只能由一个线程对x进行操作。

线程优先级队列模块queue

这个模块提供了同步的,线程安全的队列类

常用方法:

Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False,Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout等待时间
Queue.get_nowait() 相当于Queue.get(False),非阻塞方法
Queue.put(item) 写入队列,timeout等待时间
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号。每个get()调用得到一个任务,接下来task_done()调用告诉队列该任务已经处理完毕。
Queue.join() 实际上意味着等到队列为空,再执行别的操作

生产者消费者模式

生产者消费者模式并不是GOF提出的众多模式之一,但它依然是开发同学编程过程中最常用的一种模式

在这里插入图片描述
生产者模块儿负责产生数据,放入缓冲区,这些数据由另一个消费者模块儿来从缓冲区取出并进行消费者相应的处理。该模式的优点在于:

解耦:缓冲区的存在可以让生产者和消费者降低互相之间的依赖性,一个模块儿代码变化,不会直接影响另一个模块儿

并发:由于缓冲区,生产者和消费者不是直接调用,而是两个独立的并发主体,生产者产生数据之后把它放入缓冲区,就继续生产数据,不依赖消费者的处理速度

这个里面有一些常见的数据结构,以及用法
https://www.jianshu.com/p/3e422a96b008

import queue

q = queue.Queue(3)  # 调用构造函数,初始化一个大小为3的队列
print(q.empty())  # 判断队列是否为空,也就是队列中是否有数据
#  入队,在队列尾增加数据, block参数,可以是True和False 意思是如果队列已经满了则阻塞在这里,
# timeout 参数 是指超时时间,如果被阻塞了那最多阻塞的时间,如果时间超过了则报错。
q.put(13, block=True, timeout=5)
print(q.full())  # 判断队列是否满了,这里我们队列初始化的大小为3
print(q.qsize())  # 获取队列当前数据的个数
#  block参数的功能是 如果这个队列为空则阻塞,
#  timeout和上面一样,如果阻塞超过了这个时间就报错,如果想一只等待这就传递None
print(q.get(block=True, timeout=None))

#  queue模块还提供了两个二次封装了的函数,
q.put_nowait(23)  # 相当于q.put(23, block=False)
q.get_nowait()  # 相当于q.get(block=False)


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

join()
阻塞调用线程,直到队列中的所有任务被处理掉。

只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

基本的FIFO队列

这个表示先进先出队列

import queue
q=queue.Queue()#创建一个队列对象实例
for i in range(8):#遍历操作 q.put(i)#调用队列对象的put方法在队尾插入一项
while not q.empty():#如归队列不为空 print(q.get())#显示队列信息

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述
LIFO队列
表示后进先出队列

import queue
q=queue.LifoQueue()
for i in range(10): q.put(i)
while not q.empty(): print(q.get())

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

优先级队列

import queue
import random
q=queue.PriorityQueue()#级别越低越先出队列
class Node: def __init__(self,x): self.x=x def __lt__(self, other):#内置函数 return other.x>self.x def __str__(self): return "{}".format(self.x)
a=[Node(int(random.uniform(0,10))) for i in range(10)]#产生10个随机数
for i in a: print(i,end=" ") q.put(i)
print("****************")
while not q.empty(): print(q.get(),end=" ")#按序转出列表中的数字

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在这里插入图片描述
这里自定义节点的时候,我们要用到__lt__()函数,这样优先级队列才知道如何对节点进行排序。

模板

	# coding:utf-8
	from concurrent.futures import ThreadPoolExecutor
	# 导入线程池模块
	import threading
	# 导入线程模块,作用是获取当前线程的名称
	import os,time def task(n): print('%s:%s is running' %(threading.currentThread().getName(),os.getpid())) # 打印当前线程名和运行的id号码 time.sleep(2) return n**2 # 返回传入参数的二次幂 if __name__ == '__main__': p=ThreadPoolExecutor() #实例化线程池,设置线程池的数量,不填则默认为cpu的个数*5 l=[] # 用来保存返回的数据,做计算总计 for i in range(10): obj=p.submit(task,i) # 这里的obj其实是futures的对象,使用obj.result()获取他的结果 # 传入的参数是要执行的函数和该函数接受的参数 # submit是非堵塞的 # 这里执行的方式是异步执行 # ----------------------------------- # # p.submit(task,i).result()即同步执行 # ----------------------------------- # 上面的方法使用range循环有个高级的写法,即map内置函数 # p = ThreadPoolExecutor() # obj=p.map(task,range(10)) # p.shutdown() # 这里的obj的值就是直接返回的所有计算结果,不属于futures对象 # ----------------------------------- l.append(obj) # 把返回的结果保存在空的列表中,做总计算 p.shutdown() # 所有计划运行完毕,关闭结束线程池 print('='*30) print([obj.result() for obj in l]) #上面方法也可写成下面的方法 # with ThreadPoolExecutor() as p:   #类似打开文件,可省去.shutdown() # future_tasks = [p.submit(task, i) for i in range(10)] # print('=' * 30) # print([obj.result() for obj in future_tasks])


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

subprocess创建进程

模板

from concurrent.futures import ProcessPoolExecutor
import os,time,random
def task(n): print('%s is running' %os.getpid()) time.sleep(2) return n**2


if __name__ == '__main__': p=ProcessPoolExecutor()  #不填则默认为cpu的个数 l=[] start=time.time() for i in range(10): obj=p.submit(task,i)   #submit()方法返回的是一个future实例,要得到结果需要用obj.result() l.append(obj) p.shutdown()  #类似用from multiprocessing import Pool实现进程池中的close及join一起的作用 print('='*30) # print([obj for obj in l]) print([obj.result() for obj in l]) print(time.time()-start) #上面方法也可写成下面的方法 # start = time.time() # with ProcessPoolExecutor() as p:   #类似打开文件,可省去.shutdown() # future_tasks = [p.submit(task, i) for i in range(10)] # print('=' * 30) # print([obj.result() for obj in future_tasks]) # print(time.time() - start)


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

对于进程的研究我这里就不过多的说明了,主要是线程的运用

每文一语

我们不能改变过去的失意,但是我们可以改变现在的自己!

文章来源: blog.csdn.net,作者:王小王-123,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/weixin_47723732/article/details/108113006

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。