Skip to content

rpc

向uid绑定的所有在线client_id发起RPC远程过程调用,把第一个响应数据作为调用结果,同步返回给调用方。

  1. 此方法需要配合 bindUid 使用
  2. uid与client_id是一对多的关系,允许多个client_id绑定一个uid
  3. 如果uid绑定多个client_id,则多个client_id都会收到RPC调用请求
  4. 得益于go 语言的 Goroutine 轻量级线程,可以高效处理大量并发任务

使用步骤

  1. 客户端上线后,先使用 bindUid 将client_id与uid绑定
  2. 使用该接口发起对uid的RPC请求
  3. RPC调用的响应结果,会同步返回给调用方
  4. 内置超时机制(10秒未响应,则认为RPC调用超时)

应用场景

  1. 微服务架构
  2. 复杂业务逻辑
  3. 分布式计算
  4. 高性能低延迟
  5. 进程间通信
  6. 跨平台调用
  7. 简化开发与维护

接口URL

/api/rpc

请求方式

POST

Content-Type

application/json;charset=utf-8

普通模式(Token验证)

参数名示例值参数类型是否必填参数描述
tokendavidstring认证Token,即 config.json 文件中的Token
uid1string客户端绑定的UID

安全模式(签名验证)

参数名示例值参数类型是否必填参数描述
Timestampdavidint请求毫秒时间戳
Signaturedavidstring请求签名值签名算法
uid1string客户端绑定的UID

请求Body参数

JSON-RPC远程过程调用协议

text
消息内容为JSON-RPC协议序列化之后的纯文本JSON字符串,服务器不做任何处理,直接透传给客户端。

请求Body示例

json
{
  "_id": 1,
  "_method": "cus-sec_SpcGetEntName",
  "args": {}
}

响应示例

成功

json
{
  "code": 0,
  "message": "成功",
  "data": {
    "_id": 1,
    "_method": "cus-sec_SpcGetEntName",
    "_status": "00",
    "_args": {
      "Result": true,
      "Data": [
        "海南xx科技有限公司"
      ],
      "Error": []
    }
  }
}

失败

json
{
  "code": 116,
  "message": "缺少_id参数或_id不能为0"
}
json
{
  "code": 117,
  "message": "缺少_method参数"
}
json
{
  "code": 118,
  "message": "UID不在线或不存在"
}
json
{
  "code": 113,
  "message": "无法发送RPC请求,可能是通道已满或连接已关闭"
}
json
{
  "code": 114,
  "message": "RPC调用超时"
}

实现RPC的 go 源码

RPC调用相关代码
go
// HTTP处理函数,RPC调用接口
func rpcHandler(c *gin.Context) {
	uid := c.GetHeader("uid")
	if uid == "" {
		c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeMissingUIDHeader))
		return
	}

	body, err := c.GetRawData()
	if err != nil {
		c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeReadBodyFailed))
		return
	}

	if len(body) == 0 {
		c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeEmptyBody))
		return
	}

	req := &RPCRequest{}
	if err := json.Unmarshal(body, req); err != nil {
		c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeInvalidParams))
		return
	}

	if req.ID == 0 {
		c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeMissingID))
		return
	}

	if req.Method == "" {
		c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeMissingMethod))
		return
	}

	c.JSON(http.StatusOK, hub.callRPC(uid, body, req))
}

// callRPC 发起RPC调用并等待响应
func (h *Hub) callRPC(uid string, body []byte, req *RPCRequest) ResponseInfo {
	h.uidMapMutex.RLock()
	clientIDs, ok := h.uidMap[uid]
	h.uidMapMutex.RUnlock()
	if !ok || len(clientIDs) == 0 {
		return NewErrorResponse(ErrCodeUIDNotOnline)
	}

	// RCP等待通道,唯一key:uid + _id + _method
	rpcKey := fmt.Sprintf("%s_%d_%s", uid, req.ID, req.Method)
	rpcChan := make(chan interface{}, 1)

	h.rpcMutex.Lock()
	h.rpcCalls[rpcKey] = rpcChan
	h.rpcMutex.Unlock()

	defer func() {
		h.rpcMutex.Lock()
		delete(h.rpcCalls, rpcKey)
		h.rpcMutex.Unlock()
	}()

	sent := false
	h.clientsMutex.RLock()
	for _, clientID := range clientIDs {
		client, clientExists := h.clients[clientID]
		if !clientExists {
			continue
		}

		select {
		case client.Send <- body:
			sent = true
		default:
			// 发送失败,可能是通道已满或连接已关闭
		}
	}
	h.clientsMutex.RUnlock()

	if !sent {
		return NewErrorResponse(ErrCodeUnableToSendRPC)
	}

	// 等待响应或超时
	select {
	case result := <-rpcChan:
		return NewSuccessResponse(result)
	case <-time.After(10 * time.Second):
		return NewErrorResponse(ErrCodeRPCTimeout)
	}
}