队列与Stream redisstream结构如上图所示 消息链表,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。 Stream唯一名称,它就是Redis的key,在我们首次使用xadd指令追加消息时自动创建。 消费组,一个stream支持多个lastdeliveredid,表示当前消费组已经消费到哪条消息了。 每个消费者组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroupcreate进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化lastdeliveredid变量。 同一个消费组(ConsumerGroup)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标lastdeliveredid往前移动。每个消费者有一个组内唯一名称。 pendingids,它记录了当前消费者已经被客户端读取,但是还没有ack的消息。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pendingids变量在Redis官方被称之为PEL,也就是PendingEntriesList,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。 消息ID的形式是timestampInMillissequence,例如15278468805725,它表示当前的消息在毫米时间戳1527846880572时产生,并且是该毫秒内产生的第5条消息。 消息内容就是键值对,形如hash结构的键值对,这没什么特别之处。常用命令 版本:redis6。2。8生产端xadd追加消息xdel删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息。XDELstreamtest16725743639100xrange获取消息列表,会自动过滤已经删除的消息xlen消息长度del删除Stream streamtest表示当前这个队列的名字,也就是我们一般意义上Redis中的key, 号表示服务器自动生成ID,后面顺序跟着,是我们存入当前streamtest这个队列的消息,采用的也是keyvalue的存储形式 返回值16725743639100则是生成的消息ID,由两部分组成:时间戳序号。时间戳时毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型。序号是在这个毫秒时间点内的消息序号。它也是个64位整型。建议使用Redis的方案生成消息ID,因为这种时间戳序号的单调递增的ID方案,几乎可以满足全部的需求,但ID是支持自定义的。 为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latestgeneratedid属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latestgeneratedid所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。消费端单消费者 Redis设计了一个单独的消费指令xread,可以将Stream当成普通的消息队列(list)来使用。使用xread时,我们可以完全忽略消费组(ConsumerGroup)的存在,就好比Stream就是一个普通的列表(list)。XREADcount1streamsstreamtest00count1读取1条消息streams关键字00从头开始xreadcount2streamsstreamtest16725743164040消费16725743164040(不包括)后面的两条消息XREADcount1streamsstreamtest默认返回nil,从尾部读取最新的一条消息XREADblock0count1streamsstreamtestblock阻塞读取消息,直到有消息写入 一般来说客户端如果想要使用xread进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息ID。下次继续调用xread时,将上次返回的最后一个消息ID作为参数传递进去,就可以继续消费后续的消息。消费者组 创建消费者组XGROUPcreatestreamtestcgroup100从头开始消费XGROUPcreatestreamtestcgroup2从尾部开始消费,只接收新消息,其他消息忽略 XINFOstreamstreamtest查看消息队列信息 XINFOgroupsstreamtest查看消息者组情况 消费消息 有了消费组,自然还需要消费者,Stream提供了xreadgroup指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除。XREADGROUPgroupcgroup1c1count1streamsstreamtestcgroup1指定消费者组c1指定消费者count1消费数量从当前消费者组的lastdeliveredid(不包括)开始读阻塞读取,直到有消息写入,并返回阻塞时间XREADGROUPgroupcgroup1c1block0count1streamsstreamtestXINFOgroupsstreamtest消费者组状态XINFOconsumersstreamtestcgroup1消费者组cgroup1内的消费者状态XACKstreamtestcgroup116726241139380确认消息XPENDINGstreamtestcgroup1返回cgroup1内消费者未处理完的消息 消费者组状态 更多的Redis的Stream命令请大家参考Redis官方文档:Redis队列几种实现的总结 基于List的LPUSHBRPOP的实现 足够简单,消费消息延迟几乎为零,但是需要处理空闲连接的问题。 如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常需要重试。 其他缺点包括: 做消费者确认ACK麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个Pending列表,保证消息处理确认;不能做广播模式,如pubsub,消息发布订阅模型;不能重复消费,一旦消费就会被删除;不支持分组消费。 基于SortedSet的实现 多用来实现延迟队列,当然也可以实现有序的普通的消息队列,但是消费者无法阻塞的获取消息,只能轮询,不允许重复消息。 PUBSUB,订阅发布模式 优点: 典型的广播模式,一个消息可以发布到多个消费者;多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息;消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息。 缺点: 消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回;不能保证每个消费者接收的时间是一致的;若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时;可见,PubSub模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。 基于Stream类型的实现 基本上已经有了一个消息中间件的雏形,可以考虑在生产过程中使用,当然真正要在生产中应用,要做的事情还很多,比如消息队列的管理和监控就需要花大力气去实现,而专业消息队列都已经自带或者存在着很好的第三方方案和插件。不保证消息不丢失。消息队列问题 从我们上面对Stream的使用表明,Stream已经具备了一个消息队列的基本要素,生产者API、消费者API,消息Broker,消息的确认机制等等,所以在使用消息中间件中产生的问题,这里一样也会遇到。 Stream消息太多怎么办? 要是消息积累太多,Stream的链表岂不是很长,内容会不会爆掉?xdel指令又不会删除消息,它只是给消息做了个标志位。 Redis自然考虑到了这一点,所以它提供了一个定长Stream功能。在xadd的指令提供一个定长长度maxlen,就可以将老的消息干掉,确保最多不超过指定长度。 消息如果忘记ACK会怎样? Stream在每个消费者结构中保存了正在处理中的消息ID列表PEL,如果消费者收到了消息处理完了但是没有回复ack,就会导致PEL列表不断增长,如果有很多消费组的话,那么这个PEL占用的内存就会放大。所以消息要尽可能的快速消费并确认。 PEL如何避免消息丢失? 在客户端消费者读取Stream消息时,Redis服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是PEL里已经保存了发出去的消息ID。待客户端重新连上之后,可以再次收到PEL中的消息ID列表。不过此时xreadgroup的起始消息ID不能为参数,而必须是任意有效的消息ID,一般将参数设为00,表示读取所有的PEL消息以及自lastdeliveredid之后的新消息。 死信问题 如果某个消息,不能被消费者处理(处理失败),也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的deliverycounter(通过XPENDING可以查询到)就会累加,当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),删除即可。 删除一个消息,使用XDEL语法,注意,这个命令并没有删除Pending中的消息,因此查看Pending,消息还会在,可以在执行执行XDEL之后,XACK这个消息标识其处理完毕。 Stream的高可用 Stream的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在Sentinel和Cluster集群环境下Stream是可以支持高可用的。不过鉴于Redis的指令复制是异步的,在failover发生时,Redis可能会丢失极小部分数据,这点Redis的其它数据结构也是一样的。 分区Partition Redis的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个Stream,然后在客户端使用一定的策略来生产消息到不同的Stream。 Stream小结 Stream的消费模型借鉴了Kafka的消费分组的概念,它弥补了RedisPubSub不能持久化消息的缺陷。但是它又不同于kafka,Kafka的消息可以分partition,而Stream不行。如果非要分parition的话,得在客户端做,提供不同的Stream名称,对消息进行hash取模来选择往哪个Stream里塞。 总的来说,如果是中小项目和企业,在工作中已经使用了Redis,在业务量不是很大,而又需要消息中间件功能的情况下,可以考虑使用Redis的Stream功能。但是如果并发量很高,资源足够支持下,还是以专业的消息中间件,比如RocketMQ、Kafka等来支持业务更好。HyperLogLog HyperLogLog并不是一种新的数据结构(实际类型为字符串类型),而是一种基数算法,通过HyperLogLog可以利用极小的内存空间完成独立总数的统计,数据集可以是IP、Email、ID等。 如果你负责开发维护一个大型的网站,有一天产品经理要网站每个网页每天的UV数据,然后让你来开发这个统计模块,你会如何实现? 如果统计PV那非常好办,给每个网页一个独立的Redis计数器就可以了,这个计数器的key后缀加上当天的日期。这样来一个请求,incrby一次,最终就可以统计出所有的PV数据。 但是UV不一样,它要去重,一个简单的方案,那就是为每一个页面一个独立的set集合来存储所有当天访问过此页面的用户ID。当一个请求过来时,我们使用sadd将用户ID塞进去就可以了。通过scard可以取出这个集合的大小,这个数字就是这个页面的UV数据。 但是,如果你的页面访问量非常大,比如一个爆款页面几千万的UV,你需要一个很大的set集合来统计,这就非常浪费空间。如果这样的页面很多,那所需要的存储空间是惊人的。为这样一个去重功能就耗费这样多的存储空间,值得么?其实需要的数据又不需要太精确,1050w和1060w这两个数字对于老板们来说并没有多大区别,So,有没有更好的解决方案呢? Redis提供了HyperLogLog数据结构就是用来解决这种统计问题的。HyperLogLog提供不精确的去重计数方案,虽然不精确但是也不是非常不精确,Redis官方给出标准误差是0。81,这样的精确度已经可以满足上面的UV统计需求了。 HyperLogLog提供了3个命令:pfadd、pfcount、pfmerge。例如0815的访问用户是u1、u2、u3、u4,0816的访问用户是u4、u5、u6、u7pfadd20130102:user:idu1u2u3u4u5u6添加元素PFCOUNT20130102:user:id返回元素个数6PFADD20130103:user:idu1u2u3u90u91PFMERGE20130102:user:id20130103:user:id合并元素到20130102:user:idPFCOUNT20130102:user:id求并集,返回8 以使用集合类型和HperLogLog统计百万级用户访问次数的占用空间对比: 数据类型 1天 1月 1年 集合类型 80M 2。4G 28G HyperLogLog 15k 450k 5M 可以看到,HyperLogLog内存占用量小得惊人,但是用如此小空间来估算如此巨大的数据,必然不是100的正确,其中一定存在误差率。前面说过,Redis官方给出的数字是0。81的失误率。Redis事务 简单地说,事务表示一组动作,要么全部执行,要么全部不执行。例如在社交网站上用户A关注了用户B,那么需要在用户A的关注表中加入用户B,并且在用户B的粉丝表中添加用户A,这两个行为要么全部执行,要么全部不执行,否则会出现数据不一致的情况。multi开启事务exec事务结束,开始执行discard停止执行,代替exec,它们之间的命令是原子顺序执行的 可以看到sadd命令此时的返回结果是QUEUED,代表命令并没有真正执行,而是暂时保存在Redis中的一个缓存队列(所以discard也只是丢弃这个缓存队列中的未执行命令,并不会回滚已经操作过的数据,这一点要和关系型数据库的Rollback操作区分开)。如果此时另一个客户端执行sismemberu:a:followub返回结果应该为0。 事务中出现错误命令错误,属于语法错误,会造成整个事务无法执行运行时错误,例如用户B在添加粉丝列表时,误把sadd命令(针对集合)写成了zadd命令(针对有序集合),这种就是运行时命令,因为语法是正确的,那第一条执行成功,第二条执行失败, 可以看到Redis并不支持回滚功能,开发人员需要自己修复这类问题。watch 有些应用场景需要在事务之前,确保事务中的key没有被其他客户端修改过,才执行事务,否则不执行(类似乐观锁)。Redis提供了watch命令来解决这类问题。 可以看到客户端1在执行multi之前执行了watch命令,客户端2在客户端1执行exec之前修改了key值,造成客户端1事务没有执行(exec结果为nil)。Pipeline和事务的区别 1、pipeline是客户端的行为,对于服务器来说是透明的,可以认为服务器无法区分客户端发送来的查询命令是以普通命令的形式还是以pipeline的形式发送到服务器的; 2、而事务则是实现在服务器端的行为,用户执行MULTI命令时,服务器会将对应这个用户的客户端对象设置为一个特殊的状态,在这个状态下后续用户执行的查询命令不会被真的执行,而是被服务器缓存起来,直到用户执行EXEC命令为止,服务器会将这个用户对应的客户端对象中缓存的命令按照提交的顺序依次执行。 3、应用pipeline可以提服务器的吞吐能力,并提高Redis处理查询请求的能力。 存在问题,当通过pipeline提交的查询命令数据较少时(可以被内核缓冲区所容纳),Redis可以保证这些命令执行的原子性。然而一旦数据量过大,超过了内核缓冲区的接收大小,那么命令的执行将会被打断,原子性也就无法得到保证。 因此pipeline只是一种提升服务器吞吐能力的机制,如果想要命令以事务的方式原子性的被执行,还是需要事务机制,或者使用更高级的脚本功能以及模块功能。 4、可以将事务和pipeline结合起来使用,减少事务的命令在网络上的传输时间,将多次网络IO缩减为一次网络IO。 Redis提供了简单的事务,之所以说它简单,主要是因为它不支持事务中的回滚特性,同时无法实现命令之间的逻辑关系计算,当然也体现了Redis的keepitsimple的特性。 作者:咖啡冲不冲 链接:https:juejin。cnpost7184007074945171513