Advertisement

通信协议

阅读量:

说明

本节旨在阐述SCHAT服务框架所采用的序列化协议方案。基于其核心功能特点,该协议系统可划分为两大类。

  • 服务器进程之间通信协议
  • 服务器与客户端通信协议
服务进程通信协议

服务器内部进程通常会部署于同一个IDC内,在线内网之间的通信通常具有较高的稳定性,并且运行效率较高。Protobuf被用作一种详细的协议描述以及数据序列化工具,在性能方面表现优异,并且兼容性良好。由于官方对于Go语言也提供了良好的支持,在此之前我们已经介绍了相关的具体实现细节以及相关技术背景。

描述
下面摘自 schat/proto/ss/ss.proto 文档,描述了大概的协议轮廓:

复制代码
    syntax="proto3";

    package ss;
    
    import "basic.proto";
    import "chat.proto";
    import "disp.proto"; 
    import "user_info.proto";
    /*
    Server-Server Proto
    */
    
    //proto type
    enum SS_PROTO_TYPE {
    HEART_BEAT_REQ = 0;
    HEART_BEAT_RSP = 1;
    PING_REQ = 2;
    PING_RSP = 3;
    LOGIN_REQ = 4;
    LOGIN_RSP = 5;
    LOGOUT_REQ = 6;
    LOGOUT_RSP = 7;
    REG_REQ = 8;
    REG_RSP = 9;
    CREATE_GROUP_REQ = 12;
    CREATE_GROUP_RSP = 13;
    USE_DISP_PROTO = 14;  // serv <--> disp <--> serv
    APPLY_GROUP_REQ = 16;
    ...
    }

就包含了内部通信的宏观定义。其详细的通信架构则完整地体现在SSMSG之中。

复制代码
    //main msg

    message SSMsg {
    SS_PROTO_TYPE proto_type = 1;
    oneof msg_body {
      MsgHeartBeatReq heart_beat_req = 20;       
      MsgPingReq ping_req = 22;
      MsgPingRsp ping_rsp = 23;
      MsgLoginReq login_req = 24;
      MsgLoginRsp login_rsp = 25;
      MsgLogoutReq logout_req = 26;
      MsgLogoutRsp logout_rsp = 27;
      MsgRegReq reg_req = 28;
      MsgRegRsp reg_rsp = 29;	
      MsgCreateGrpReq create_group_req = 30;
      MsgCreateGrpRsp create_group_rsp = 31;
      MsgDisp  msg_disp = 32;
      MsgApplyGroupReq apply_group_req = 34;
      MsgApplyGroupRsp apply_group_rsp = 35;
      MsgApplyGroupNotify apply_group_notify = 36;
      MsgApplyGroupAudit  apply_group_audit = 37;
      MsgFetchApplyGrpReq fetch_apply_req = 38;
      MsgFetchApplyGrpRsp fetch_apply_rsp = 39;
      MsgCommonNotify   common_notify = 40;
      ...
      }
     }

基于前面所述,在本次开发中完整地制定了内部通信协议的结构体,并可通过执行命令protoc --go_out=. ss.proto能够自动生成相应的Go描述文件

序列化 源代码位于文件s cath/proto/ss/api.go中包含了对协议的序列化与反序列化的接口功能。

复制代码
    const (

     MAX_SS_MSG_SIZE=(200*1024) //200k
    )
    
    
    func Pack(i interface{}) ([]byte, error) {	  
       switch i.(type) {
     	  case proto.Message:
     		  m , _ := i.(proto.Message);
     		  return proto.Marshal(m);
     	
     	  default:
     		  break;	
       }
     
       return nil , errors.New("this type not support!");
    }
    
    
    func UnPack(b []byte, i interface{}) error {
     fmt.Printf("try to unpack...\n");
     switch i.(type) {
     	case proto.Message:
     	    m , _ := i.(proto.Message);
     	    return proto.Unmarshal(b, m);
     	default:
     		break;    
       }
     
       return errors.New("this type not support");
     }

