實現 Redis 協議解析器
本文是 《用 Golang 實現一個 Redis》系列文章第二篇,本文將分別介紹 以及 的實現,若您對協議有所了解可以直接閱讀協議解析器部分。
Redis 通信協議
Redis 自 2.0 版本起使用了統一的協議 RESP (REdis Serialization Protocol),該協議易於實現,計算機可以高效的進行解析且易於被人類讀懂。
RESP 是一個二進制安全的文本協議,工作於 TCP 協議上。客戶端和服務器發送的命令或數據一律以 \r\n
(CRLF)結尾。
RESP 定義了5種格式:
- 簡單字符串(Simple String): 服務器用來返回簡單的結果,比如”OK”。非二進制安全,且不允許換行。
- 錯誤信息(Error): 服務器用來返回簡單的結果,比如”ERR Invalid Synatx”。非二進制安全,且不允許換行。
- 整數(Integer):
llen
、scard
等命令的返回值, 64位有符號整數 - 字符串(Bulk String): 二進制安全字符串,
get
等命令的返回值 - 數組(Array, 舊版文檔中稱 Multi Bulk Strings): Bulk String 數組,客戶端發送指令以及
lrange
等命令響應的格式
RESP 通過第一個字符來表示格式:
- 簡單字符串:以”+” 開始, 如:”+OK\r\n”
- 錯誤:以”-” 開始,如:”-ERR Invalid Synatx\r\n”
- 整數:以”:”開始,如:”:1\r\n”
- 字符串:以
$
開始 - 數組:以
*
開始
Bulk String有兩行,第一行為 $
+正文長度,第二行為實際內容。如:
$3\r\nSET\r\n
Bulk String 是二進制安全的可以包含任意字節,就是說可以在 Bulk String 內部包含 “\r\n” 字符(行尾的CRLF被隱藏):
$4
a\r\nb
$-1
表示 nil, 比如使用 get 命令查詢一個不存在的key時,響應即為$-1
。
Array 格式第一行為 “*”+數組長度,其後是相應數量的 Bulk String。如, ["foo", "bar"]
的報文:
*2
$3
foo
$3
bar
客戶端也使用 Array 格式向服務端發送指令。命令本身將作為第一個參數,如 SET key value
指令的RESP報文:
*3
$3
SET
$3
key
$5
value
將換行符打印出來:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
協議解析器
我們在 一文中已經介紹過TCP服務器的實現,協議解析器將實現其 Handler 接口充當應用層服務器。
協議解析器將接收 Socket 傳來的數據,並將其數據還原為 [][]byte
格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n"
將被還原為 ['SET', 'key', 'value']
。
本文完整代碼:
來自客戶端的請求均為數組格式,它在第一行中標記報文的總行數並使用CRLF
作為分行符。
bufio
標準庫可以將從 reader 讀到的數據緩存到 buffer 中,直至遇到分隔符或讀取完畢后返回,所以我們使用 reader.ReadBytes('\n')
來保證每次讀取到完整的一行。
需要注意的是RESP是二進制安全
的協議,它允許在正文中使用CRLF
字符。舉例來說 Redis 可以正確接收並執行SET "a\r\nb" 1
指令, 這條指令的正確報文是這樣的:
*3
$3
SET
$4
a\r\nb
$7
myvalue
當 ReadBytes
讀取到第五行 “a\r\nb\r\n”時會將其誤認為兩行:
*3
$3
SET
$4
a // 錯誤的分行
b // 錯誤的分行
$7
myvalue
因此當讀取到第四行$4
后, 不應該繼續使用 ReadBytes('\n')
讀取下一行, 應使用 io.ReadFull(reader, msg)
方法來讀取指定長度的內容。
msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2
_, err = io.ReadFull(reader, msg)
定義 Client
結構體作為客戶端抽象:
type Client struct {
/* 與客戶端的 Tcp 連接 */
conn net.Conn
/*
* 帶有 timeout 功能的 WaitGroup, 用於優雅關閉
* 當響應被完整發送前保持 waiting 狀態, 阻止鏈接被關閉
*/
waitingReply wait.Wait
/* 標記客戶端是否正在發送指令 */
sending atomic.AtomicBool
/* 客戶端正在發送的參數數量, 即 Array 第一行指定的數組長度 */
expectedArgsCount uint32
/* 已經接收的參數數量, 即 len(args)*/
receivedCount uint32
/*
* 已經接收到的命令參數,每個參數由一個 []byte 表示
*/
args [][]byte
}
定義解析器:
type Handler struct {
/*
* 記錄活躍的客戶端鏈接
* 類型為 *Client -> placeholder
*/
activeConn sync.Map
/* 數據庫引擎,執行指令並返回結果 */
db db.DB
/* 關閉狀態標誌位,關閉過程中時拒絕新建連接和新請求 */
closing atomic.AtomicBool
}
接下來可以編寫主要部分了:
func (h *Handler)Handle(ctx context.Context, conn net.Conn) {
if h.closing.Get() {
// 關閉過程中不接受新連接
_ = conn.Close()
}
/* 初始化客戶端狀態 */
client := &Client {
conn: conn,
}
h.activeConn.Store(client, 1)
reader := bufio.NewReader(conn)
var fixedLen int64 = 0 // 將要讀取的 BulkString 的正文長度
var err error
var msg []byte
for {
/* 讀取下一行數據 */
if fixedLen == 0 { // 正常模式下使用 CRLF 區分數據行
msg, err = reader.ReadBytes('\n')
// 判斷是否以 \r\n 結尾
if len(msg) == 0 || msg[len(msg) - 2] != '\r' {
errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
_, _ = client.conn.Write(errReply.ToBytes())
}
} else { // 當讀取到 BulkString 第二行時,根據給出的長度進行讀取
msg = make([]byte, fixedLen + 2)
_, err = io.ReadFull(reader, msg)
// 判斷是否以 \r\n 結尾
if len(msg) == 0 ||
msg[len(msg) - 2] != '\r' ||
msg[len(msg) - 1] != '\n'{
errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
_, _ = client.conn.Write(errReply.ToBytes())
}
// Bulk String 讀取完畢,重新使用正常模式
fixedLen = 0
}
// 處理 IO 異常
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
logger.Info("connection close")
} else {
logger.Warn(err)
}
_ = client.Close()
h.activeConn.Delete(client)
return // io error, disconnect with client
}
/* 解析收到的數據 */
if !client.sending.Get() {
// sending == false 表明收到了一條新指令
if msg[0] == '*' {
// 讀取第一行獲取參數個數
expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
_, _ = client.conn.Write(UnknownErrReplyBytes)
continue
}
// 初始化客戶端狀態
client.waitingReply.Add(1) // 有指令未處理完成,阻止服務器關閉
client.sending.Set(true) // 正在接收指令中
// 初始化計數器和緩衝區
client.expectedArgsCount = uint32(expectedLine)
client.receivedCount = 0
client.args = make([][]byte, expectedLine)
} else {
// TODO: text protocol
}
} else {
// 收到了指令的剩餘部分(非首行)
line := msg[0:len(msg)-2] // 移除換行符
if line[0] == '$' {
// BulkString 的首行,讀取String長度
fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
errReply := &reply.ProtocolErrReply{Msg:err.Error()}
_, _ = client.conn.Write(errReply.ToBytes())
}
if fixedLen <= 0 {
errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
_, _ = client.conn.Write(errReply.ToBytes())
}
} else {
// 收到參數
client.args[client.receivedCount] = line
client.receivedCount++
}
// 一條命令發送完畢
if client.receivedCount == client.expectedArgsCount {
client.sending.Set(false)
// 執行命令並響應
result := h.db.Exec(client.args)
if result != nil {
_, _ = conn.Write(result.ToBytes())
} else {
_, _ = conn.Write(UnknownErrReplyBytes)
}
// 重置客戶端狀態,等待下一條指令
client.expectedArgsCount = 0
client.receivedCount = 0
client.args = nil
client.waitingReply.Done()
}
}
}
}
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】
※網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線
※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益
※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象