[技术交流] TensorFlow 队列与线程

TensorFlow 队列与线程

深度学习的模型训练过程往往需要大量的数据,而将这些数据一次性的读入和预处理需要大量的时间开销,所以通常采用队列与多线程的思想解决这个问题,而且TensorFlow为我们提供了完善的函数。

本文介绍了TensorFlow的线程和队列。在使用TensorFlow进行异步计算时,队列是一种强大的机制。正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素**到队列后端(rear),也可以把队列前端(front)的元素删除。

Python中是没有提供直接实现队列的函数的,所以通常会使用列表模拟队列。TensorFlow提供了整套实现队列的函数和方法,在TensorFlow中,队列和变量类似,都是计算图上有状态的节点。操作队列的函数主要有:

l  FIFOQueue():创建一个先入先出(FIFO)的队列

l  RandomShuffleQueue():创建一个随机出队的队列

l  enqueue_many():初始化队列中的元素

l  dequeue():出队 enqueue():入队

与队列Queue有关的有以下三个概念:

l  Queue是TF队列和缓存机制的实现

l  QueueRunner是TF中对操作Queue的线程的封装

l  Coordinator是TF中用来协调线程运行的工具

虽然它们经常同时出现,但这三样东西在TensorFlow里面是可以单独使用的,不妨先分开来看待。

1.    Queue,队列

根据实现的方式不同,分成具体的几种类型,例如:

l  tf.FIFOQueue 按入列顺序出列的队列

l  tf.RandomShuffleQueue 随机顺序出列的队列

l  tf.PaddingFIFOQueue 以固定长度批量出列的队列

l  tf.PriorityQueue 带优先级出列的队列

l  ...

这些类型的Queue除了自身的性质不太一样外,创建、使用的方法基本是相同的,以下介绍两个最常用的

1)  tf.FIFOQueue(capacity, dtypes, name='fifo_queue') 创建一个以先进先出的顺序对元素进行排队的队列

参数:

capacity:整数。可能存储在此队列中的元素数量的上限

dtypesDType对象列表。长度dtypes必须等于每个队列元 素中的张量数,dtype的类型形状,决定了后面进队列元素形状

方法:

q.dequeue()获取队列的数据

q.enqueue()将一个数据添加进队列

q.enqueue_many(列表或者元组)将多个数据添加进队列

q.size() 返回队列的大小

2)tf.RandomShuffleQueue() 随机出的队列

Queue主要包含入列(enqueue)和出列(dequeue)两个操作。enqueue操作返回计算图中的一个Operation节点,dequeue操作返回一个Tensor值。Tensor在创建时同样只是一个定义(或称为声明),需要放在Session中运行才能获得真正的数值。下面是一个单独使用Queue的例子:

#模拟同步操作
 
'''
1.
创建一个空的队列
2.向队列设置几个初始的值
3.从队列中取出一个数据,再将数据加1
4.数据加1之后的结果进队列
5.打印出队列中的数据
'''
 
def sync_test():
   
#创建一个空的队列
   
q = tf.FIFOQueue(3,tf.float32)
   
#向队列设置几个初始的值
   
enq_many = q.enqueue_many(([0.1,0.2,0.3],))
 
   
#从队列中取出一个数据,再将数据加1
   
out_q = q.dequeue()
    data = out_q+
1
   
#数据加1之后的结果进队列
   
en_q = q.enqueue(data)
   
with tf.Session() as sess:
        sess.run(enq_many)
 
       
#执行en_q op 99次
       
for i in range(99):
            sess.run(en_q)
 
       
#打印出队列中的数据
       
for i in range(q.size().eval()):
           
print(q.dequeue().eval())
   
return None
 

 

2.     QueueRunner,队列管理器

tf.train.QueueRunner(queue, enqueue_ops=None)

参数:

l  queue:A Queue

l  enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程

l  create_threads(sess, coord=None,start=False) 创建线程来运行给定会话的入队操作

l  start:布尔值,如果True启动线程;如果为False调用者 必须调用start()启动线程

