最近学习Netty
的时候想做一个基于Redis
服务协议的编码解码模块,过程中顺便阅读了Redis
服务序列化协议RESP
,结合自己的理解对文档进行了翻译并且简单实现了RESP
基于Java
语言的解析。编写本文的使用使用的JDK
版本为[8+]
。
Redis
客户端与Redis
服务端基于一个称作RESP
的协议进行通信,RESP
全称为Redis Serialization Protocol
,也就是Redis
序列化协议。虽然RESP
为Redis
设计,但是它也可以应用在其他客户端-服务端(Client-Server
)的软件项目中。RESP
在设计的时候折中考虑了如下几点:
RESP
可以序列化不同的数据类型,如整型、字符串、数组还有一种特殊的Error
类型。需要执行的Redis
命令会封装为类似于字符串数组的请求然后通过Redis
客户端发送到Redis
服务端。Redis
服务端会基于特定的命令类型选择对应的一种数据类型进行回复(这一句是意译,原文是:Redis replies with a command-specific data type
)。
RESP
是二进制安全的(binary-safe
),并且在RESP
下不需要处理从一个进程传输到另一个进程的批量数据,因为它使用了前缀长度(prefixed-length
,后面会分析,就是在每个数据块的前缀已经定义好数据块的个数,类似于Netty
里面的定长编码解码)来传输批量数据。
注意:此处概述的协议仅仅使用在客户端-服务端通信,Redis Cluster
使用不同的二进制协议在多个节点之间交换消息(也就是Redis
集群中的节点之间并不使用RESP
通信)。
Redis
客户端通过创建一个在6379
端口的TCP
连接,连接到Redis
服务端。
虽然RESP
在底层通信协议技术上是非TCP
特定的,但在Redis
的上下文中,RESP
仅用于TCP
连接(或类似的面向流的连接,如Unix
套接字)。
Redis
服务端接收由不同参数组成的命令,接收到命令并将其处理之后会把回复发送回Redis
客户端。这是最简单的模型,但是有两种例外的情况:
Redis
支持管道(Pipelining
,流水线,多数情况下习惯称为管道)操作。使用管道的情况下,Redis
客户端可以一次发送多个命令,然后等待一次性的回复(文中的回复是replies
,理解为Redis
服务端会一次性返回一个批量回复结果)。Redis
客户端订阅Pub/Sub
信道时,该协议会更改语义并成为推送协议(push protocol
),也就是说,客户端不再需要发送命令,因为Redis
服务端将自动向客户端(订阅了改信道的客户端)发送新消息(这里的意思是:在订阅/发布模式下,消息是由Redis
服务端主动推送给订阅了特定信道的Redis
客户端)。除了上述两个特例之外,Redis
协议是一种简单的请求-响应协议。
RESP
在Redis 1.2
中引入,在Redis 2.0
,RESP
正式成为与Redis
服务端通信的标准方案。也就是如果需要编写Redis
客户端,你就必须在客户端中实现此协议。
RESP
本质上是一种序列化协议,它支持的数据类型如下:单行字符串、错误消息、整型数字、定长字符串和RESP
数组。
RESP
在Redis
中用作请求-响应协议的方式如下:
Redis
客户端将命令封装为RESP
的数组类型(数组元素都是定长字符串类型,注意这一点,很重要)发送到Redis
服务器。Redis
服务端根据命令实现选择对应的RESP
数据类型之一进行回复。在RESP
中,数据类型取决于数据报的第一个字节:
+
。-
。:
。$
。RESP
数组的第一个字节为*
。另外,在RESP
中可以使用定长字符串或者数组的特殊变体来表示Null
值,后面会提及。在RESP
中,协议的不同部分始终以\r\n
(CRLF
)终止。
目前RESP
中5种数据类型的小结如下:
数据类型 | 本文翻译名称 | 基本特征 | 例子 |
---|---|---|---|
Simple String | 单行字符串 | 第一个字节是+,最后两个字节是\r\n,其他字节是字符串内容 | +OK\r\n |
Error | 错误消息 | 第一个字节是-,最后两个字节是\r\n,其他字节是异常消息的文本内容 | -ERR\r\n |
Integer | 整型数字 | 第一个字节是:,最后两个字节是\r\n,其他字节是数字的文本内容 | :100\r\n |
Bulk String | 定长字符串 | 第一个字节是$,紧接着的字节是内容字符串长度\r\n,最后两个字节是\r\n,其他字节是字符串内容 | $4\r\ndoge\r\n |
Array | RESP数组 | 第一个字节是*,紧接着的字节是元素个数\r\n,最后两个字节是\r\n,其他字节是各个元素的内容,每个元素可以是任意一种数据类型 | *2\r\n:100\r\n$4\r\ndoge\r\n |
下面的小节是对每种数据类型的更细致的分析。
简单字符串的编码方式如下:
+
。CR
或者LF
字符的字符串。CRLF
终止。简单字符串能够保证在最小开销的前提下传输非二进制安全的字符串。例如很多Redis
命令执行成功后服务端需要回复OK
字符串,此时通过简单字符串编码为5字节的数据报如下:
+OK\r\n
如果需要发送二进制安全的字符串,那么需要使用定长字符串。
当Redis
服务端用简单字符串响应时,Redis
客户端库应该向调用者返回一个字符串,该响应到调用者的字符串由+
之后直到字符串内容末尾的字符组成(其实就是上面提到的第(2)部分的内容),不包括最后的CRLF
字节。
错误消息类型是RESP
特定的数据类型。实际上,错误消息类型和简单字符串类型基本一致,只是其第一个字节为-
。错误消息类型跟简单字符串类型的最大区别是:错误消息作为Redis
服务端响应的时候,对于客户端而言应该感知为异常,而错误消息中的字符串内容应该感知为Redis
服务端返回的错误信息。错误消息的编码方式如下:
-
。CR
或者LF
字符的字符串。CRLF
终止。一个简单的例子如下:
-Error message\r\n
Redis
服务端只有在真正发生错误或者感知错误的时候才会回复错误消息,例如尝试对错误的数据类型执行操作或者命令不存在等等。Redis
客户端接收到错误消息的时候,应该触发异常(一般情况就是直接抛出异常,可以根据错误消息的内容进行异常分类)。下面是错误消息响应的一些例子:
-ERR unknown command 'foobar'
-WRONGTYPE Operation against a key holding the wrong kind of value
-
之后的第一个单词到第一个空格或换行符之间的内容,代表返回的错误类型。这只是Redis
使用的约定,不是RESP
错误消息格式的一部分。
例如,ERR
是通用错误,WRONGTYPE
则是更具体的错误,表示客户端试图针对错误的数据类型执行操作。这种定义方式称为错误前缀,是一种使客户端能够理解服务器返回的错误类型的方法,而不必依赖于所给出的确切消息定义,该消息可能会随时间而变化。
客户端实现可以针对不同的错误类型返回不同种类的异常,或者可以通过将错误类型的名称作为字符串直接提供给调用方来提供捕获错误的通用方法。
但是,不应该将错误消息分类处理的功能视为至关重要的功能,因为它作用并不巨大,并且有些的客户端实现可能会简单地返回特定值去屏蔽错误消息作为通用的异常处理,例如直接返回false
。
整型数字的编码方式如下:
:
。CR
或者LF
字符的字符串,也就是数字要先转换为字符序列,最终要输出为字节。CRLF
终止。例如:
:0\r\n
:1000\r\n
许多Redis
命令返回整型数字,像INCR
,LLEN
和LASTSAVE
命令等等。
返回的整型数字没有特殊的含义,像INCR
返回的是增量的总量,而LASTSAVE
是UNIX
时间戳。但是Redis
服务端保证返回的整型数字在带符号的64位整数范围内。
有些情况下,返回的整型数字会指代true
或者false
。如EXISTS
或者SISMEMBER
命令执行返回1代表true
,0代表false
。
有些情况下,返回的整型数字会指代命令是否真正产生了效果。如SADD
,SREM
和SETNX
命令执行返回1代表命令执行生效,0代表命令执行不生效(等价于命令没有执行)。
下面的一组命令执行后都是返回整型数字:SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD
。
定长字符串用于表示一个最大长度为512MB
的二进制安全的字符串(Bulk
,本身有体积大的含义)。定长字符串的编码方式如下:
$
。prefixed length
,也就是前缀长度),前缀长度分块以CRLF
终止。CR
或者LF
字符的字符串,也就是数字要先转换为字符序列,最终要输出为字节。CRLF
终止。举个例子,doge
使用定长字符串编码如下:
第一个字节 | 前缀长度 | CRLF | 字符串内容 | CRLF | 定长字符串 | |
---|---|---|---|---|---|---|
$ | 4 | \r\n | doge | \r\n | ===> | $4\r\ndoge\r\n |
foobar
使用定长字符串编码如下:
第一个字节 | 前缀长度 | CRLF | 字符串内容 | CRLF | 定长字符串 | |
---|---|---|---|---|---|---|
$ | 6 | \r\n | foobar | \r\n | ===> | $6\r\nfoobar\r\n |
表示空字符串(Empty String
,对应Java中的""
) 的时候,使用定长字符串编码如下:
第一个字节 | 前缀长度 | CRLF | CRLF | 定长字符串 | |
---|---|---|---|---|---|
$ | 0 | \r\n | \r\n | ===> | $0\r\n\r\n |
定长字符串也可以使用特殊的格式来表示Null
值,指代值不存在。在这种特殊格式中,前缀长度为-1,并且没有数据,因此使用定长字符串对Null
值进行编码如下:
第一个字节 | 前缀长度 | CRLF | 定长字符串 | |
---|---|---|---|---|
$ | -1 | \r\n | ===> | $-1\r\n |
当Redis
服务端返回定长字符串编码的Null
值的时候,客户端不应该返回空字符串,而应该返回对应编程语言中的Null
对象。例如Ruby
中对应nil
,C
语言中对应NULL
,Java
中对应null
,以此类推。
Redis
客户端使用RESP
数组发送命令到Redis
服务端。与此相似,某些Redis
命令执行完毕后服务端需要使用RESP
数组类型将元素集合返回给客户端,如返回一个元素列表的LRANGE
命令。RESP
数组和我们认知中的数组并不完全一致,它的编码格式如下:
*
。RESP
数组的元素个数(十进制数,但是最终需要转换为字节序列,如10需要转换为1
和0
两个相邻的字节),元素个数分块以CRLF
终止。RESP
数组的每个元素内容,每个元素可以是任意的RESP
数据类型。一个空的RESP
数组的编码如下:
*0\r\n
一个包含2个定长字符串元素内容分别为foo
和bar
的RESP
数组的编码如下:
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
通用格式就是:*<count>CRLF
作为RESP
数组的前缀部分,而组成RESP
数组的其他数据类型的元素只是一个接一个地串联在一起。例如一个包含3个整数类型元素的RESP
数组的编码如下:
*3\r\n:1\r\n:2\r\n:3\r\n
RESP
数组的元素不一定是同一种数据类型,可以包含混合类型的元素。例如下面是一个包含4个整数类型元素和1个定长字符串类型元素(一共有5个元素)的RESP
数组的编码(为了看得更清楚,分多行进行编码,实际上不能这样做):
# 元素个数
*5\r\n
# 第1个整型类型的元素
:1\r\n
# 第2个整型类型的元素
:2\r\n
# 第3个整型类型的元素
:3\r\n
# 第4个整型类型的元素
:4\r\n
# 定长字符串类型的元素
$6\r\n
foobar\r\n
Redis
服务端响应报的首行*5\r\n
定义了后面会紧跟着5个回复数据,然后每个回复数据分别作元素项,构成了用于传输的多元素定长回复(Multi Bulk Reply
,感觉比较难翻译,这里的大概意思就是每个回复行都是整个回复报中的一个项)。
这里可以类比为Java
中的ArrayList
(泛型擦除),有点类似于下面的伪代码:
List encode = new ArrayList();
// 添加元素个数
encode.add(elementCount);
encode.add(CRLF);
// 添加第1个整型类型的元素 - 1
encode.add(':');
encode.add(1);
encode.add(CRLF);
// 添加第2个整型类型的元素 - 2
encode.add(':');
encode.add(2);
encode.add(CRLF);
// 添加第3个整型类型的元素 - 3
encode.add(':');
encode.add(3);
encode.add(CRLF);
// 添加第4个整型类型的元素 - 4
encode.add(':');
encode.add(4);
encode.add(CRLF);
// 添加定长字符串类型的元素
encode.add('$');
// 前缀长度
encode.add(6);
// 字符串内容
encode.add("foobar");
encode.add(CRLF);
RESP
数组中也存在Null
值的概念,下面称为RESP Null Array
。处于历史原因,RESP
数组中采用了另一种特殊的编码格式定义Null
值,区别于定长字符串中的Null
值字符串。例如,BLPOP
命令执行超时的时候,就会返回一个RESP Null Array
类型的响应。RESP Null Array
的编码如下:
*-1\r\n
当Redis
服务端的回复是RESP Null Array
类型的时候,客户端应该返回一个Null
对象,而不是一个空数组或者空列表。这一点比较重要,它是区分回复是空数组(也就是命令正确执行完毕,返回结果正常)或者其他原因(如BLPOP
命令的超时等)的关键。
RESP
数组的元素也可以是RESP
数组,下面是一个包含2个RESP
数组类型的元素的RESP
数组,编码如下(为了看得更清楚,分多行进行编码,实际上不能这样做):
# 元素个数
*2\r\n
# 第1个RESP数组元素
*3\r\n
:1\r\n
:2\r\n
:3\r\n
# 第2个RESP数组元素
*2\r\n
+Foo\r\n
-Bar\r\n
上面的RESP
数组的包含2个RESP
数组类型的元素,第1个RESP
数组元素包含3个整型类型的元素,而第2个RESP
数组元素包含1个简单字符串类型的元素和1个错误消息类型的元素。
RESP
数组中的Null元素
RESP
数组中的单个元素也有Null
值的概念,下面称为Null
元素。Redis
服务端回复如果是RESP
数组类型,并且RESP
数组中存在Null
元素,那么意味着元素丢失,绝对不能用空字符串替代。缺少指定键的前提下,当与GET
模式选项一起使用时,SORT
命令可能会发生这种情况。
下面是一个包含Null
元素的RESP
数组的例子(为了看得更清楚,分多行进行编码,实际上不能这样做):
*3\r\n
$3\r\n
foo\r\n
$-1\r\n
$3\r\n
bar\r\n
RESP
数组中的第2个元素是Null
元素,客户端API
最终返回的内容应该是:
# Ruby
["foo",nil,"bar"]
# Java
["foo",null,"bar"]
主要包括:
Inline Commands
)。其实文档中还有一节使用C
语言编写高性能RESP
解析器,这里不做翻译,因为掌握RESP
的相关内容后,可以基于任何语言编写解析器。
如果已经相对熟悉RESP
中的序列化格式,那么编写Redis
客户端类库就会变得很容易。我们可以进一步指定客户端和服务器之间的交互方式:
Redis
客户端向Redis
服务端发送仅仅包含定长字符串类型元素的RESP
数组。Redis
服务端可以采用任意一种RESP
数据类型向Redis
客户端进行回复,具体的数据类型一般取决于命令类型。下面是典型的交互例子:Redis
客户端发送命令LLEN mylist
以获得KEY
为mylist
的长度,Redis
服务端将以整数类型进行回复,如以下示例所示(C
是客户端,S
服务器),伪代码如下:
C: *2\r\n
C: $4\r\n
C: LLEN\r\n
C: $6\r\n
C: mylist\r\n
S: :48293\r\n
为了简单起见,我们使用换行符来分隔协议的不同部分(这里指上面的代码分行展示),但是实际交互的时候Redis客户端在发送*2\r\n4\r\nLLEN\r\n6\r\nmylist\r\n的时候是整体发送的。
Redis
客户端可以使用相同的连接发送批量命令。Redis
支持管道特性,因此Redis
客户端可以通过一次写操作发送多个命令,而无需在发送下一个命令之前读取Redis
服务端对上一个命令的回复。批量发送命令之后,所有的回复可以在最后得到(合并为一个回复)。更多相关信息可以查看Using pipelining to speedup Redis queries。
有些场景下,我们可能只有telnet
命令可以使用,在这种条件下,我们需要发送命令到Redis
服务端。尽管Redis
协议易于实现,但在交互式会话中并不理想,并且redis-cli
有些情况下不一定可用。处于这类原因,Redis
设计了一种专为人类设计的命令格式,称为内联命令(Inline Command
格式。
以下是服务器/客户端使用内联命令进行聊天的示例(S代表服务端,C代表客户端):
C: PING
S: +PONG
以下是使用内联命令返回整数的另一个示例:
C: EXISTS somekey
S: :0
基本上只需在telnet
会话中编写以空格分隔的参数。由于除了统一的请求协议之外没有命令会以*
开头,Redis
能够检测到这种情况并解析输入的命令。
因为JDK
原生提供的字节缓冲区java.nio.ByteBuffer
存在不能自动扩容、需要切换读写模式等等问题,这里直接引入Netty
并且使用Netty
提供的ByteBuf
进行RESP
数据类型解析。编写本文的时候(2019-10-09)Netty
的最新版本为4.1.42.Final
。引入依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.42.Final</version>
</dependency>
定义解码器接口:
public interface RespDecoder<V>{
V decode(ByteBuf buffer);
}
定义常量:
public class RespConstants {
public static final Charset ASCII = StandardCharsets.US_ASCII;
public static final Charset UTF_8 = StandardCharsets.UTF_8;
public static final byte DOLLAR_BYTE = '$';
public static final byte ASTERISK_BYTE = '*';
public static final byte PLUS_BYTE = '+';
public static final byte MINUS_BYTE = '-';
public static final byte COLON_BYTE = ':';
public static final String EMPTY_STRING = "";
public static final Long ZERO = 0L;
public static final Long NEGATIVE_ONE = -1L;
public static final byte CR = (byte) '\r';
public static final byte LF = (byte) '\n';
public static final byte[] CRLF = "\r\n".getBytes(ASCII);
public enum ReplyType {
SIMPLE_STRING,
ERROR,
INTEGER,
BULK_STRING,
RESP_ARRAY
}
}
下面的章节中解析模块的实现已经忽略第一个字节的解析,因为第一个字节是决定具体的数据类型。
简单字符串类型就是单行字符串,它的解析结果对应的就是Java
中的String
类型。解码器实现如下:
// 解析单行字符串
public class LineStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
return CodecUtils.X.readLine(buffer);
}
}
public enum CodecUtils {
X;
public int findLineEndIndex(ByteBuf buffer) {
int index = buffer.forEachByte(ByteProcessor.FIND_LF);
return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
}
public String readLine(ByteBuf buffer) {
int lineEndIndex = findLineEndIndex(buffer);
if (lineEndIndex > -1) {
int lineStartIndex = buffer.readerIndex();
// 计算字节长度
int size = lineEndIndex - lineStartIndex - 1;
byte[] bytes = new byte[size];
buffer.readBytes(bytes);
// 重置读游标为\r\n之后的第一个字节
buffer.readerIndex(lineEndIndex + 1);
buffer.markReaderIndex();
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
这里抽取出一个类LineStringDecoder
用于解析单行字符串,这样在解析错误消息的时候可以做一次继承即可。测试一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// +OK\r\n
buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:OK
错误消息的本质也是单行字符串,所以其解码的实现可以和简单字符串的解码实现一致。错误消息数据类型的解码器如下:
public class RespErrorDecoder extends LineStringDecoder {
}
测试一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// -ERR unknown command 'foobar'\r\n
buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:ERR unknown command 'foobar'
整型数字类型,本质就是需要从字节序列中还原出带符号的64bit
的长整型,因为是带符号的,类型标识位:
后的第一个字节需要判断是否负数字符-
,因为是从左向右解析,然后每解析出一个新的位,当前的数字值要乘10
。其解码器的实现如下:
public class RespIntegerDecoder implements RespDecoder<Long> {
@Override
public Long decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
// 没有行尾,异常
if (-1 == lineEndIndex) {
return null;
}
long result = 0L;
int lineStartIndex = buffer.readerIndex();
boolean negative = false;
byte firstByte = buffer.getByte(lineStartIndex);
// 负数
if (RespConstants.MINUS_BYTE == firstByte) {
negative = true;
} else {
int digit = firstByte - '0';
result = result * 10 + digit;
}
for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
byte value = buffer.getByte(i);
int digit = value - '0';
result = result * 10 + digit;
}
if (negative) {
result = -result;
}
// 重置读游标为\r\n之后的第一个字节
buffer.readerIndex(lineEndIndex + 1);
return result;
}
}
整型数字类型的解析相对复杂,一定要注意负数判断。测试一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// :-1000\r\n
buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
Long value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:-1000
定长字符串类型解析的关键是先读取类型标识符$
后的第一个字节序列分块解析成64bit
带符号的整数,用来确定后面需要解析的字符串内容的字节长度,然后再按照该长度读取后面的字节。其解码器实现如下:
public class RespBulkStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 使用RespIntegerDecoder读取长度
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Bulk Null String
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Bulk Empty String
if (RespConstants.ZERO.equals(length)) {
return RespConstants.EMPTY_STRING;
}
// 真实字节内容的长度
int readLength = (int) length.longValue();
if (buffer.readableBytes() > readLength) {
byte[] bytes = new byte[readLength];
buffer.readBytes(bytes);
// 重置读游标为\r\n之后的第一个字节
buffer.readerIndex(buffer.readerIndex() + 2);
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
测试一下:
public static void main(String[] args) throws Exception{
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// $6\r\nthrowable\r\n
buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("$9".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:throwable
RESP
数组类型解析的关键:
*
后的第一个字节序列分块解析成64bit
带符号的整数,确定数组中的元素个数。参考过不少Redis
协议解析框架,不少是用栈或者状态机实现,这里先简单点用递归实现,解码器代码如下:
public class RespArrayDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 解析元素个数
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Null Array
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Array Empty List
if (RespConstants.ZERO.equals(length)) {
return Lists.newArrayList();
}
List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
// 递归
for (int i = 0; i < length; i++) {
result.add(DefaultRespCodec.X.decode(buffer));
}
return result;
}
}
测试一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
//*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("*2".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("foo".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("bar".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
List value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:[foo, bar]
对RESP
的内容和其编码解码的过程有相对深刻的认识后,就可以基于Netty
编写Redis
服务的编码解码模块,作为Netty
入门的十分有意义的例子。本文的最后一节只演示了RESP
的解码部分,编码模块和更多细节会在另一篇用Netty
实现Redis
客户端的文章中展示。
参考资料:
本文涉及的所有代码:
public class RespConstants {
public static final Charset ASCII = StandardCharsets.US_ASCII;
public static final Charset UTF_8 = StandardCharsets.UTF_8;
public static final byte DOLLAR_BYTE = '$';
public static final byte ASTERISK_BYTE = '*';
public static final byte PLUS_BYTE = '+';
public static final byte MINUS_BYTE = '-';
public static final byte COLON_BYTE = ':';
public static final String EMPTY_STRING = "";
public static final Long ZERO = 0L;
public static final Long NEGATIVE_ONE = -1L;
public static final byte CR = (byte) '\r';
public static final byte LF = (byte) '\n';
public static final byte[] CRLF = "\r\n".getBytes(ASCII);
public enum ReplyType {
SIMPLE_STRING,
ERROR,
INTEGER,
BULK_STRING,
RESP_ARRAY
}
}
public enum CodecUtils {
X;
public int findLineEndIndex(ByteBuf buffer) {
int index = buffer.forEachByte(ByteProcessor.FIND_LF);
return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
}
public String readLine(ByteBuf buffer) {
int lineEndIndex = findLineEndIndex(buffer);
if (lineEndIndex > -1) {
int lineStartIndex = buffer.readerIndex();
// 计算字节长度
int size = lineEndIndex - lineStartIndex - 1;
byte[] bytes = new byte[size];
buffer.readBytes(bytes);
// 重置读游标为\r\n之后的第一个字节
buffer.readerIndex(lineEndIndex + 1);
buffer.markReaderIndex();
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public interface RespCodec {
RespCodec X = DefaultRespCodec.X;
<IN, OUT> OUT decode(ByteBuf buffer);
<IN, OUT> ByteBuf encode(IN in);
}
public enum DefaultRespCodec implements RespCodec {
X;
static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap();
private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder();
static {
DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder());
DECODERS.put(ReplyType.ERROR, new RespErrorDecoder());
DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder());
DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder());
DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder());
}
@SuppressWarnings("unchecked")
@Override
public <IN, OUT> OUT decode(ByteBuf buffer) {
return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer);
}
private ReplyType determineReplyType(ByteBuf buffer) {
byte firstByte = buffer.readByte();
ReplyType replyType;
switch (firstByte) {
case RespConstants.PLUS_BYTE:
replyType = ReplyType.SIMPLE_STRING;
break;
case RespConstants.MINUS_BYTE:
replyType = ReplyType.ERROR;
break;
case RespConstants.COLON_BYTE:
replyType = ReplyType.INTEGER;
break;
case RespConstants.DOLLAR_BYTE:
replyType = ReplyType.BULK_STRING;
break;
case RespConstants.ASTERISK_BYTE:
replyType = ReplyType.RESP_ARRAY;
break;
default: {
throw new IllegalArgumentException("first byte:" + firstByte);
}
}
return replyType;
}
@Override
public <IN, OUT> ByteBuf encode(IN in) {
// TODO
throw new UnsupportedOperationException("encode");
}
}
public interface RespDecoder<V> {
V decode(ByteBuf buffer);
}
public class DefaultRespDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
throw new IllegalStateException("decoder");
}
}
public class LineStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
return CodecUtils.X.readLine(buffer);
}
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
public class RespErrorDecoder extends LineStringDecoder {
}
public class RespIntegerDecoder implements RespDecoder<Long> {
@Override
public Long decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
// 没有行尾,异常
if (-1 == lineEndIndex) {
return null;
}
long result = 0L;
int lineStartIndex = buffer.readerIndex();
boolean negative = false;
byte firstByte = buffer.getByte(lineStartIndex);
// 负数
if (RespConstants.MINUS_BYTE == firstByte) {
negative = true;
} else {
int digit = firstByte - '0';
result = result * 10 + digit;
}
for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
byte value = buffer.getByte(i);
int digit = value - '0';
result = result * 10 + digit;
}
if (negative) {
result = -result;
}
// 重置读游标为\r\n之后的第一个字节
buffer.readerIndex(lineEndIndex + 1);
return result;
}
}
public class RespBulkStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Bulk Null String
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Bulk Empty String
if (RespConstants.ZERO.equals(length)) {
return RespConstants.EMPTY_STRING;
}
// 真实字节内容的长度
int readLength = (int) length.longValue();
if (buffer.readableBytes() > readLength) {
byte[] bytes = new byte[readLength];
buffer.readBytes(bytes);
// 重置读游标为\r\n之后的第一个字节
buffer.readerIndex(buffer.readerIndex() + 2);
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public class RespArrayDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 解析元素个数
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Null Array
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Array Empty List
if (RespConstants.ZERO.equals(length)) {
return Lists.newArrayList();
}
List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
// 递归
for (int i = 0; i < length; i++) {
result.add(DefaultRespCodec.X.decode(buffer));
}
return result;
}
}
(本文完 e-a-20191009 c-2-d)