通信协议
说明
本节旨在阐述SCHAT服务框架所采用的序列化协议方案。基于其核心功能特点,该协议系统可划分为两大类。
- 服务器进程之间通信协议
- 服务器与客户端通信协议
服务进程通信协议
服务器内部进程通常会部署于同一个IDC内,在线内网之间的通信通常具有较高的稳定性,并且运行效率较高。Protobuf被用作一种详细的协议描述以及数据序列化工具,在性能方面表现优异,并且兼容性良好。由于官方对于Go语言也提供了良好的支持,在此之前我们已经介绍了相关的具体实现细节以及相关技术背景。
描述
下面摘自 schat/proto/ss/ss.proto 文档,描述了大概的协议轮廓:
syntax="proto3";
package ss;
import "basic.proto";
import "chat.proto";
import "disp.proto";
import "user_info.proto";
/*
Server-Server Proto
*/
//proto type
enum SS_PROTO_TYPE {
HEART_BEAT_REQ = 0;
HEART_BEAT_RSP = 1;
PING_REQ = 2;
PING_RSP = 3;
LOGIN_REQ = 4;
LOGIN_RSP = 5;
LOGOUT_REQ = 6;
LOGOUT_RSP = 7;
REG_REQ = 8;
REG_RSP = 9;
CREATE_GROUP_REQ = 12;
CREATE_GROUP_RSP = 13;
USE_DISP_PROTO = 14; // serv <--> disp <--> serv
APPLY_GROUP_REQ = 16;
...
}
就包含了内部通信的宏观定义。其详细的通信架构则完整地体现在SSMSG之中。
//main msg
message SSMsg {
SS_PROTO_TYPE proto_type = 1;
oneof msg_body {
MsgHeartBeatReq heart_beat_req = 20;
MsgPingReq ping_req = 22;
MsgPingRsp ping_rsp = 23;
MsgLoginReq login_req = 24;
MsgLoginRsp login_rsp = 25;
MsgLogoutReq logout_req = 26;
MsgLogoutRsp logout_rsp = 27;
MsgRegReq reg_req = 28;
MsgRegRsp reg_rsp = 29;
MsgCreateGrpReq create_group_req = 30;
MsgCreateGrpRsp create_group_rsp = 31;
MsgDisp msg_disp = 32;
MsgApplyGroupReq apply_group_req = 34;
MsgApplyGroupRsp apply_group_rsp = 35;
MsgApplyGroupNotify apply_group_notify = 36;
MsgApplyGroupAudit apply_group_audit = 37;
MsgFetchApplyGrpReq fetch_apply_req = 38;
MsgFetchApplyGrpRsp fetch_apply_rsp = 39;
MsgCommonNotify common_notify = 40;
...
}
}
基于前面所述,在本次开发中完整地制定了内部通信协议的结构体,并可通过执行命令protoc --go_out=. ss.proto能够自动生成相应的Go描述文件
序列化 源代码位于文件s cath/proto/ss/api.go中包含了对协议的序列化与反序列化的接口功能。
const (
MAX_SS_MSG_SIZE=(200*1024) //200k
)
func Pack(i interface{}) ([]byte, error) {
switch i.(type) {
case proto.Message:
m , _ := i.(proto.Message);
return proto.Marshal(m);
default:
break;
}
return nil , errors.New("this type not support!");
}
func UnPack(b []byte, i interface{}) error {
fmt.Printf("try to unpack...\n");
switch i.(type) {
case proto.Message:
m , _ := i.(proto.Message);
return proto.Unmarshal(b, m);
default:
break;
}
return errors.New("this type not support");
}
Pack接口负责处理将基于proto协议生成的SSMsg结构转换为[]byte,并将其发送到网络中。另一方面,UnPack接口负责从[]byte字节流中解码出SSMsg结构。
下面源自schat/servers/connServ/lib/sendMsg.go展示了序列化并发送数据的过程
func SendToServ(pconfig *Config, target_serv int, pss_msg *ss.SSMsg) bool {
var _func_ = "<SendToServ>"
log := pconfig.Comm.Log
proc := pconfig.Comm.Proc
//pack
buff, err := ss.Pack(pss_msg)
if err != nil {
log.Err("%s pack failed! proto:%d err:%v", _func_, pss_msg.ProtoType, err)
return false
}
//send
ret := proc.Send(target_serv, buff, len(buff))
if ret < 0 {
log.Err("%s to %d failed! ret:%d", _func_, target_serv, ret)
return false
}
return true
}
以下内容来自schat/servers/conn_serv/lib/recv_msg.go,并展示了接收数据后进行反序列化的流程
func RecvMsg(pconfig *Config) int64 {
var _func_ = "RecvMsg"
var start_time int64
//init
pmsg.msg = pmsg.msg[0:cap(pmsg.msg)]
var recv int
var log = pconfig.Comm.Log
var proc = pconfig.Comm.Proc
var msg = pmsg.msg
var handle_pkg = 0
start_time = time.Now().UnixNano()
//keep recving
for {
msg = msg[0:cap(msg)]
recv = proc.Recv(msg, cap(msg), &(pmsg.sender))
if recv < 0 { //no package
break
}
handle_pkg++
//unpack
msg = msg[0:recv]
var ss_msg = new(ss.SSMsg)
err := ss.UnPack(msg, ss_msg)
if err != nil {
log.Err("unpack failed! err:%v", err)
continue
}
//log.Debug("unpack success! v:%v and %v" , *ss_req , *(ss_req.GetHeartBeat()));
//dispatch
switch ss_msg.ProtoType {
case ss.SS_PROTO_TYPE_HEART_BEAT_REQ:
RecvHeartBeatReq(pconfig, ss_msg.GetHeartBeatReq(), pmsg.sender)
case ss.SS_PROTO_TYPE_PING_RSP:
RecvPingRsp(pconfig, ss_msg.GetPingRsp())
case ss.SS_PROTO_TYPE_LOGIN_RSP:
RecvLoginRsp(pconfig, ss_msg.GetLoginRsp())
case ss.SS_PROTO_TYPE_LOGOUT_RSP:
RecvLogoutRsp(pconfig, ss_msg.GetLogoutRsp())
...
}
...
}
其中使用的proc作为进程间通信设施在后面会有介绍
服务客户端通信协议
服务器与客户端之间通信以JSON方式进行协议描述和序列化。
描述
下面摘自 schat/proto/cs/api.go 描述了大概的协议轮廓:
/*
This is a cs-proto using json format
*/
/*
CS PROTO ID
*/
const (
//proto start
CS_PROTO_START = 0
CS_PROTO_PING_REQ = 1
CS_PROTO_PING_RSP = 2
CS_PROTO_LOGIN_REQ = 3
CS_PROTO_LOGIN_RSP = 4
CS_PROTO_LOGOUT_REQ = 5
CS_PROTO_LOGOUT_RSP = 6
CS_PROTO_REG_REQ = 7
CS_PROTO_REG_RSP = 8
CS_PROTO_CREATE_GRP_REQ = 9
CS_PROTO_CREATE_GRP_RSP = 10
...
上面涵盖了CS通信的宏定义。所有具体的通信结构都嵌入在GeneralMsg中,请参考如附图所示的内容。
/* * GeneralMsg
*/
type GeneralMsg struct {
ProtoId int `json:"proto"`
SubMsg interface{} `json:"sub"`
}
type ProtoHead struct {
ProtoId int `json:"proto"`
Sub interface{} `json:"-"`
}
GernelMsg作为一种通用结构,在设计时考虑了多种应用场景,并且其内部协议SubMsg需要基于ProtoId进行相应的配置设置。在以下序列化桥接部分可以看到各个功能相关的子协议定义在schat/proto/cs/xx.proto.go中,并且每个模块都有详细的实现逻辑支持。例如chat.proto.go:中的实现就很好地体现了这一设计思想。
//create group
type CSCreateGroupReq struct {
Name string `json:"name"`
Pass string `json:"pass"`
Desc string `json:"desc"`
}
type CSCreateGroupRsp struct {
Result int `json:"result"`
GrpId int64 `json:"grp_id"`
Name string `json:"name"`
MemberCnt int `json:"member_count"`
CreateTs int64 `json:"create_ts"`
Desc string `json:"desc"`
}
//apply group
type CSApplyGroupReq struct {
GrpId int64 `json:"grp_id"`
GrpName string `json:"grp_name"`
Pass string `json:"pass"`
Msg string `json:"msg"`
}
type CSApplyGroupRsp struct {
Result int `json:"result"`
GrpId int64 `json:"grp_id"`
GrpName string `json:"grp_name"`
Flag int `json:"flag"` //0:normal 1:master activate invite
}
...
这些各类req,rsp对应了GeneralMsg.SubMsg
代码库_schat/proto/cs/api.go_中包含协议的规范序列化与反序列化功能模块:
/* * Encode GeneralMsg
* @return encoded_bytes , error
*/
func EncodeMsg(pmsg *GeneralMsg) ([]byte, error) {
//proto
if pmsg.ProtoId <= CS_PROTO_START || pmsg.ProtoId >= CS_PROTO_END {
return nil, errors.New("proto_id illegal")
}
//encode
return json.Marshal(pmsg)
}
/* * Decode GeneralMsg
* @return
*/
func DecodeMsg(data []byte, pmsg *GeneralMsg) error {
var proto_head ProtoHead
var err error
//decode proto
err = json.Unmarshal(data, &proto_head)
if err != nil {
return err
}
//switch proto
proto_id := proto_head.ProtoId
psub, err := Proto2Msg(proto_id)
if err != nil {
return err
}
pmsg.SubMsg = psub
//decode
err = json.Unmarshal(data, pmsg)
if err != nil {
return err
}
return nil
}
该算法中对消息进行处理的方式是:使用JSON编码对GeneralMsg进行分块处理;接收端则通过解码将接收到的消息块还原回GeneralMsg对象。在解码过程中,默认行为是调用Proto2Msg解析函数处理协议ID及相关数据结构;若未指定相关配置,则系统默认将所有子消息字段设为空接口状态以避免数据丢失或误报。其中Proto2Mgr解析函数定义如下:
/* * Get ProtoMsg By Proto
*/
func Proto2Msg(proto_id int) (interface{}, error) {
var pmsg interface{}
//refer
switch proto_id {
case CS_PROTO_PING_REQ:
pmsg = new(CSPingReq)
case CS_PROTO_PING_RSP:
pmsg = new(CSPingRsp)
case CS_PROTO_LOGIN_REQ:
pmsg = new(CSLoginReq)
case CS_PROTO_LOGIN_RSP:
pmsg = new(CSLoginRsp)
case CS_PROTO_LOGOUT_REQ:
pmsg = new(CSLogoutReq)
..
}
}
只是一个简单的映射关系
本节主要展示了如何从客户端接收数据,并进行了反序列化处理。
//C-->S Decode and Handle Msg
func HandleClientPkg(pconfig *Config, pclient *comm.ClientPkg) {
var _func_ = "<HandleClientPkg>"
var gmsg cs.GeneralMsg
var err error
var new_reader bool = true
log := pconfig.Comm.Log
...
client_data := pclient.Data
...
//decode msg
err = cs.DecodeMsg(client_data, &gmsg)
if err != nil {
log.Err("%s decode msg failed! err:%v", _func_, err)
return
}
proto_id := gmsg.ProtoId
var conv_err = true
//Get Uid
uid := GetClientUid(pconfig, pclient.ClientKey)
//convert
switch proto_id {
case cs.CS_PROTO_PING_REQ:
pmsg, ok := gmsg.SubMsg.(*cs.CSPingReq)
if ok {
log.Debug("%s recv ping success! v:%v", _func_, *pmsg)
SendPingReq(pconfig, pclient.ClientKey, pmsg)
conv_err = false
}
case cs.CS_PROTO_LOGIN_REQ:
pmsg, ok := gmsg.SubMsg.(*cs.CSLoginReq)
if ok {
log.Debug("%s recv proto:%d success! v:%v", _func_, proto_id, *pmsg)
SendLoginReq(pconfig, pclient.ClientKey, pmsg)
conv_err = false
}
case cs.CS_PROTO_LOGOUT_REQ:
_, ok := gmsg.SubMsg.(*cs.CSLogoutReq)
if ok {
SendLogoutReq(pconfig, uid, ss.USER_LOGOUT_REASON_LOGOUT_CLIENT_EXIT)
conv_err = false
}
...
}
}
将取到的CS协议ID进行相应的分发.
以下是来自schat/servers/conn_serv/lib/clients.go的一段代码 snippet 展示了服务端如何将数据序列化并发送给客户端
/*Encode And Send to Client
* @proto: cs proto
* @pmsg : msg from cs.Proto2Msg
*/
func SendToClient(pconfig *Config, client_key int64, proto int, pmsg interface{}) bool {
var _func_ = "<SendToClient>"
log := pconfig.Comm.Log
var gmsg cs.GeneralMsg
//check proto
if proto <= cs.CS_PROTO_START || proto >= cs.CS_PROTO_END {
log.Err("%s failed! proto:%d illegal!", _func_, proto)
return false
}
//fulfill gmsg
gmsg.ProtoId = proto
gmsg.SubMsg = pmsg
//enc msg
enc_data, err := cs.EncodeMsg(&gmsg)
if err != nil {
log.Err("%s encode msg failed! key:%v err:%v", _func_, client_key, err)
return false
}
log.Debug("%s len:%d" , _func_ , len(enc_data))
//zlib
if pconfig.FileConfig.ZlibOn == 1 {
pzenv.c_bf.Reset()
pzenv.c_w.Reset(&pzenv.c_bf)
pzenv.c_w.Write(enc_data)
pzenv.c_w.Flush()
enc_data = pzenv.c_bf.Bytes()
/*
var b bytes.Buffer;
w := zlib.NewWriter(&b);
w.Write(enc_data);
w.Close();
enc_data = b.Bytes();*/
}
//make pkg
pclient := &pkg_buff.send_pkg
pclient.PkgType = comm.CLIENT_PKG_T_NORMAL
pclient.ClientKey = client_key
pclient.Data = enc_data
//Send
ret := pconfig.TcpServ.Send(pconfig.Comm, pclient)
if ret < 0 {
log.Err("%s send to client ret:%d len:%d ", _func_, ret, len(enc_data))
return false
}
return true
}
其中pmsg就是定义在cs/xx.proto.go中的各客户端子结构
