容概要
redis 5.0最大的亮点就是Steams,本文重点介绍此技术
Stream是Redis 5.0引入的一种新数据类型,它以更抽象的方式模拟日志数据结构,但日志的本质仍然完好无损:就像日志文件一样,通常实现为仅附加模式打开的文件, Redis流主要是仅附加数据结构。至少在概念上,因为Redis Streams是一种在内存中表示的抽象数据类型,它们实现了更强大的操作,以克服日志文件本身的限制。 是什么让Redis流式传输最复杂的Redis类型,尽管数据结构本身非常简单,它实现了额外的非强制性功能:一组阻塞操作允许消费者等待生产者添加到流中的新数据,此外还有一个名为Consumer Groups的概念。 消费者群体最初由称为Kafka(TM)的流行消息系统引入。 Redis以完全不同的术语重新实现了类似的想法,但目标是相同的:允许一组客户端合作消费同一消息流的不同部分。
1
Streams基础知识
为了理解Redis流是什么以及如何使用它们,我们将忽略所有高级功能,而是根据用于操作和访问它的命令来关注数据结构本身。这基本上是大多数其他Redis数据类型共有的部分,如列表,集合,排序集等。但是,请注意,列表还有一个可选的更复杂的阻塞API,由BLPOP等命令导出。因此,流与这方面的列表没有多大区别,只是附加API更复杂,功能更强大。 由于流是仅附加数据结构,因此基本写入命令(称为XADD)会将新条目附加到指定的流中。流条目不仅仅是一个字符串,而是由一个或多个字段 - 值对组成。这样,流的每个条目都已经结构化,就像仅以CSV格式写入的附加文件,其中每行中存在多个分离的字段。
上面对XADD命令的调用使用自动生成的条目ID向密钥mystream的流添加条目sensor-id:123,温度:19.8,该ID是命令返回的条目ID,具体为1518951480106-0。它获取密钥名称mystream的第一个参数,第二个参数是标识流中每个条目的条目ID。但是,在这种情况下,我们传递了*因为我们希望服务器为我们生成新的ID。每个新ID都将单调增加,因此更简单地说,添加的每个新条目与过去的所有条目相比都会有更高的ID。服务器自动生成ID几乎总是您想要的,并且明确指定ID的原因非常少见。我们稍后会详细讨论这个问题。每个Stream条目具有ID的事实是与日志文件的另一个相似性,其中可以使用行号或文件内的字节偏移来识别给定条目。回到我们的XADD示例,在键名和ID之后,下一个参数是组成我们的流条目的字段 - 值对。 只需使用XLEN命令就可以获取Stream中的项目数:
条目ID XADD命令返回的条目ID,以及单个标识给定流内的每个条目,由两部分组成:
毫秒时间部分实际上是生成流ID的本地Redis节点中的本地时间,但是如果当前毫秒时间恰好小于前一个入口时间,则使用前一个入口时间,因此如果时钟向后跳跃单调递增的ID属性仍然存在。序列号用于以相同毫秒创建的条目。由于序列号是64位宽,实际上在相同的毫秒内可以生成的条目数量没有限制。 这些ID的格式最初可能看起来很奇怪,温和的读者可能想知道为什么时间是ID的一部分。原因是Redis通过ID支持范围查询。由于ID与生成条目的时间相关,因此可以基本上免费查询时间范围。在覆盖XRANGE命令时,我们很快就会看到这一点。 如果由于某种原因,用户需要与时间无关但实际上与另一个外部系统ID关联的增量ID,如前所述,XADD命令可以采用显式ID而不是触发自动生成的*通配符ID,如下例所示:
请注意,在这种情况下,最小ID为0-1,并且命令不接受等于或小于前一个ID的ID:
2
从Streams获取数据
现在我们终于可以通过XADD在我们的流中添加条目了。但是,虽然将数据附加到流中非常明显,但是为了提取数据而查询流的方式并不是那么明显。如果我们继续对日志文件进行类比,一种显而易见的方法是模仿我们通常使用Unix命令tail -f做的事情,也就是说,我们可能会开始监听以获取附加到流的新消息。请注意,与Redis的阻塞列表操作不同,其中给定元素将到达单个客户端,该客户端在流行样式操作(如BLPOP)中阻塞,我们希望多个消费者可以看到附加到流的新消息,如许多尾部-f进程可以查看添加到日志的内容。使用传统术语,我们希望流能够将消息扇出到多个客户端。 但是,这只是一种潜在的访问模式。我们还可以以完全不同的方式看到流:不是作为消息传递系统,而是作为时间序列存储。在这种情况下,也许附加新消息也很有用,但另一种自然查询模式是按时间范围获取消息,或者使用游标迭代消息以逐步检查所有历史记录。这绝对是另一种有用的访问模式。
最后,如果我们从消费者的角度看到一个流,我们可能希望以另一种方式访问流,即,作为可以分区到处理此类消息的多个消费者的消息流,以便消费者群体只能看到到达单个流的消息的子集。通过这种方式,可以跨不同的消费者扩展消息处理,而不需要单个消费者处理所有消息:每个消费者只需要处理不同的消息。这基本上是Kafka(TM)与消费者群体的关系。通过消费者组阅读消息是另一种从Redis流中读取的有趣模式。 Redis流通过不同的命令支持上述三种查询模式。接下来的部分将展示所有这些部分,从最简单和更直接的使用:范围查询开始。 按范围查询:XRANGE和XREVRANGE 要按范围查询流,我们只需要指定两个ID,即开始结束。返回的范围将包括具有ID的开头或结尾的元素,因此范围是包含的。两个特殊ID - 和+分别表示可能的最小和最大ID。
返回的每个条目都是两个项目的数组:ID和字段 - 值对列表。我们已经说过,条目ID与时间有关系,因为 - 字符左边的部分是创建条目的本地节点在创建条目时的毫秒时间(以毫秒为单位)(但是请注意)使用完全指定的XADD命令复制Streams,因此从属服务器将具有与主服务器相同的ID)。这意味着我可以使用XRANGE查询一系列时间。但是,为了做到这一点,我可能想省略ID的序列部分:如果省略,则在范围的开始处假定它是0,而在最后部分它将被假定为最大值序列号可用。这样,使用两倍的Unix时间查询,我们以包含的方式获得在该时间范围内生成的所有条目。例如,我可能想查询我可以使用的两毫秒时间:
我在这个范围内只有一个条目,但是在实际数据集中,我可以查询小时数范围,或者在两毫秒内可能有很多项目,并且返回的结果可能很大。因此,XRANGE最后支持可选的COUNT选项。通过指定计数,我可以获得前N个项目。如果我想要更多,我可以获得最后一个ID,逐个递增序列,然后再次查询。我们在下面的例子中看到这一点。我们开始用XADD添加10个项目(我没有说明,已经假设流mystream填充了10个项目)。要开始我的迭代,每个命令获得2个项目,我从全范围开始,但计数为2。
为了继续使用下两个项目进行迭代,我必须选择返回的最后一个ID,即1519073279157-0,并将1添加到ID的序列号部分。请注意,序列号为64位,因此无需检查溢出。结果ID,在这种情况下为1519073279157-1,现在可以用作下一个XRANGE调用的新起始参数:
等等。由于XRANGE复杂度为O(log(N))要寻找,然后O(M)返回M个元素,所以命令具有对数时间复杂度,这意味着迭代的每一步都很快。因此XRANGE也是事实上的流迭代器,不需要XSCAN命令。 命令XREVRANGE相当于XRANGE但是以反转顺序返回元素,因此XREVRANGE的实际用途是检查Stream中的最后一项是什么:
请注意,XREVRANGE命令以相反的顺序获取start和stop参数。
4
使用XREAD收听新项目
当我们不想按流中的范围访问项目时,通常我们想要的是订阅到达流的新项目。这个概念可能与您订阅频道的Redis Pub / Sub或Redis阻止列表有关,在这里您等待密钥获取要获取的新元素,但是您使用流的方式存在根本差异:
流可以有多个客户端(消费者)等待数据。默认情况下,每个新项目都将传递给等待给定流中的数据的每个消费者。此行为与阻止列表不同,其中每个使用者将获得不同的元素。但是,扇出到多个消费者的能力类似于Pub / Sub。 虽然在Pub / Sub中消息是永远不会存储的,但是在使用阻塞列表时,当客户端收到消息时,它会从列表中弹出(有效删除),流以完全不同的方式工作。
所有消息都无限期地附加在流中(除非用户明确要求删除条目):不同的消费者通过记住收到的最后一条消息的ID,从其角度知道什么是新消息。 Streams Consumer Groups提供Pub / Sub或阻止列表无法实现的控制级别,同一个流的不同组,已处理项目的明确确认,检查待处理项目的能力,声明未处理的消息以及每个组的连贯历史可见性单个客户端,只能查看其私人过去的消息历史记录。
提供侦听到达流的新消息的能力的命令称为XREAD。它比XRANGE复杂一点,所以我们将开始显示简单的表单,稍后将提供整个命令布局。
以上是XREAD的非阻塞形式。请注意,COUNT选项不是必需的,实际上该命令的唯一强制选项是STREAMS选项,它指定一个键列表以及调用消费者已经为每个流已经看到的相应最大ID,以便命令将仅向客户端提供ID大于我们指定ID的消息。 在上面的命令中,我们编写了STREAMS mystream 0,因此我们希望Stream mystream中的所有消息都具有大于0-0的ID。正如您在上面的示例中所看到的,该命令返回键名,因为实际上可以使用多个键调用此命令以同时从不同的流中读取。我可以编写,例如:STREAMS mystream otherstream 0 0.注意在STREAMS选项之后我们需要提供密钥名称,以及后来的ID。因此,STREAMS选项必须始终是最后一个。 除了XREAD可以同时访问多个流,并且我们能够指定我们拥有的最后一个ID以获取更新的消息之外,在这个简单的形式中,命令与XRANGE相比没有做出如此不同的事情。但是,有趣的是我们可以通过指定BLOCK参数轻松地在阻塞命令中打开XREAD:
请注意,在上面的示例中,除了删除COUNT之外,我指定了新的BLOCK选项,其超时为0毫秒(这意味着永不超时)。而且,我没有传递流mystream的普通ID,而是传递了特殊的ID $。这个特殊ID意味着XREAD应该使用已经存储在流mystream中的最大ID作为最后一个ID,这样我们将从我们开始收听的时间开始只接收新消息。这在某种程度上类似于tail -f Unix命令。 请注意,使用BLOCK选项时,我们不必使用特殊ID $。我们可以使用任何有效的ID。如果命令能够立即服务我们的请求而不会阻塞,它将执行此操作,否则它将阻止。通常,如果我们想从新条目开始使用流,我们从ID $开始,之后我们继续使用收到的最后一条消息的ID来进行下一次调用,依此类推。
XREAD的阻止形式也可以通过指定多个键名来侦听多个Streams。如果请求可以同步提供,因为至少有一个流的元素大于我们指定的相应ID,则返回结果。否则,该命令将阻止并将返回获取新数据的第一个流的项目(根据指定的ID)。 与阻塞列表操作类似,阻塞流读取从等待数据的客户端的角度来看是公平的,因为语义是FIFO样式。阻止给定流的第一个客户端是第一个在新项可用时将被解除阻止的客户端。 XREAD没有除COUNT和BLOCK之外的其他选项,因此它是一个非常基本的命令,具有特定目的,可以将消费者攻击到一个或多个流。使用使用者组API可以使用更强大的功能来使用流,但是通过使用者组读取是通过名为XREADGROUP的不同命令实现的,本指南的下一部分将对此进行介绍。
结束语:总之Stream类似于集成了kafka这样的消息流,在应用中处理消息更能得心应手。
领取专属 10元无门槛券
私享最新 技术干货