Pack接口负责处理将基于proto协议生成的SSMsg结构转换为[]byte,并将其发送到网络中。另一方面,UnPack接口负责从[]byte字节流中解码出SSMsg结构。

下面源自schat/servers/connServ/lib/sendMsg.go展示了序列化并发送数据的过程

复制代码
     func SendToServ(pconfig *Config, target_serv int, pss_msg *ss.SSMsg) bool {

     var _func_ = "<SendToServ>"
     log := pconfig.Comm.Log
     proc := pconfig.Comm.Proc
    
     //pack
     buff, err := ss.Pack(pss_msg)
     if err != nil {
     log.Err("%s pack failed! proto:%d err:%v", _func_, pss_msg.ProtoType, err)
     return false
     }
     //send
     ret := proc.Send(target_serv, buff, len(buff))
     if ret < 0 {
     log.Err("%s to %d failed! ret:%d", _func_, target_serv, ret)
     return false
     }
     return true
     }

以下内容来自schat/servers/conn_serv/lib/recv_msg.go,并展示了接收数据后进行反序列化的流程

复制代码
    func RecvMsg(pconfig *Config) int64 {

    
    var _func_ = "RecvMsg"
    var start_time int64
    
    //init
    pmsg.msg = pmsg.msg[0:cap(pmsg.msg)]
    var recv int
    var log = pconfig.Comm.Log
    var proc = pconfig.Comm.Proc
    var msg = pmsg.msg
    var handle_pkg = 0
    
    start_time = time.Now().UnixNano()
    //keep recving
    for {
       msg = msg[0:cap(msg)]
       recv = proc.Recv(msg, cap(msg), &(pmsg.sender))
       if recv < 0 { //no package
       	break
       }
    
       handle_pkg++
       //unpack
       msg = msg[0:recv]
       var ss_msg = new(ss.SSMsg)
       err := ss.UnPack(msg, ss_msg)
       if err != nil {
       	log.Err("unpack failed! err:%v", err)
       	continue
       }
       //log.Debug("unpack success! v:%v and %v" , *ss_req , *(ss_req.GetHeartBeat()));
    
       //dispatch
       switch ss_msg.ProtoType {
       case ss.SS_PROTO_TYPE_HEART_BEAT_REQ:
       	RecvHeartBeatReq(pconfig, ss_msg.GetHeartBeatReq(), pmsg.sender)
       case ss.SS_PROTO_TYPE_PING_RSP:
       	RecvPingRsp(pconfig, ss_msg.GetPingRsp())
       case ss.SS_PROTO_TYPE_LOGIN_RSP:
       	RecvLoginRsp(pconfig, ss_msg.GetLoginRsp())
       case ss.SS_PROTO_TYPE_LOGOUT_RSP:
       	RecvLogoutRsp(pconfig, ss_msg.GetLogoutRsp())
       ...
       }
       ...
    }		

其中使用的proc作为进程间通信设施在后面会有介绍

服务客户端通信协议

服务器与客户端之间通信以JSON方式进行协议描述和序列化。

描述
下面摘自 schat/proto/cs/api.go 描述了大概的协议轮廓:

复制代码
    /*

    This is a cs-proto using json format
    */
    /*
    CS PROTO ID
    */
    const (
    //proto start
    CS_PROTO_START      = 0
    CS_PROTO_PING_REQ   = 1
    CS_PROTO_PING_RSP   = 2
    CS_PROTO_LOGIN_REQ  = 3
    CS_PROTO_LOGIN_RSP  = 4
    CS_PROTO_LOGOUT_REQ = 5
    CS_PROTO_LOGOUT_RSP = 6
    CS_PROTO_REG_REQ    = 7
    CS_PROTO_REG_RSP    = 8
    CS_PROTO_CREATE_GRP_REQ = 9
    CS_PROTO_CREATE_GRP_RSP  = 10
    ...

上面涵盖了CS通信的宏定义。所有具体的通信结构都嵌入在GeneralMsg中,请参考如附图所示的内容。

复制代码
    /* * GeneralMsg
     */
    type GeneralMsg struct {
    ProtoId int         `json:"proto"`
    SubMsg  interface{} `json:"sub"`
    }   
    
    type ProtoHead struct {
    ProtoId int         `json:"proto"`
    Sub     interface{} `json:"-"`
    }

GernelMsg作为一种通用结构,在设计时考虑了多种应用场景,并且其内部协议SubMsg需要基于ProtoId进行相应的配置设置。在以下序列化桥接部分可以看到各个功能相关的子协议定义在schat/proto/cs/xx.proto.go中,并且每个模块都有详细的实现逻辑支持。例如chat.proto.go:中的实现就很好地体现了这一设计思想。

复制代码
    //create group

    type CSCreateGroupReq struct {
      Name string `json:"name"`
      Pass string `json:"pass"`
      Desc string `json:"desc"`
    }
    
    type CSCreateGroupRsp struct {
      Result int `json:"result"`
      GrpId int64 `json:"grp_id"`
      Name string `json:"name"`
      MemberCnt int `json:"member_count"`
      CreateTs int64 `json:"create_ts"`
      Desc string `json:"desc"`
    }
    
    //apply group
    type CSApplyGroupReq struct {
      GrpId   int64  `json:"grp_id"`
      GrpName string `json:"grp_name"`
      Pass    string `json:"pass"`
      Msg     string `json:"msg"`
    }
    
    type CSApplyGroupRsp struct {
      Result  int `json:"result"`
      GrpId   int64  `json:"grp_id"`
      GrpName string `json:"grp_name"`
      Flag      int    `json:"flag"` //0:normal 1:master activate invite
    }
    ...

这些各类req,rsp对应了GeneralMsg.SubMsg

代码库_schat/proto/cs/api.go_中包含协议的规范序列化与反序列化功能模块:

复制代码
    /* * Encode GeneralMsg
    * @return encoded_bytes , error
     */
    func EncodeMsg(pmsg *GeneralMsg) ([]byte, error) {
      //proto
      if pmsg.ProtoId <= CS_PROTO_START || pmsg.ProtoId >= CS_PROTO_END {
      	return nil, errors.New("proto_id illegal")
      }
    
      //encode
      return json.Marshal(pmsg)
    }
    
    /* * Decode GeneralMsg
    * @return
     */
    func DecodeMsg(data []byte, pmsg *GeneralMsg) error {
      var proto_head ProtoHead
      var err error
    
      //decode proto
      err = json.Unmarshal(data, &proto_head)
      if err != nil {
      	return err
      }
    
      //switch proto
      proto_id := proto_head.ProtoId
      psub, err := Proto2Msg(proto_id)
      if err != nil {
      	return err
      }
      pmsg.SubMsg = psub
    
      //decode
      err = json.Unmarshal(data, pmsg)
      if err != nil {
      	return err
      }
    
      return nil
    }

该算法中对消息进行处理的方式是:使用JSON编码对GeneralMsg进行分块处理;接收端则通过解码将接收到的消息块还原回GeneralMsg对象。在解码过程中,默认行为是调用Proto2Msg解析函数处理协议ID及相关数据结构;若未指定相关配置,则系统默认将所有子消息字段设为空接口状态以避免数据丢失或误报。其中Proto2Mgr解析函数定义如下:

复制代码
      /* * Get ProtoMsg By Proto
     */
    func Proto2Msg(proto_id int) (interface{}, error) {
      var pmsg interface{}
    
      //refer
      switch proto_id {
      case CS_PROTO_PING_REQ:
      	pmsg = new(CSPingReq)
      case CS_PROTO_PING_RSP:
      	pmsg = new(CSPingRsp)
      case CS_PROTO_LOGIN_REQ:
      	pmsg = new(CSLoginReq)
      case CS_PROTO_LOGIN_RSP:
      	pmsg = new(CSLoginRsp)
      case CS_PROTO_LOGOUT_REQ:
      	pmsg = new(CSLogoutReq)
      ..
    }
    }	

只是一个简单的映射关系

本节主要展示了如何从客户端接收数据,并进行了反序列化处理。

复制代码
    //C-->S Decode and Handle Msg

    func HandleClientPkg(pconfig *Config, pclient *comm.ClientPkg) {
      var _func_ = "<HandleClientPkg>"
      var gmsg cs.GeneralMsg
      var err error
      var new_reader bool = true
      log := pconfig.Comm.Log
      ...
      client_data := pclient.Data
      ...
      //decode msg	
      err = cs.DecodeMsg(client_data, &gmsg)
      if err != nil {
      	log.Err("%s decode msg failed! err:%v", _func_, err)
      	return
      }
    
      proto_id := gmsg.ProtoId
      var conv_err = true
      //Get Uid
      uid := GetClientUid(pconfig, pclient.ClientKey)
      //convert
      switch proto_id {
      case cs.CS_PROTO_PING_REQ:
      	pmsg, ok := gmsg.SubMsg.(*cs.CSPingReq)
      	if ok {
      		log.Debug("%s recv ping success! v:%v", _func_, *pmsg)
      		SendPingReq(pconfig, pclient.ClientKey, pmsg)
      		conv_err = false
      	}
      case cs.CS_PROTO_LOGIN_REQ:
      	pmsg, ok := gmsg.SubMsg.(*cs.CSLoginReq)
      	if ok {
      		log.Debug("%s recv proto:%d success! v:%v", _func_, proto_id, *pmsg)
      		SendLoginReq(pconfig, pclient.ClientKey, pmsg)
      		conv_err = false
      	}
      case cs.CS_PROTO_LOGOUT_REQ:
      	_, ok := gmsg.SubMsg.(*cs.CSLogoutReq)
      	if ok {
      		SendLogoutReq(pconfig, uid, ss.USER_LOGOUT_REASON_LOGOUT_CLIENT_EXIT)
      		conv_err = false
      	}
      	...
      	}
      }	

将取到的CS协议ID进行相应的分发.

以下是来自schat/servers/conn_serv/lib/clients.go的一段代码 snippet 展示了服务端如何将数据序列化并发送给客户端

复制代码
      /*Encode And Send to Client

      * @proto: cs proto
     * @pmsg : msg from cs.Proto2Msg
     */
    func SendToClient(pconfig *Config, client_key int64, proto int, pmsg interface{}) bool {
      var _func_ = "<SendToClient>"
      log := pconfig.Comm.Log
      var gmsg cs.GeneralMsg
    
      //check proto
      if proto <= cs.CS_PROTO_START || proto >= cs.CS_PROTO_END {
      	log.Err("%s failed! proto:%d illegal!", _func_, proto)
      	return false
      }
    
      //fulfill gmsg
      gmsg.ProtoId = proto
      gmsg.SubMsg = pmsg
    
      //enc msg
      enc_data, err := cs.EncodeMsg(&gmsg)
      if err != nil {
      	log.Err("%s encode msg failed! key:%v err:%v", _func_, client_key, err)
      	return false
      }
      log.Debug("%s  len:%d" , _func_  , len(enc_data))
    
      //zlib
      if pconfig.FileConfig.ZlibOn == 1 {
      	pzenv.c_bf.Reset()
      	pzenv.c_w.Reset(&pzenv.c_bf)
      	pzenv.c_w.Write(enc_data)
      	pzenv.c_w.Flush()
      	enc_data = pzenv.c_bf.Bytes()
      	/*
      		var b bytes.Buffer;
      		w := zlib.NewWriter(&b);
      		w.Write(enc_data);
      		w.Close();
      		enc_data =  b.Bytes();*/
      }
    
      //make pkg
      pclient := &pkg_buff.send_pkg
      pclient.PkgType = comm.CLIENT_PKG_T_NORMAL
      pclient.ClientKey = client_key
      pclient.Data = enc_data
    
      //Send
      ret := pconfig.TcpServ.Send(pconfig.Comm, pclient)
      if ret < 0 {
      	log.Err("%s send to client ret:%d len:%d  ", _func_, ret, len(enc_data))
      	return false
      }
      return true
    }

其中pmsg就是定义在cs/xx.proto.go中的各客户端子结构

全部评论 (0)

还没有任何评论哟~