l  coord:线程协调器  用于线程的管理

Tensorflow的计算主要在使用CPU/GPU和内存,而数据读取涉及磁盘操作,速度远低于前者操作。因此通常会使用多个线程读取数据,然后使用一个线程消费数据。QueueRunner就是来管理这些读写队列的线程的。

QueueRunner需要与Queue一起使用(这名字已经注定了它和Queue脱不开干系),但并不一定必须使用Coordinator。看下面这个例子:

import tensorflow as tf
 
import sys
q = tf.FIFOQueue(
10, "float")
counter = tf.Variable(
0.0#计数器
# 给计数器加一
 
increment_op = tf.assign_add(counter, 1.0)
 
# 将计数器加入队列
 
enqueue_op = q.enqueue(counter)
 
 
# 创建QueueRunner
# 用多个线程向队列添加数据
# 这里实际创建了4个线程,两个增加计数,两个执行入队
 
qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)
 
 
# 主线程
 
sess = tf.InteractiveSession()
tf.global_variables_initializer().run()
 
# 启动入队线程
 
qr.create_threads(sess, start=True)
 
for i in range(20):
   
print (sess.run(q.dequeue()))

增加计数的进程会不停的后台运行,执行入队的进程会先执行10次(因为队列长度只有10),然后主线程开始消费数据,当一部分数据消费被后,入队的进程又会开始执行。最终主线程消费完20个数据后停止,但其他线程继续运行,程序不会结束。

3.    Coordinator,线程协调器

tf.train.Coordinator() 线程协调员,实现一个简单的机制来协调一 组线程的终止

方法: 返回的是线程协调实例

            request_stop()  请求停止

            join(threads=None, stop_grace_period_secs=120) 等待线程终止

Coordinator是个用来保存线程组运行状态的协调器对象,它和TensorFlowQueue没有必然关系,是可以单独和Python线程使用的。例如:

#模拟异步操作
 
'''
 
通过队列管理器来实现变量加1,入队,主线程出队列的操作,观察效果?
(异步操作)
'''
 
def async_test():
   
# # 1、定义一个队列,1000
   
q = tf.FIFOQueue(300,tf.float32)
 
   
# 2、定义要做的事情 循环值,+1, 放入队列当中
   
var = tf.Variable(0.0)
   
# 实现一个自增  tf.assign_add
   
data = tf.assign_add(var,tf.constant(1.0))
    en_q = q.enqueue(data)
 
   
# 3、定义队列管理器op, 指定多少个子线程,子线程该干什么事情
    # 1,2,3,4,5,6,7,8,9,10
   
qr = tf.train.QueueRunner(q, enqueue_ops=[en_q] * 2)
 
   
# 初始化变量的OP
   
init_op = tf.global_variables_initializer()
 
   
with tf.Session() as sess:
       
#运行初始化op
       
sess.run(init_op)
       
# 开启线程管理器,coord线程协调器
       
coord = tf.train.Coordinator()
       
# 真正开启子线程,读数据入队列,t1,t2  0.0001  10
       
threads = qr.create_threads(sess, coord=coord, start=True)
 
       
# 主线程,不断读取数据训练  t0
       
for i in range(300):
           
#
            
print(sess.run(q.dequeue()))
       
print("size==>",sess.run(q.size()))
       
# 停止子线程,回收
       
coord.request_stop()
        coord.join(threads)
 
   
return None
 

将这个程序运行起来,主线程会等待所有子线程都停止后结束,从而使整个程序结束。由此可见,只要有任何一个线程调用了Coordinatorrequest_stop方法,所有的线程都可以通过should_stop方法感知并停止当前线程。

QueueRunnerCoordinator一起使用,实际上就是封装了这个判断操作,从而使任何一个现成出现异常时,能够正常结束整个程序,同时主线程也可以直接调用request_stop方法来停止所有子线程的执行。

 

更多的技术内容,请访问腾科it教育集团网站 www.togogo.net