这篇文章主要介绍AMQP 0-9-1 协议,是RabbitMQ支持的协议之一,理解AQMP对于使用和理解RabbitMQ也很有帮助。 AMQP 0-9-1(高级消息队列协议)是一种消息传递协议,它使客户端应用程序能与消息中间件进行通信。消息中间件接收Producer的消息,并将消息路由到Queue,Consumer订阅Queue,就能消费到消息。从AMQP 0-9-1 模型看,Producer发布消息到Exchange(交换器),Exchange根据绑定规则,将消息路由到一个或多个Queue。消息中间件把消息推送给订阅该Queue的Consumer,或者由Consumer从Queue上拉取消息。 下图直观的展示了AMQP 0-9-1 协议中的角色和消息主要流转过程,其中Exchange、Bindings、Qeue是AMQP 0-9-1的主要实体。 Exchange AMQP 0-9-1 定义了Exchange,用来把消息路由到Queue,路由算法依赖 Binding Rule 和 Exchange 类型,AMQP 0-9-1 提供了4种Exchange类型。 Exchange type 默认的预定义的名字 Direct exchange (Empty string) and amq.direct Fanout exchange amq.fanout Topic exchange amq.topic Headers exchange amq.match (and amq.headers in RabbitMQ) Default Exchange 默认的Exchange是没有名字的,它是一种特殊的Direct Exchange。所有的Queue创建完,如果没有手动设置绑定关系,自动绑定到默认的Exchange,Routing Key 和 Queue 名字相同。 Direct Exchange Direct Exchange 要求发布消息时设置的Routing Key 必须与 Binding Key 一致,才能路由到Queue。它常用的场景是分发消息,根据Routing Key,把消息分发给不同的Consumer。 Fanout Exchange Fanout Exchange忽略Binding Key,会把消息路由给所有与它绑定的Queues,类似于广播消息。 Topic Exchange Topic Exchange是一种比较灵活的Exchange,采用模式匹配的方式路由消息,具体的模式匹配规则由各个消息中间件实现。 RabbitMQ的实现规则在下一篇文章中更新,小伙伴慢记得关注哦。 Headers Exchange Headers Exchange根据消息的属性来路由消息,比如设置绑定的属性是x-product=product1,那么只有消息上有x-product=product1这个属性值时才会路由到队列。 Bindings Bindings是Exchange和Queue绑定的具体规则,Exchange就是根据这个Bindings路由消息到Queue的。发布消息时的Routing Key必须和Binding Key匹配,消息才能正确路由到Queue。 Queue Queue是实际保存消息的地方,AMQP 0-9-1规定队列名字的长度不能超过255个字节。如果客户端没有指定队列名字,Broker会自动生成队列名字,不管是哪种方式,必须保证队列名字唯一。如果申明的队列已经在Broker存在,那么就会报403错误。 持久性 AMPQ 0-9-1 规定,队列可以申明为持久化和非持久化的,持久化队列会保存到磁盘,包括它的元数据。非持久化的队列只是保存在内存中。 Consumer AQMP 0-9-1 规定消费者有两种方式消费消息: Consumer订阅,Broker推送消息给Consumer,这种方式也是推荐的一种实现。优点是获取消息实时,缺点是失去了对消息控制的灵活性,比如根据机器的性能决定一次消费多少消息。 Consumer拉取消息,优点是客户端可控性高,缺点是无法准确获知消息何时到达。 消息确认 由于网络是不可靠的,当Broker推送消息给Consumer后,无法知道这条消息是否已经被Consumer接收并正确处理了,Broker也无法判断是否可以移除这条消息。所以AMQP 0-9-1 定义了消息确认机制,有两种不同的模式:在Broker发出消息之后,这种方式容易导致消息丢失。在应用程序返回ack之后。 默认是自动确认,也支持手动确认,推荐使用手动确认,因为客户端程序可以自己控制在何时进行消息确认。拒绝消息 消费者处理消息有可能失败,或者当前业务逻辑无法正确处理,就可以拒绝消息,此时,消息可能会被丢弃,也有可能重新入队,取决客户端的实现。注意,当队列只有一个订阅者,避免由于处理逻辑不当造成死循环。 批量拒绝消息 消费者可以批量接受消息,如果这批消息都无法处理,或者都处理失败,可以批量拒绝消息。 消息接收数量 当一个队列有多个消费者订阅时,定义每个消费在下一个ack之前能接收的最大消息数量,可以实现消息的负载均衡和提高吞吐量。 消息属性 AMQP 0-9-1 规范定义消息有元数据和消息体,元数据就是消息的属性,比如: Content type Content encoding Routing key Delivery mode (persistent or not) Message priority Message publishing timestamp Expiration period Publisher application id 消息体就是发送到队列的具体数据,以二进制格式发送,broker接收到消息,不会处理它,根据客户端的设置,保存到磁盘或者内存。 函数列表 AMQP 0-9-1 定义了非常丰富的函数,比如跟exchange相关的如下: exchange.declare exchange.declare-ok exchange.delete exchange.delete-ok 完整的函数定义请参考官方 手册:https://www.rabbitmq.com/amqp-0-9-1-reference.htmlConnection AMQP 0-9-1 规范定义了连接是建立在TCP的可靠性连接,并且是长连接。当一个客户端不需要连接Broker,AQMP 0-9-1 规范是应该关闭AMQP的连接,而不是底层的TCP连接。 Channel 客户端有可能需要和Broker建立多个连接,那么就需要建立多个TCP连接,而这是很消耗资源和时间的。所以AMQP 0-9-1 规范定义了通道,通道是建立在连接之上的,一个连接可以有多个通道。客户端需要建立多个连接时,可以创建多个通道,每个通过都有唯一的ID。 当连接关闭时,在这个连接之上的所有通过都会关闭。虚拟主机 这里的虚拟主机的概念是AQMP 0-9-1 中定义的,在一个Broker中(可以理解为一个服务)创建多个环境,用于资源隔离,每个环境中都有各自的Exchange、Bindings、Queue、用户、元数据等等,环境之间资源不会共享。 好了,以上就是关于AMQP 0-9-1 的介绍。 RabbitMQ系列文章会陆续更新,欢迎各位小伙伴关注后面的技术分享。