Python中级精华线程间通信基础知识及常用的队列通信(Queue)
常见的基本概念:
首先普及下进程和线程的概念:
进程:进程是操作系统资源分配的基本单位。
线程:线程是任务调度和执行的基本单位。
一个应用程序至少一个进程,一个进程至少一个线程。
两者区别:同一进程内的线程共享本进程的资源如内存、I/O、cpu等,但是进程之间的资源是独立的。
并发 & 并行:
并发:在操作系统
中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机
上运行。简言之,是指系统具有处理多个任务的能力。
并行:当系统有一个以上CPU时,则线程的操作有可能非并发。当一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行
(Parallel)。简言之,是指系统具有同时处理多个任务的能力。
同步 & 异步:
对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区
中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核
拷贝到进程中 (Copying the data from the kernel to the process)
同步:当进程执行IO(等待外部数据)的时候,-----等。同步(例如打电话的时候必须等)
异步:当进程执行IO(等待外部数据)的时候,-----不等,去执行其他任务,一直等到数据接收成功,再回来处理。异步(例如发短信)
总结:
只要有一丁点阻塞,就是阻塞IO。异步IO的特点就是全程无阻塞。
有些人常把同步阻塞和异步非阻塞联系起来,但实际上经过分析,阻塞与同步,非阻塞和异步的定义是不一样的。同步和异步的区别是遇到IO请求是否等待。阻塞和非阻塞的区别是数据没准备好的情况下是否立即返回。同步可能是阻塞的,也可能是非阻塞的,而非阻塞的有可能是同步的,也有可能是异步的。
一般通用(非python层面)的进程线程间通信的常见方式:
进程间通信:
管道(pipe):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用进程间的亲缘关系
通常是指父子进程关系。
命名管道(named pipe/FIFO):命名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
信号量
(semophonre):信号量是一个计数器,可以用来控制多个进程队共享资源的访问。它常作为一个锁机制,防止某进程在访问共享资源时,其他进程也访问此资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
消息队列(message queue):消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少,管道只能承载无格式字节流以及缓冲区大小受限等缺点。
信号
(sinal):信号是一种比较复杂的通信方式,用于通知接受进程某个事件已经发生。
共享内存(shared memory):共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的ipc通信方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往和其他通信方式如信号量,配合使用来实现进程间的同步和通信。
套接字(socket):套接字也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同设备间的进程通信。
全双工管道:共享内存、信号量、消息队列、管道和命名管道只适用于本地进程间通信,套接字和全双工管道可用于远程通信,因此可用于网络编程。
线程间通信:
锁机制:包括互斥锁、条件变量、读写锁
互斥锁:提供了以排他方式防止数据结构被并发修改的方法。
读写锁:允许多个线程同时共享数据,而对写操作是互斥的。
条件变量:可以以原子的方式阻塞进程,直到某个特定条件为真为止。对条件的测试是在互斥锁的保护下进行的。条件变量始终与互斥锁一起使用。
信号量机制(Semaphore):包括无名进程信号量和命名线程信号量
信号机制(Signal):类似进程间的信号处理
python中的安全线程通信手段queue:
目的:
程序中有多个线程,想在这些线程间实现安全的通信或者交换数据。
方法:
在python中将数据从一个线程发送到另外一个线程可能最安全的方法就是使用queue库中的Queue模块了。
要想做到这些,首先就要创建一个Queue 的实例,所有线程会共享其。之后线程可以通过put()和get()方法来给队列添加和删除元素。
例子:from queue import Queue from threading import Thread #经典的生产者消费者模式: # 一个线程专门用于生产数据 def producer(out_q): while True: # 生产些数据 ... out_q.put(data) # 一个线程作为消费者用于取出queue中的数据 def consumer(in_q): while True: # 消耗数据 data = in_q.get() # 处理数据 ... q = Queue() t1 = Thread(target=producer,args=(q,)) t2 = Thread(target=consumer,args=(q,)) t1.start() t2.start()
Queue已经实现了所有所需的锁,因此它们可以安全地在任意多的线程之间进行共享。当使用队列时,如何对生产者和消费者的关闭过程进行同步协调需要一些技巧,这个问题的结局方法是使用一个特殊的终止值。当我们将它放入队列时,就使消费者退出:from threading import Thread import queue import time _sentinel = object() def producer(out_q): times = 10 while times: data = "hello world!" print("producer send a data", str(times)+"times",sep="---") out_q.put(data) time.sleep(2) times -= 1 out_q.put(_sentinel) def consumer(in_q): while True: data = in_q.get() if data is _sentinel: in_q.put(_sentinel)# 用过之后还回队列,方便其他消费者线程使用;可以保证逐个去关闭其他线程 break print("consumer recive a data:", data,sep="---") q = queue.Queue() Thread(target=producer,args=(q,)).start() Thread(target=consumer,args=(q,)).start() # 结果为: """ producer send a data---10times consumer recive a data:---hello world! producer send a data---9times consumer recive a data:---hello world! ... producer send a data---2times consumer recive a data:---hello world! producer send a data---1times consumer recive a data:---hello world! Process finished with exit code 0 """
尽管队列时线程间通信的最常见机制,但是只要添加了所需要的锁和同步功能,就可以构建自己的线程安全型的数据结构,最常见的做法是将你的数据结构和条件变量打包在一起,比如如下示例构建了一个优先级队列数据结构:
这里需要借助优先级队列heapq库,对这个库做个简单说明:库中有两个最常用的方法,一个是heappop函数,另外一个是heappush函数;这两个函数尤其需要关注heappop函数import heapq import threading class PriorityQueue: def __init__(self): self._queue = [] self._count = 0 self._cv = threading.Condition() def put(self, item, priority): with self._cv:# 上下文管理这个 条件变量对象,方便资源回收(不需要我们管了) heapq.heappush(self._queue, (-priority, self._count, item))#heappush会将元素从小到大排列 # 以列表作为容器,元组作为元素组成的heapq; # 其中priority代表优先级(数字越大优先级越高),_count代表计数(理解为下标),item是真正的值 # 这里之所以取-priority,是因为将对象按照优先级从高到低的顺序排列 # heapq能够保证第一个元素的优先级最低,最后一个元素的优先级最高 self._count += 1#插入一个下标自增1 self._cv.notify()# 通知其他线程,使其wait()解阻塞 def get(self): with self._cv: while len(self._queue) == 0: self._cv.wait() return heapq.heappop(self._queue)[-1] #跳出一个结果,这个结果为列表中的优先级最高的元素的item
回到Queue中,我们应该知道的是队列实现的线程间通信是单向的且不确定的过程,一般来说我们无法获知接收线程(消费者)何时会实际接受到消息并且开始工作,但是,queue却提供了这样的基本事件来完成这些功能,接下来重点说明其中的task_done()和join().from queue import Queue from threading import Thread #经典的生产者消费者模式: # 一个线程专门用于生产数据 def producer(out_q): while True: # 生产些数据 ... out_q.put(data) # 一个线程作为消费者用于取出queue中的数据 def consumer(in_q): while True: # 消耗数据 data = in_q.get() # 处理数据 ... # 告知生产者数据使用完成: in_q.task_done() #先生成队列 q = Queue() # 开辟两条线程 t1 = Thread(target=producer,args=(q,)) t2 = Thread(target=consumer,args=(q,)) # 线程启动 t1.start() t2.start() #等待所有消费者完成消费: q.join()
有一种情况,通过Event去监视所有消费者的消费情况,比如要确保消费者完成消费后才导入新的数据这种情况:from threading import Thread, Event from queue import Queue _a = "quit" def producer(out_q): times = 10 while times: data = "hello world!" evt = Event() out_q.put((data, evt)) print("producer is putting data"{}" and waitting consumer".format(data)) evt.wait() times -= 1 if times == 0: out_q.put((_a, evt)) break def consumer(in_q): while True: data, evt = in_q.get() print("consumer data is", data) evt.set() if data == _a: print("consumer end!") break q = Queue() Thread(target=producer,args=(q,)).start() Thread(target=consumer,args=(q,)).start() # 结果为: """ producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! consumer data is quit consumer end! Process finished with exit code 0 """
最后,你们需要知道,queue支持固定大小诸如Queue(N);同样get()和put()方法支持非阻塞和超时机制:import queue a = queue.Queue() try: data = a.get(block=False) except queue.Empty: ... try: a.put("hello",block=False) except queue.Full: ... try: data = a.get(timeout=5.0) except queue.Empty: ...
这两种情况都可以用来避免在特定的队列操作上无限期的阻塞下去的问题。比如,可以用put()的非阻塞版本和固定大小的队列来完成当队列满时进行不同类型的处理方式。例如,可以生成一条log,并且将数据丢弃:def producer(n): ... try: n.put(item,block=False) except queue.Full: log.warning("queue is full ,the value {} is discarded!".format(item))
如果想要消费者线程周期性地放弃q.get()这样的操作,以便于检查类似结束标记这样的情况,那么超时机制很有用。_flag = True def consumer(q): while _flag: try: data = q.get(timeout=5.0) except queue.Empty: pass
在Queue中提供了很多方便的方法,比如,q.qsize(),q.full(),q.empty(),这些方法可以帮助我们确定队列大小,队列状态,但是,这些方法在多线程
中尤其的危险,因为有可能这个线程运行时检测q.empty()确实为True,但是与此同时另外一个线程已经在队列中插入了一个元素,那么这就会导致出现意想不到的结果,且对于初学者而言,在没有对多线程有一定理解的基础上去实施,可能会导致放弃学习。所以请重视多线程,真的难。所以,多线程的代码尽量不要去以来这些函数。