1。1队列实现进程间通信1。1。1Queue常用类 类 说明 Queue(〔maxsize〕) 创建共享的进程队列,maxsize是队列中允许的最大项,省略则队列无大小限制1。1。2Queue常用实例q常用方法 方法 说明 q。canceljointhread() 不会在进程退出时自动连接后台线程,可防止jointhread()方法阻塞 q。jointhread() 连接队列的后台线程。此方法用于q。close()方法之后,等待所有队列项被消耗完。调用q。canceljointhread()方法可以禁止这种行为 q。close() 关闭队列。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据 q。empty() 如调用此方法时q为空,返回True。此方法返回结果不可靠,在返回和使用结果之间,队列有可能已经加入新的项 q。full() 如调用此方法时q已满,返回Ture。同样此方法返回结果不可靠 q。get(〔block〔,timeout〕〕) 返回q中的一项,如果q为空,此方法将阻塞,直到队列中有项可用为止。block用于控制阻塞行为,默认为True。如果设为False,将引发Queue。Empty异常,timeout为可选超时时间 q。getnowait() 等同于q。get(False)方法 q。put(item〔,block〔,timeout〕〕) 将item放入队列中,如果队列已满,此方法将阻塞至有可用空间为止。block控制阻塞行为,默认为True。如果设置为False,将会引发Queue。Full异常。timeout指定在阻塞模式中等待可用空间的时间长短,超时后将引发Queue。Full异常 q。putnowait(item) 等同于q。put(item,False)方法 q。qsize() 返回队列中项的数量,结果不可靠1。1。3JoinableQueue 类 说明 JoinableQueue(〔maxsize〕) 创建可连接的共享进程队列,类似Queue的一个对象,但它允许项的消费者通知生产者项已被成功处理1。1。4JoinableQueue实例除Queue实例方法处的方法 方法 说明 q。taskdone() 消费者使用此方法发出信号,表示q。get()返回项已经被成功处理,如果调用此方法的次数大于从队列中删除的项的数量,将引发ValueError异常 q。join() 生产者使用此方法进行阻塞,直到队列中的所有项均被处理。阻塞将持续到为队列中的每个项均被调用q。taskdone()方法为止1。1。5JoinableQueue实现进程间通信示例importmultiprocessingdefconsumer(inputq):消费者whileTrue:iteminputq。get()处理项print(f处理inputq:{item})发出信号通知任务完成inputq。taskdone()defproducer(sequence,outputq):生产者foriteminsequence:outputq。put(item)ifnamemain:qmultiprocessing。JoinableQueue()运行消费者进程conspmultiprocessing。Process(targetconsumer,args(q,))consp。daemonTrueconsp。start()运行生成者进程sequence(1,2,3,4)producer(sequence,q)等待所有项被处理完成q。join()1。1。6JoinableQueue实现多个消费都进程池间通信示例frommsilibimportsequenceimportmultiprocessingdefconsumer(inputq):消费者whileTrue:iteminputq。get()处理项print(f处理inputq:{item})发出信号通知任务完成inputq。taskdone()defproducer(sequence,outputq):生产者foriteminsequence:outputq。put(item)ifnamemain:qmultiprocessing。JoinableQueue()多个进程运行消费者模型consp1multiprocessing。Process(targetconsumer,args(q,))进程1处理项consp1。daemonTrueconsp1。start()consp2multiprocessing。Process(targetconsumer,args(q,))进程2处理项consp2。daemonTrueconsp2。start()运行生产者sequence(1,2,3,4)producer(sequence,q)等待所有项被处理q。join()1。1。7哨兵模式实现多进程间通信示例fromastimportargfromaudioopimportmulfrommsilibimportsequenceimportmultiprocessingimportsqlite3fromtkinter。messageboximportNOdefconsumer(inputq):whileTrue:iteminputq。get()处理哨兵ifitemisNone:break处理项print(f处理inputq:{item})关闭print(consumer处理完成!)defproducer(sequence,outputq):foriteminsequence:outputq。put(item)ifnamemain:qmultiprocessing。Queue()启动消费者进程conspmultiprocessing。Process(targetconsumer,args(q,))consp。start()生产者sequence(1,2,3,4)producer(sequence,q)在队列上安置哨并q。put(None)等待消费者进程关闭consp。join() 注意:如果使用哨兵,一定要在队列上为每个消费都都安置哨兵,才能保证所有消费者进程都能关闭。1。2管道实现进程间通信1。2。1管道类 类 说明 Pipe(〔duplex〕) 在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1和conn2表示管道两端的Connection对象。默认情况下,管道是双向的。如果将duplex置为False,conn1只能用于接收,conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法1。2。2Pipe()返回的Connection对象实例c具有的方法和属性 方法 说明 c。close() 关闭连接。如果c被垃圾回收,将自动调用此方法 c。fileno() 返回连接使用的整数文件描述符 c。pull(〔timeout〕) 如果连接上的数据可用,返回True,timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout置为None,操作将无限期地等待数据到达 c。recv() 接收c。send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常 c。recvbytes(〔maxlength〕) 接收c。sendbytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常 c。recvbytesinto(buffer〔,offset〕) 接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常 c。send(obj) 通过连接发送对象。obj是与序列化兼容的任意对象 c。sendbytes(buffer〔,offset〔,size〕〕) 通过连接发送字节数据缓冲区。buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c。recvbytes()函数进行接收1。2。3管道实现生产者消费都模型示例importmultiprocessingdefconsumer(pipe):outputp,inputppipeinputp。close()关闭管道的输入端whileTrue:try:itemoutputp。recv()exceptEOFError:breakTODO可替换为有用的工作print(item)关闭print(consumerdone。)defproducer(sequence,inputp):foriteminsequence:inputp。send(item)ifnamemain:创建管道(outputp,inputp)multiprocessing。Pipe()启动消费者进程conspmultiprocessing。Process(targetconsumer,args((outputp,inputp),))consp。start()关闭生产者中的输出管道outputp。close()生产项sequence(1,2,3,4)producer(sequence,inputp)半闭输入管道,表示输入完成inputp。close()等待消费者进程结束consp。join() 说明:如果生产者或者消费者中都没有使用管道的某个端点,就就将其关闭。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者中也关闭了相同的管道端点1。2。4管道双向通信示例importmultiprocessingfromunittestimportresultdefadder(pipe):serverp,clientppipeclientp。close()whileTrue:try:x,yserverp。recv()exceptEOFError:breakresultxyTODO可用实例业务代替serverp。send(result)关闭print(Serverdone。)ifnamemain:生成管道(serverp,clientp)multiprocessing。Pipe()启动服务器进程adderpmultiprocessing。Process(targetadder,args((serverp,clientp),))adderp。start()关闭客户端中的服务器管道serverp。close()客户端发出请求clientp。send((3,4))print(clientp。recv())clientp。send((Hello,World))print(clientp。recv())完成,关闭管道clientp。close()等待消费者进程完成adderp。join()