范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

Python中级精华线程间通信基础知识及常用的队列通信(Queue)

  常见的基本概念:
  首先普及下进程和线程的概念:
  进程:进程是操作系统资源分配的基本单位。
  线程:线程是任务调度和执行的基本单位。
  一个应用程序至少一个进程,一个进程至少一个线程。
  两者区别:同一进程内的线程共享本进程的资源如内存、I/O、cpu等,但是进程之间的资源是独立的。
  并发 & 并行:
  并发:在操作系统
  中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机
  上运行。简言之,是指系统具有处理多个任务的能力。
  并行:当系统有一个以上CPU时,则线程的操作有可能非并发。当一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行
  (Parallel)。简言之,是指系统具有同时处理多个任务的能力。
  同步 & 异步:
  对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区
  中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
  1. 等待数据准备 (Waiting for the data to be ready)
  2. 将数据从内核
  拷贝到进程中 (Copying the data from the kernel to the process)
  同步:当进程执行IO(等待外部数据)的时候,-----等。同步(例如打电话的时候必须等)
  异步:当进程执行IO(等待外部数据)的时候,-----不等,去执行其他任务,一直等到数据接收成功,再回来处理。异步(例如发短信)
  总结:
  只要有一丁点阻塞,就是阻塞IO。异步IO的特点就是全程无阻塞。
  有些人常把同步阻塞和异步非阻塞联系起来,但实际上经过分析,阻塞与同步,非阻塞和异步的定义是不一样的。同步和异步的区别是遇到IO请求是否等待。阻塞和非阻塞的区别是数据没准备好的情况下是否立即返回。同步可能是阻塞的,也可能是非阻塞的,而非阻塞的有可能是同步的,也有可能是异步的。
  一般通用(非python层面)的进程线程间通信的常见方式:
  进程间通信:
  管道(pipe):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用进程间的亲缘关系
  通常是指父子进程关系。
  命名管道(named pipe/FIFO):命名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
  信号量
  (semophonre):信号量是一个计数器,可以用来控制多个进程队共享资源的访问。它常作为一个锁机制,防止某进程在访问共享资源时,其他进程也访问此资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
  消息队列(message queue):消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少,管道只能承载无格式字节流以及缓冲区大小受限等缺点。
  信号
  (sinal):信号是一种比较复杂的通信方式,用于通知接受进程某个事件已经发生。
  共享内存(shared memory):共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的ipc通信方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往和其他通信方式如信号量,配合使用来实现进程间的同步和通信。
  套接字(socket):套接字也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同设备间的进程通信。
  全双工管道:共享内存、信号量、消息队列、管道和命名管道只适用于本地进程间通信,套接字和全双工管道可用于远程通信,因此可用于网络编程。
  线程间通信:
  锁机制:包括互斥锁、条件变量、读写锁
  互斥锁:提供了以排他方式防止数据结构被并发修改的方法。
  读写锁:允许多个线程同时共享数据,而对写操作是互斥的。
  条件变量:可以以原子的方式阻塞进程,直到某个特定条件为真为止。对条件的测试是在互斥锁的保护下进行的。条件变量始终与互斥锁一起使用。
  信号量机制(Semaphore):包括无名进程信号量和命名线程信号量
  信号机制(Signal):类似进程间的信号处理
  python中的安全线程通信手段queue:
  目的:
  程序中有多个线程,想在这些线程间实现安全的通信或者交换数据。
  方法:
  在python中将数据从一个线程发送到另外一个线程可能最安全的方法就是使用queue库中的Queue模块了。
  要想做到这些,首先就要创建一个Queue 的实例,所有线程会共享其。之后线程可以通过put()和get()方法来给队列添加和删除元素。
  例子:from queue import Queue from threading import Thread  #经典的生产者消费者模式:  # 一个线程专门用于生产数据 def producer(out_q):     while True:         # 生产些数据         ...         out_q.put(data)  # 一个线程作为消费者用于取出queue中的数据 def consumer(in_q):     while True:         # 消耗数据         data = in_q.get()         # 处理数据         ...  q = Queue() t1 = Thread(target=producer,args=(q,)) t2 = Thread(target=consumer,args=(q,)) t1.start() t2.start()
  Queue已经实现了所有所需的锁,因此它们可以安全地在任意多的线程之间进行共享。当使用队列时,如何对生产者和消费者的关闭过程进行同步协调需要一些技巧,这个问题的结局方法是使用一个特殊的终止值。当我们将它放入队列时,就使消费者退出:from threading import Thread import queue import time  _sentinel = object()  def producer(out_q):     times = 10     while times:         data = "hello world!"         print("producer send a data", str(times)+"times",sep="---")         out_q.put(data)         time.sleep(2)         times -= 1     out_q.put(_sentinel)  def consumer(in_q):     while True:         data = in_q.get()         if data is _sentinel:             in_q.put(_sentinel)# 用过之后还回队列,方便其他消费者线程使用;可以保证逐个去关闭其他线程             break         print("consumer recive a data:", data,sep="---")  q = queue.Queue() Thread(target=producer,args=(q,)).start() Thread(target=consumer,args=(q,)).start()  # 结果为: """ producer send a data---10times consumer recive a data:---hello world! producer send a data---9times consumer recive a data:---hello world! ... producer send a data---2times consumer recive a data:---hello world! producer send a data---1times consumer recive a data:---hello world!  Process finished with exit code 0 """
  尽管队列时线程间通信的最常见机制,但是只要添加了所需要的锁和同步功能,就可以构建自己的线程安全型的数据结构,最常见的做法是将你的数据结构和条件变量打包在一起,比如如下示例构建了一个优先级队列数据结构:
  这里需要借助优先级队列heapq库,对这个库做个简单说明:库中有两个最常用的方法,一个是heappop函数,另外一个是heappush函数;这两个函数尤其需要关注heappop函数import heapq import threading  class PriorityQueue:     def __init__(self):         self._queue = []         self._count = 0         self._cv = threading.Condition()      def put(self, item, priority):         with self._cv:# 上下文管理这个 条件变量对象,方便资源回收(不需要我们管了)             heapq.heappush(self._queue, (-priority, self._count, item))#heappush会将元素从小到大排列             # 以列表作为容器,元组作为元素组成的heapq;             # 其中priority代表优先级(数字越大优先级越高),_count代表计数(理解为下标),item是真正的值             # 这里之所以取-priority,是因为将对象按照优先级从高到低的顺序排列             # heapq能够保证第一个元素的优先级最低,最后一个元素的优先级最高             self._count += 1#插入一个下标自增1             self._cv.notify()# 通知其他线程,使其wait()解阻塞      def get(self):         with self._cv:             while len(self._queue) == 0:                 self._cv.wait()             return heapq.heappop(self._queue)[-1]             #跳出一个结果,这个结果为列表中的优先级最高的元素的item
  回到Queue中,我们应该知道的是队列实现的线程间通信是单向的且不确定的过程,一般来说我们无法获知接收线程(消费者)何时会实际接受到消息并且开始工作,但是,queue却提供了这样的基本事件来完成这些功能,接下来重点说明其中的task_done()和join().from queue import Queue from threading import Thread  #经典的生产者消费者模式:  # 一个线程专门用于生产数据 def producer(out_q):     while True:         # 生产些数据         ...         out_q.put(data)  # 一个线程作为消费者用于取出queue中的数据 def consumer(in_q):     while True:         # 消耗数据         data = in_q.get()         # 处理数据         ...         # 告知生产者数据使用完成:         in_q.task_done() #先生成队列 q = Queue() # 开辟两条线程 t1 = Thread(target=producer,args=(q,)) t2 = Thread(target=consumer,args=(q,)) # 线程启动 t1.start() t2.start() #等待所有消费者完成消费: q.join()
  有一种情况,通过Event去监视所有消费者的消费情况,比如要确保消费者完成消费后才导入新的数据这种情况:from threading import Thread, Event from queue import Queue  _a = "quit" def producer(out_q):     times = 10     while times:         data = "hello world!"         evt = Event()         out_q.put((data, evt))         print("producer is putting data"{}" and waitting consumer".format(data))         evt.wait()         times -= 1         if times == 0:             out_q.put((_a, evt))             break def consumer(in_q):     while True:         data, evt = in_q.get()         print("consumer data is", data)         evt.set()         if data == _a:             print("consumer end!")             break   q = Queue()  Thread(target=producer,args=(q,)).start() Thread(target=consumer,args=(q,)).start() # 结果为: """ producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! producer is putting data"hello world!" and waitting consumer consumer data is hello world! consumer data is quit consumer end!  Process finished with exit code 0  """
  最后,你们需要知道,queue支持固定大小诸如Queue(N);同样get()和put()方法支持非阻塞和超时机制:import queue a = queue.Queue()  try:     data = a.get(block=False) except queue.Empty:     ... try:     a.put("hello",block=False) except queue.Full:     ...  try:     data = a.get(timeout=5.0) except queue.Empty:     ...
  这两种情况都可以用来避免在特定的队列操作上无限期的阻塞下去的问题。比如,可以用put()的非阻塞版本和固定大小的队列来完成当队列满时进行不同类型的处理方式。例如,可以生成一条log,并且将数据丢弃:def producer(n): 	... 	try: 		n.put(item,block=False) 	except queue.Full: 		log.warning("queue is full ,the value {} is discarded!".format(item))
  如果想要消费者线程周期性地放弃q.get()这样的操作,以便于检查类似结束标记这样的情况,那么超时机制很有用。_flag = True  def consumer(q): 	while _flag: 		try: 			data = q.get(timeout=5.0) 		except queue.Empty: 			pass
  在Queue中提供了很多方便的方法,比如,q.qsize(),q.full(),q.empty(),这些方法可以帮助我们确定队列大小,队列状态,但是,这些方法在多线程
  中尤其的危险,因为有可能这个线程运行时检测q.empty()确实为True,但是与此同时另外一个线程已经在队列中插入了一个元素,那么这就会导致出现意想不到的结果,且对于初学者而言,在没有对多线程有一定理解的基础上去实施,可能会导致放弃学习。所以请重视多线程,真的难。所以,多线程的代码尽量不要去以来这些函数。

