范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

厉害!一文了解消息中间件RabbitMQ

  一、基础知识1. 什么是RabbitMQ
  RabbitMQ是2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,简称MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法,由Erlang(专门针对于大数据高并发的语言)语言开发,可复用的企业消息系统,是当前最主流的消息中间件之一,具有可靠性、灵活的路由、消息集群简单、队列高可用、多种协议的支持、管理界面、跟踪机制以及插件机制。
  2.什么是消息和队列
  1.消息 就是数据,增删改查的数据。例如在员工管理系统中增删改查的数据
  2.队列 指的是一端进数据一端出数据,例如C#中(Queue数据结构)
  3.什么是消息队列
  1.消息队列指:一端进消息,一端出消息
  2.RabbitMQ就是实现了消息队列概念的一个组件,以面向对象的思想去理解,消息队列就是类,而RabbitMQ就是实例,当然不仅仅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以实现消息队列。
  4.什么地方使用RabbitMQ
  1.在常见的单体架构中,主要流程是用户UI操作发起Http请求>服务器处理>然后由服务器直接和数据库交互,最后同步反馈用户结果
  2.在微服务架构中,UI与微服务通信,主要是通过Http或者gRPC同步通信
  问题分析
  在上述2种情况下,我们发现在UI请求时都是同步操作 ,第2种架构虽然将整体服务按业务拆分成不同的微服务并且对应各自的数据库,但是在用户与微服务通信时,存在的问题依然没有解决,例如数据库的承载能力只能处理10w个请求,如果遇到高并发情况下,UI发起50w请求,那数据库是远远承载不了的,从而导致如下问题。
  1.高并发请求导致系统性能下降响应慢,同时数据库承载风险加大
  2.扩展性不强UI操作的交互对业务的依赖较大,导致用户体验下降
  3.瞬时流量涌入巨大的话,服务器可能直接挂了
  解决方案
  为了解决性能瓶颈的问题。我们需要将同步通信方式换成异步通信方式。因此就使用消息队列,用户在UI中操作直接写入RabbitMQ然后直接返回,剩下的业务操作由消息队列和各自的微服务来完成
  RabbitMQ的优势 异步处理,响应快,增加了数据库(服务器的承载能力) 削峰,可以把流量的高峰分解到不同的时间段来处理 解耦(扩展性就更强),让UI和业务独立演化 高可用,处理器如果发生故障了,对其他的处理器没有影响
  RabbitMQ的不足 增加了系统复杂性,不方便调试和开发,在使用RabbitMQ以前前端直接和服务交互,现在加了一层 即时性降低了,在某一程度上提升了用户操作体验,也降低了用户体验,但是避免不了,取长补短 更加依赖消息队列了
  5.RabbitMQ组成概念
  1.ConnectionFactory 为Connection的制造工厂。
  2.Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。
  3.Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
  4.Exchange(交换机) 我们通常认为生产者将消息投递到Queue中,实际上实际的情况是,生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中(或者丢弃),而在RabbitMQ中的Exchange一共有4种策略,分别为:fanout(扇形)、direct(直连)、topic(主题)、headers(头部) 二、如何落地RabbitMQ1.RabbitMQ环境安装
  1.下载RabbitMQ
  2.运行环境erlang
  3.安装完成之后,加载RabbitMQ管理插件 rabbitmq-plugins enable rabbitmq_management
  4.安装成功访问RabbitMQ管理后台http://localhost:15672 2.创建系统业务
  1.分别创建考勤服务,请假服务,计算薪酬服务,邮件服务,短信服务消费者角色
  2.创建员工管理网站用于模拟前端调用,主要充当生产者角色
  3.在员工管理网站和每一个模拟微服务中通过nuget引入RabbitMQ.Client
  4.在员工管理网站中创建模拟添加考勤的控制器并加入生产者代码  //创建连接  using (var connection = factory.CreateConnection())  {      //创建通道      var channel = connection.CreateModel();      //定义队列      channel.QueueDeclare("CreateAttendance", false, false, false, null);       string json = JsonConvert.SerializeObject(attendanceDto);       //创建内容对象      var properties = channel.CreateBasicProperties();      //发送消息      channel.BasicPublish(exchange: "",routingKey: "CreateAttendance",basicProperties: properties,body: Encoding.UTF8.GetBytes(json));  }
  5.在考勤微服务中创建接口,并在接口中加入消费者代码 var connection = factory.CreateConnection(); var channel = connection.CreateModel();    //创建消费者事件 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {     var body = ea.Body;     // 1、逻辑代码,添加到数据库     var message = Encoding.UTF8.GetString(body.ToArray());     object json = JsonConvert.DeserializeObject(message);     Console.WriteLine(" [x] 创建考勤信息 {0}", message); }; //设置消费者属性 //p1.监听队列p2.消息确认ACK p3.消费者实例赋值 channel.BasicConsume(queue: "CreateAttendance",autoAck: false,consumer:consumer);
  三、Exchange交换机及实例分析1.Fanout Exchange (扇形交换机)
  fanout类型的Exchange路由规则非常简单,工作方式类似于多播一对多,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
  1.生产者一个Exchange对应多个Queue,或者不声明Queue
  2.消费者定义Exchange,如果生产者定义了Queue,那必须将exchange和queue绑定,如果没有定义队列,那消费者自己声明一个随机Queue用于接收消费消息
  业务实例
  当我们有员工需要请假,在员工管理系统提交请假,但是由于公司规定普通员工请假,需要发送短信到他的主管领导,针对此业务场景我们需要调用请假服务的同时去发送短信,这时需要两个消费者(请假服务,短信服务)来消费同一条消息,其实本质就是往RabbitMQ写入一个能被多个消费者接收的消息,所以可以使用 扇形交换机,一个生产者,多个消费者.
  生产者模拟使用调用控制器来实现 [HttpPost] public IEnumerable CreateLeave(CreateLeaveDto createLeaveDto) {     var factory = new ConnectionFactory()     {         HostName = "192.168.0.106",         Port = 5672,         Password = "guest",         UserName = "guest",         VirtualHost = "/"     };     using (var connection = factory.CreateConnection())     {         var channel = connection.CreateModel();         //定义交换机         channel.ExchangeDeclare(exchange: "Leave_fanout", type: "fanout");         string productJson = JsonConvert.SerializeObject(createLeaveDto);         var body = Encoding.UTF8.GetBytes(productJson);         var properties = channel.CreateBasicProperties();         //设置消息持久化         properties.Persistent = true;          channel.BasicPublish(exchange: "Leave_fanout", routingKey: "",  basicProperties: properties,body: body);     }  }
  消费者实现IHostedService 接口创建一个监听主机 public class RabbitmqHostService : IHostedService { 	  public Task StartAsync(CancellationToken cancellationToken)       {             var factory = new ConnectionFactory()             {                 HostName = "localhost",                 Port = 5672,                 Password = "guest",                 UserName = "guest",                 VirtualHost = "/"             };            	var connection = factory.CreateConnection();             var channel = connection.CreateModel();              // 1、定义交换机             channel.ExchangeDeclare(exchange: "Leave_fanout", type: ExchangeType.Fanout);             //定义随机队列             var queueName = channel.QueueDeclare().QueueName;	                //队列和交换机绑定             channel.QueueBind(queueName,"Leave_fanout",routingKey: "");             var consumer = new EventingBasicConsumer(channel);            consumer.Received += (model, ea) =>            {                Console.WriteLine(#34;model:{model}");                var body = ea.Body;                // 1、业务逻辑                var message = Encoding.UTF8.GetString(body.ToArray());                Console.WriteLine(" [x] 创建请假 {0}", message);                 // 1、自动确认机制缺陷,消息是否正常添加到数据库当中,所以需要使用手工确认                channel.BasicAck(ea.DeliveryTag, true);           };                      // Qos(防止多个消费者,能力不一致,导致的系统质量问题。           // 每一次一个消费者只成功消费一个)           channel.BasicQos(0, 1, false);            // 消息确认(防止消息消费失败)           channel.BasicConsume(queue: queueName ,autoAck: false,consumer: consumer);       }              public Task StopAsync(CancellationToken cancellationToken)       {          // 1、关闭rabbitmq的连接          throw new NotImplementedException();       } }
  2.Direct Exchange (直连交换机)
  直接交换器,工作方式类似于单播一对一,Exchange会将消息发送完全匹配ROUTING_KEY的Queue,缺陷是无法实现多生产者对一个消费者
  1.生产者一个Exchange对应一个routingKey绑定,也可以声明队列并绑定,然后向指定的队列发送消息。
  2.消费者需要定义Exchange和routingKey,如果生产者声明并绑定了队列,那消费者必须绑定生产者指定的Queue来接收消息,如果没有指定Queue,那消费者需要自己声明一个随机Queue然后绑定用于接收消息
  当我们员工管理系统需要计算薪资并将结果以发送短信的方式告诉员工,这个时候我们就不太适合用"扇形交换机"了,因为换做是你,你也不想你的工资全公司都知道吧?这个时候就需要定制了一对一的场景了,那就在生产消息时使用直连交换机根据routingKey发送指定的消费者.
  生产者模拟使用调用控制器来实现 public IEnumerable SendCalculateSalary(CalculateSalaryDto calculateSalaryDto) {  var factory = new ConnectionFactory()  {      HostName = "192.168.0.106",      Port = 5672,      Password = "admin",      UserName = "admin",      VirtualHost = "/"  };  using (var connection = factory.CreateConnection())  {      var channel = connection.CreateModel();      //2、定义交换机      channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: "direct");       string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);      var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);       //3、发送消息      var properties = channel.CreateBasicProperties();      properties.Persistent = true; // 设置消息持久化      //p1 指定交换机      //p2 routingKey       channel.BasicPublish(exchange: "CalculateSalary_direct",routingKey: "product-sms",basicProperties: properties,body: body);  } }
  消费者实现IHostedService 接口创建一个监听主机 public class RabbitmqHostService : IHostedService {       public Task StartAsync(CancellationToken cancellationToken)       {          var factory = new ConnectionFactory()          {              HostName = "localhost",              Port = 5672,              Password = "guest",              UserName = "guest",              VirtualHost = "/"          };             	var connection = factory.CreateConnection(); 	var channel = connection.CreateModel();  	// 1、定义交换机 	channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: ExchangeType.Direct);  	// 2、定义随机队列 	var queueName = channel.QueueDeclare().QueueName;  	// 3、队列要和交换机绑定起来 	channel.QueueBind(queueName,"CalculateSalary_direct",routingKey: "product-sms");  	var consumer = new EventingBasicConsumer(channel);         consumer.Received += (model, ea) =>         {             Console.WriteLine(#34;model:{model}");             var body = ea.Body;             // 1、业务逻辑             var message = Encoding.UTF8.GetString(body.ToArray());             Console.WriteLine(" [x] 发送短信 {0}", message);             // 1、消息是否正常添加到数据库当中,所以需要使用手工确认             channel.BasicAck(ea.DeliveryTag, true);         };             // 3、消费消息             channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。             // autoAck设为false 不进行自动确认                                  channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);       }              public Task StopAsync(CancellationToken cancellationToken)       {          // 1、关闭rabbitmq的连接          throw new NotImplementedException();       } }
  3.Topic Exchange (主题交换机)
  Exchange绑定队列需要制定Key; Key 可以有自己的规则;Key可以有占位符; 或者# , 匹配一个单词、#匹配多个单词,在Direct基础上加上模糊匹配;多生产者一个消费者,可以多对对,也可以多对1, 真实项目当中,使用主题交换机。可以满足所有场景
  1.生产者定义Exchange,然后不同的routingKey绑定
  2.消费者定义Exchange,如果生产者定义了Queue,那必须将exchange和queue以及routingKey绑定,如果没有定义队列,那消费者自己声明一个随机Queue用于接收消费消息,
  3.消费者routingKey的模糊匹配,生产者发送消息时routingKey定义以sms.开头, * 号只能匹配的routingKey为一级,例如(sms.A)或(sms.B)的发送的消息,# 能够匹配的routingKey为一级及多级以上 ,例如 (sms.A)或者(sms.A.QWE.IOP)
  在月底的时候我们需要把员工存在异常考勤信息,薪资结算信息,请假信息分别以邮件的形式发送给我们的员工查阅,我们知道这是一个典型的多个生产者,一个消费者场景,异常考勤信息,薪资结算信息,请假信息分别需要生产消息发送到RabbitMQ,然后供我们员工消费
  分别模拟3个生产者:异常考勤信息,薪资结算信息,请假信息 var factory = new ConnectionFactory()  {      HostName = "192.168.0.106",      Port = 5672,      Password = "admin",      UserName = "admin",      VirtualHost = "/"  };  //计算薪资生产者 public IEnumerable SendCalculateSalary(CalculateSalaryDto calculateSalaryDto) { using (var connection = factory.CreateConnection())  {      var channel = connection.CreateModel();      //2、定义topic交换机      channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");       string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);      var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);       //3、发送消息      var properties = channel.CreateBasicProperties();      properties.Persistent = true; // 设置消息持久化      //p1 指定交换机      //p2 routingKey       channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateSalary",basicProperties: properties,body: body);  } }  //考勤生产者 public IEnumerable SendCalculateAttendance(CalculateAttendanceDto calculateAttendance) { using (var connection = factory.CreateConnection())  {      var channel = connection.CreateModel();      //2、定义topic交换机      channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");       string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);      var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);       //3、发送消息      var properties = channel.CreateBasicProperties();      properties.Persistent = true; // 设置消息持久化      //p1 指定交换机      //p2 routingKey       channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateAttendance",basicProperties: properties,body: body);  } }  //请假信息生产者 public IEnumerable SendCalculateLeave(CalculateLeaveDto calculateLeave) { using (var connection = factory.CreateConnection())  {      var channel = connection.CreateModel();      //2、定义topic交换机      channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");       string calculateLeaveJson = JsonConvert.SerializeObject(calculateLeave);      var body = Encoding.UTF8.GetBytes(calculateLeaveJson);       //3、发送消息      var properties = channel.CreateBasicProperties();      properties.Persistent = true; // 设置消息持久化      //p1 指定交换机      //p2 routingKey       channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateAttendance",basicProperties: properties,body: body);  } } public class RabbitmqHostService : IHostedService { 	  public Task StartAsync(CancellationToken cancellationToken)       {             var factory = new ConnectionFactory()             {                 HostName = "localhost",                 Port = 5672,                 Password = "guest",                 UserName = "guest",                 VirtualHost = "/"             };             	              var connection = factory.CreateConnection();                       var channel = connection.CreateModel();  			// 1、定义交换机 			channel.ExchangeDeclare(exchange: "sms_topic", type: ExchangeType.Topic);  			// 2、定义随机队列 			var queueName = channel.QueueDeclare().QueueName;  			// 3、队列要和交换机绑定起来 			// * 号的缺陷:只能匹配一级             // # 能够匹配一级及多级以上  			channel.QueueBind(queueName,"sms_topic",routingKey: "sms.#");  			var consumer = new EventingBasicConsumer(channel);             consumer.Received += (model, ea) =>             {                 Console.WriteLine(#34;model:{model}");                 var body = ea.Body;                 // 1、业务逻辑                 var message = Encoding.UTF8.GetString(body.ToArray());                 Console.WriteLine(" [x] 发送短信 {0}", message);                 // 1、消息是否正常添加到数据库当中,所以需要使用手工确认                 channel.BasicAck(ea.DeliveryTag, true);             };             // 3、消费消息             channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。             // autoAck设为false 不进行自动确认                                  channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);       }              public Task StopAsync(CancellationToken cancellationToken)       {          // 1、关闭rabbitmq的连接          throw new NotImplementedException();       } }
  4.Header Exchange(头部交换机)
  headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
  在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。
  1.不需要依赖Key
  2.更多的时候,像这种Key Value 的键值,可能会存储在数据库中,那么我们就可以定义一个动态规则来拼装这个Key value ,从而达到消息灵活转发到不同的队列中去 四、RabbitMQ消息确认
  我们根据上面的业务和代码简单实现了由生产者到消费者的一个业务流程,我们可以总结出知道,整个消息的收发过程包含有三个角色,生产者(员工管理网站)、RabbitMQ(Broker)、消费者(微服务),在理想状态下,按照这样实现,整个流程以及系统的稳定性,可能不会发生太大的问题,但是真正在实际应用中我们要去思考可能存在的问题,主要从三个大的方面去分析,然后发散。
  1.生产端
  2.存储端
  3.消费端 1.消息生产端
  我们在给RabbitMQ发送消息时,如何去保证消息一定到达呢,我们可以使用RabbitMQ提供了2种生产端的消息确认机制
  模式
  描述
  实现方式
  Confirm模式
  应答模式,生产者发送一条消息之后,Rabbitmq服务器做了个响应,表示消息确认收到
  异步模式,在应答之前,可以继续发送消息,单条消息、批量消息
  Tx事务模式
  基于AMQP协议;可以把channel 设置成一个带事务的通道道,分为三步:1.开启事务,提交事务,回滚事务
  同步模式,在事务提交之前不能继续发送消息,事务模式效率差一些 1.Confirm 实现 using (var connection = factory.CreateConnection()) {     var channel = connection.CreateModel();     //2、定义topic交换机     channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");      string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);     var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);      //3、发送消息     var properties = channel.CreateBasicProperties();     properties.Persistent = true; // 设置消息持久化      try     {         //开启消息确认模式         channel.ConfirmSelect();         channel.BasicPublish(exchange: "sms_topic",          routingKey: "sms.CalculateAttendance", basicProperties: properties, body: body);         //如果一条消息或多消息都确认发送 	    if (channel.WaitForConfirms())          {            Console.WriteLine(#34;【{message}】发送到Broke成功!");         }         else         {             //可以记录个日志,重试一下;         }         //如果所有消息发送成功 就正常执行;如果有消息发送失败;就抛出异常;         channel.WaitForConfirmsOrDie();     }     catch (Exception ex)     {	     	 Console.WriteLine(#34;【{message}】发送到Broker失败!");     } } 2.Tx事务 实现   using (var connection = factory.CreateConnection())   {       var channel = connection.CreateModel();       //2、定义topic交换机       channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");          string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);       var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);          //3、发送消息       var properties = channel.CreateBasicProperties();       properties.Persistent = true; // 设置消息持久化          try       {           //开启事务机制,AMQP协议支持           channel.TxSelect(); //事务是协议支持的           channel.BasicPublish(exchange: "sms_topic",            routingKey: "sms.CalculateAttendance", basicProperties: properties, body: body);           //提交事务 只有事务提交了才会真正写入队列           channel.TxCommit();       }       catch (Exception ex)       {	       	//事务回滚     		 channel.TxRollback();        }   } 2.消息存储端
  我们生产端给RabbitMQ发送消息成功后,如果RabbitMQ宕机了,会导致RabbitMQ中消息丢失,如何解决消息丢失问题,针对RabbitMQ消息丢失,我们可以在生产者中使用
  1.持久化消息
  2.集群 3.消息消费端1.消费者宕机,导致消息丢失 2.执行业务逻辑失败,但是消息已经被消费
  当生产者写入消息到RabbitMQ后,消费服务接收消息期间,服务器宕机,导致消息丢失了,这个时候我们就应该使用RabbitMQ的消费端消息确认机制
  模式
  描述
  特点
  自动确认 autoAck
  自动确认,是消费消息的时候,只要收到消息,就直接回执给RabbitMQ,已经收到一切正常; 直接总览所有了,如果有1w条消息,只是消费成功了一条消息,RabbitMQ也会认为你是全部成功了,会将所有消息从队列中移除;这样会导致消息的丢失
  处理很快
  手动确认
  消费者消费一条,回执给RabbitMQ一条消息,RabbitMQ 只删除当前这一条消息,相当于是一条消费了,删除一条消息;
  性能稍微低一些
  1.自动确认 // 消息自动确认机制 channel.BasicConsume(queue: "CreateAttendance",autoAck: true, consumer: consumer);
  2.手动确认
  消费者收到消息。消费者发送确认消息给rabbitmq期间。执行业务逻辑失败了,但是消息已经确认被消费了,我们应该在我们的消费者接收消息回调执行业务逻辑后面,执行使用手动确认消息机制,保证消息不被丢失 var connection = factory.CreateConnection(); var channel = connection.CreateModel(); channel.ExchangeDeclare(exchange: "sms_topic", type: ExchangeType.Topic); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queueName,"sms_topic",routingKey: "sms.#");  var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {       var message = Encoding.UTF8.GetString(ea.Body.ToArray());       //执行业务逻辑              //手工确认告诉borker可以删除消息了       channel.BasicAck(ea.DeliveryTag, true);        //否定:告诉Broker,这个消息我没有正常消费;  requeue: true:重新写入到队列里去; false:你还是删除掉;       //channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true); };  // autoAck设为false 不进行自动确认                      channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer); 3.由于服务器性能不一致导致消息堆积 生产者发送高并发消息,消费者来不及处理,导致消息堆积,如何解决消息堆积问题?可以使用消费服务集群,将压力分散到不同的服务实例能解决这个问题,但是又产生了一个新的集群缺陷问题,假设集群服务器的强弱不一致,比较弱的服务器处理消息慢,就会导致大部分消息堆积在这台性能较差的服务器,那又该如何解决呢?
  我们可以采用RabbitMQ的QOS功能,俗称限流,他的意思就是消费者一次可以拉取指定数量的消息,在这些消息未处理完毕之前,不会再向队列拉取消息。 // Qos(防止多个消费者,能力不一致,导致的系统质量问题。 // 每一次一个消费者只成功消费一个) channel.BasicQos(0, 1, false);  4.如何保证消息不被重复消费(幂等性) 1.生产时消息重复 由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ
  已经接收到了消息。这时候生产者就会重新发送一遍这条消息。生产者中如果消息未被确认,或确
  认失败,我们可以使用定时任务+(redis/db)来进行消息重试。 2.消费时消息重复 消费者消费成功后,再给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
  我们可以让每个消息携带一个全局的唯一ID,即可保证消息的幂等性消费者获取到消息后先根据id去查询redis/db是否存在该消息。如果不存在,则正常消费,消费完毕后写入redis/db。
  如果存在,则证明消息被消费过,直接丢弃。
  原文链接:https://www.cnblogs.com/yuxl01/p/15978229.html
  觉得本问对你有帮助的小伙伴,点个赞转发一下吧!

NFC功能真的实用?卢伟冰大力推行,红米千元机也会标配科技进步的脚步从未停下,随着技术的提升,我们的日常生活方式也有了很大改善,就拿智能手机来说,各种各样的新功能出现,当大家适应之后竟然发现意外的好用,并且一直保留至今,比如红外遥控和老妈要换一款长续航千元机千元机推荐第一波老妈今天说要换一款千元新手机,要求续航好耐用就行真我Q3立马浮现在我的脑子里面。oppo品牌是国产手机中的佼佼者,通过多年发展,这个品牌在品控上做到了极致,要说国产手机品质那还真得小米OV都能用?华为鸿蒙OS将开源,部分功能比安卓更强点击右上方关注,第一时间获取科技资讯技能攻略产品体验,私信我回复01,送你一份玩机技能大礼包。2019年,华为正式公布harmonyOS,当时就对外称,系统将开源。5月24日,在鸿明确了!数字人民币不会替代支付宝,你会怎么选择?关注币圈的人们可能非常清楚,作为人民币数字化的产物,数字人民币自从2014年以来就不断有消息传来,但从2020年起,随着数字人民币在深圳苏州等城市开展小范围测试,它的脚步也离我们越靠消费贾跃亭,孙宏斌能复活乐视吗?文布谷编辑李信近期,乐视开了一场发布会,乐视智能生态高级市场总监吴国平表示,这本是一次中小型的发布会,但不料因为舆论的宣传,让原本人数并不多的现场变得拥挤。造成这一次乐观发布会人数是不是小米性价比最高?只是部分机型最高的从硬件来看,小米的确是部分产品性价比最高的。如小米11RedmiK系列(今年的RedmiK40)以及RedmiNote系列等。小米是靠MIUI系统和高性价比手机起家的。这也是小米的一夜之间血本无归,现在的币圈太可怕了这几天,币圈掀起了一场腥风血雨。5月19日,比特币一度跌破3万美元,暴跌近30。超过57万人爆仓,爆仓金额达443亿元。虽然后续回升至3。9万美元左右,但一夜暴负是跑不掉的了。炒币泡面盖已成过去式,iPadPro将与MacBook平起平坐在智能手机刚起步的阶段,苹果率先带来了iPad,为掌上影音娱乐提供了更多可能性。然而随着手机屏幕尺寸的不断增加,iPad的优势被不断削弱,使用场景也越来越少,这才有了买前生产力,买抖音快手等105款App被通报近期,针对人民群众反映强烈的App非法获取超范围收集过度索权等侵害个人信息的现象,国家互联网信息办公室依据中华人民共和国网络安全法App违法违规收集使用个人信息行为认定方法常见类型Edge新特性新增迷你右键菜单内置在线词典优化密码监控基于Chromium的新版Edge浏览器在近日的更新中,终于迁移了经典版Edge浏览器中的在线字典功能。此外在最新版本中,微软似乎正在开发迷你版右键菜单,并对密码监控工具进行改善。华为喊话了?小米的选择很关键谷歌中断与华为合作让其操作系统面临巨大难题。不过已经有所准备的华为推出了鸿蒙系统用以应对,虽然并没有第一时间上线,但也稳住了不少用户。至少在2020年前期华为依然能够取得骄人的销售
三大运营商降维打击,有线电视和户户通用户直线下降近年来,三大电信运营商(移动联通电信)积极发展农村宽带光纤,为了抢占农村市场,三大运营商纷纷推出促销措施,在很多地区装宽带免费赠送IPTV电视机顶盒。IPTV机顶盒可以看几百个电视世纪华通元宇宙游戏LiveTopia用户超1亿全球前三今年4月28日,A股游戏龙头世纪华通在全球最大的元宇宙社区Roblox平台,推出元宇宙游戏LiveTopia,游戏上线不到5个月时间,月活跃用户超过4000万,在Roblox平台上阿里巴巴发布全员公开信客户第一的初心不改Hello,大家早上好,又是元气满满的一天,先来浏览新鲜的早报吧阿里巴巴发布全员公开信客户第一的初心不改阿里巴巴表示,无论是过去的22年,还是未来的80年,客户第一的初心不改。今天英特尔施压英伟达高端市场壁垒难突破,定价策略或是关键出路中国科技新闻网9月10日讯(高运韬任彦松)近日,英特尔在其2021架构日上发布了多项变革计划,其中最引人注意的就是独立GPU的开发。会上,英特尔介绍了其备受瞩目的全新独立游戏GPU骁龙895挤爆牙膏,4nm换代GPU新基带,重现835骁龙888的热度想必大家已经领略过了,现在关于骁龙895的消息也爆料了不少,这颗芯片是否能成为新的神U,芯片带来了哪些新变化,让我们一同走进骁龙895这颗安卓旗舰芯片。全新架构全新鸿蒙代替安卓?专家两者根本不在一个层面!揭秘鸿蒙真正野心华为鸿蒙2。0系统发布后,大家纷纷表示为什么与安卓系统如此相似?为什么大家都在说鸿蒙系统代替不了安卓系统?让我们走进今天的主题华为鸿蒙2。0系统。引子大家好,我是大牛。就在前不久,为什么有的人戴助听器能听见声音但是听不清楚内容?助听器的主要作用是提高声音,让耳朵听到之前听不到的声音,在噪音有干扰的情况下,尽可能的分清噪音和言语信号进行降噪。但是当语言信号传到耳朵后面的中枢神经后,能听懂多少语言则不是助听器替所有用户出头,小米机主起诉小米一位小米机主因为广告问题把小米告了,并且还赢了,就很绝。据悉,华东政法大学学生范某将小米告上法庭,原因是触碰广告即自动下载。并且,原告范某要求小米公司停止发送广告赔偿因触碰广告自动美议员声称担忧华为收集美国信息,赵立坚一句话反问美企来源环球网环球时报环球网报道记者白云怡美国国会众议院能源和商务委员会多名议员9日致信美国交通部长,对美国政府批准华为采购汽车芯片的决定表示担忧,称担心华为会收集美国民众及交通基础设曝三星GalaxyS22或无缘两亿像素主摄按照惯例,三星GalaxyS22将在明年年初正式发布。与很多旗舰一样,除了将要配备骁龙898处理器之外,这款手机的影像配置也十分值得关注,因为此前有消息显示,三星S22不仅将要和奥手机卡顿,原来罪魁祸首是这5个开关,关掉瞬间流畅默默问一句你的手机卡顿吗?这个老生常谈的话题了,手机卡顿的情况下,有钱人选择换新手机,穷人选择忍受,或许这就是现实吧但是,手机卡顿也是有原因的,暂且不说苹果手机和安卓手机运行机制的