谷歌Pixel6发布在即,Pixel5史无前例沦为白菜机惨遭疯抢今天的主角无疑是iPhone13系列,但在沉默了约4年之后,官方谷歌新闻推特暂时打破沉默,发了推特等待Pixel6。但是后来,这条推文被撤了,账号也关闭了。但是,谷歌也利用这个机会财务丑闻发生后瑞幸咖啡董事长首发声今天更要元气满满!昨晚(3日),瑞幸咖啡公告承认伪造交易22亿。随后,瑞幸咖啡盘中经历八次熔断,一度暂停交易,最终股价暴跌75。6,市值缩水至16亿美元。今天中午,中国证券监督管理委员会官方发布声明易图通CEO陆洪彬自动驾驶仿真测试管理探索2021年1月15日,主题为智行之路数字相助的戴尔科技集团汽车行业客户高峰论坛在上海华尔道夫酒店如期召开,易图通CEO陆洪彬受邀参加本次活动并发表了重要演讲,并正式发布易图通自动驾不负韶华,砥砺前行2020年箩筐技术年度盘点时光匆匆,经不住似水流年,岁月变迁。转眼间,跌宕起伏的2020年告一段落。流光一瞬,他鞭策着我们不断向前的同时,也提醒着我们回首审视走过的路径。这一年注定是非同寻常,新冠疫情成为主雷军旗下金山云启动上市发行价区间确定,小米有意购买3。5亿元5月5日消息,今天雷军旗下金山云向美国证券交易委员会(SEC)提交的IPO(首次公开招股)招股书,计划在纳斯达克交易所上市,代码为KC。招股书显示,金山云将发行2500万股美国存托名企头条拼多多最新股权披露格力员工大厦3700套房已建成格力员工大厦3700套房已建成员工每人一套房目标不变尽管第一季度遭受300亿亏损,但董明珠员工每人一套房的目标不变。4月25日,董明珠在全球木兰论坛上表示,格力3700套的员工大厦名企头条华为5G专利申请全球第一小米日本就核弹宣传道歉名企头条华为5G专利申请全球第一小米日本就核弹宣传道歉路透社美国商务部拟允许美国公司与华为合作据路透社报道,美国商务部接近签发一份新规,以允许美国企业和华为公司共同参与新一代5G网LK分享车载毫米波雷达的虚拟测试仿真智能汽车传感器的虚拟测试仿真是智能驾驶整车在环或硬件在环测试的重要一环。针对毫米波雷达的虚拟测试的需要,本文研究了传感器注入法与黑盒模拟法在毫米波雷达测试中的性能差异。随着智能交通卫星告诉你,真实的山海情到底什么样最近,扶贫剧山海情火了,剧中故事发生地宁夏永宁县闽宁镇也备受关注。从苦瘠甲天下的西海固到宁夏平原的闽宁镇,我们从卫星视角带你感受闽宁镇从无到有从贫到富波澜壮阔的变迁。吊庄移民的西海日媒华为在硬件上已具备美国制裁抗压能力就差鸿蒙系统了日媒华为在硬件上已具备美国制裁抗压能力就差鸿蒙系统了!今天(16),美国商务部工业与安全局(BIS)官方宣布,将严格限制华为使用美国的技术软件设计和制造半导体芯片进而保护美国国家安超擎如何为交通出行赋能?1807年,世界上第一艘蒸汽机船克莱蒙特号在纽约哈德孙河下水。蒸汽机的发明是人类历史上一个重要里程碑。由于动力的改变,交通与大众出行有了突飞猛进的发展短短数百年,人类不仅能上天(飞
OnePlusBudsZ2真无线耳塞的渲染图像和发布时间表曝光在发布令人印象深刻的OnePlusBudsZ和OnePlusBudsPro真无线耳机产品之后,该公司显然正在开发其继任者,新品将被命名为OnePlusBudsZ2。这将是该品牌的下配置小改体验微创新雷蛇巴塞利斯蛇V3鼠标怎么样?巴塞利斯蛇V2鼠标上市有一段时间了,近日,雷蛇将这款产品更新至RazerBasiliskV3巴塞利斯蛇V3游戏鼠标,它在老款巴塞利斯蛇鼠标的模具基础上小改,并加入性能更强的引擎,以小户型看片神器,极米NEWZ6X家用投影仪让生活更温馨有多久没有和家人在同一个屏幕下一起娱乐了呢?手机几乎占据了我们所有的生活空间,尤其是租房的年轻男女们,在大屏下一起看剧重拾温馨成了一个难题。今天要为大家介绍的极米NEWZ6X家用投国产芯片巨头干得真漂亮,高通骁龙888和骁龙888要被反超在全球移动芯片领域,高通拥有压倒性的优势。在华为被断供芯片之前,安卓机市场尚且还有麒麟芯片可以跟骁龙芯片抗衡,而如今华为麒麟芯片也几乎成为了绝唱,国内安卓机市场几乎成为了高通的盘中发布两个月跌至1999元,6nm旗舰芯120Hz直屏,为何却无人问津?大家都知道现在很多厂商都喜欢搞什么子品牌,为的就是细分市场好去吸引消费者,本体则可以冲上高端卖出更贵的价格,子品牌基本上都是主打性价比,今天我们就来聊聊近两年来势头迅猛的realm一款老少咸宜的相机!尼康Zfc在日本大受好评自尼康发布复古微单相机Zfc后,迅速成为摄影市场的新人王,大家对这款相机也有相当不俗的评价,而在日本本土市场,尼康Zfc的销量同样报捷,日经日前的报导表示,尼康Zfc是一款可以吸引变频空调和定频空调该如何选择?为什么我选定频空调家人都不开心现在的家用空调种类有中央空调和分体空调,而分体空调中又分为变频空调和定频空调。由于装修之初没有安装中央空调,现在只能安装分体空调,可是选择分体空调的时候遇到一大难题,不知该选变频空上海游戏公司抢人大战应届生年薪开到60万,游戏从业者迎来黄金时代?本文来源时代周报作者郑栩彤去年至今,游戏企业涨薪异常疯狂。最集中涨薪和抢人的是上海,最稀缺的岗位是制作人TA(技术美术)和引擎。在上海某些公司,应届生从事TA和引擎可拿五六十万年薪大学老师说没有写过一千行以上代码的程序就别想上大公司大学老师说没有写过一千行以上代码的程序就别想上大公司这种说法对吗?如果说一千行代码都没写过基本上说明你没有受过充分的训练,很难说你在编程上面到底有没有什么造诣。当然这句话并不是说你中国移动与中国广电喜结连理2分钟前中国移动与中国广电签署有关协议规定中移动先行承担约定范围内700M无线网络全部建设费用,并先行享有上述无线网络资产所有权,双方均享有700M无线网络使用权,中国广电在条件具设计模式5依赖倒转原则依赖倒转原则A。高层模块不应该依赖底层模块,两个都应该依赖抽象B。抽象不应该依赖细节,细节应该依赖于抽象说白了,就是要针对接口编程,不要对实现编程高层模块依赖底层模块的例子面向过程