rpc 新
向uid绑定的所有在线client_id发起RPC远程过程调用,把第一个响应数据作为调用结果,同步返回给调用方。
- 此方法需要配合 bindUid 使用
- uid与client_id是一对多的关系,允许多个client_id绑定一个uid
- 如果uid绑定多个client_id,则多个client_id都会收到RPC调用请求
- 得益于
go
语言的Goroutine
轻量级线程,可以高效处理大量并发任务
使用步骤
- 客户端上线后,先使用 bindUid 将client_id与uid绑定
- 使用该接口发起对uid的RPC请求
- RPC调用的响应结果,会同步返回给调用方
- 内置超时机制(10秒未响应,则认为RPC调用超时)
应用场景
- 微服务架构
- 复杂业务逻辑
- 分布式计算
- 高性能低延迟
- 进程间通信
- 跨平台调用
- 简化开发与维护
接口URL
/api/rpc
请求方式
POST
Content-Type
application/json;charset=utf-8
请求Header参数
普通模式(Token验证)
参数名 | 示例值 | 参数类型 | 是否必填 | 参数描述 |
---|---|---|---|---|
token | david | string | 是 | 认证Token,即 config.json 文件中的Token |
uid | 1 | string | 是 | 客户端绑定的UID |
安全模式(签名验证)
参数名 | 示例值 | 参数类型 | 是否必填 | 参数描述 |
---|---|---|---|---|
Timestamp | david | int | 是 | 请求毫秒时间戳 |
Signature | david | string | 是 | 请求签名值签名算法 |
uid | 1 | string | 是 | 客户端绑定的UID |
请求Body参数
text
消息内容为JSON-RPC协议序列化之后的纯文本JSON字符串,服务器不做任何处理,直接透传给客户端。
请求Body示例
json
{
"_id": 1,
"_method": "cus-sec_SpcGetEntName",
"args": {}
}
响应示例
成功
json
{
"code": 0,
"message": "成功",
"data": {
"_id": 1,
"_method": "cus-sec_SpcGetEntName",
"_status": "00",
"_args": {
"Result": true,
"Data": [
"海南xx科技有限公司"
],
"Error": []
}
}
}
失败
json
{
"code": 116,
"message": "缺少_id参数或_id不能为0"
}
json
{
"code": 117,
"message": "缺少_method参数"
}
json
{
"code": 118,
"message": "UID不在线或不存在"
}
json
{
"code": 113,
"message": "无法发送RPC请求,可能是通道已满或连接已关闭"
}
json
{
"code": 114,
"message": "RPC调用超时"
}
实现RPC的 go
源码
RPC调用相关代码
go
// HTTP处理函数,RPC调用接口
func rpcHandler(c *gin.Context) {
uid := c.GetHeader("uid")
if uid == "" {
c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeMissingUIDHeader))
return
}
body, err := c.GetRawData()
if err != nil {
c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeReadBodyFailed))
return
}
if len(body) == 0 {
c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeEmptyBody))
return
}
req := &RPCRequest{}
if err := json.Unmarshal(body, req); err != nil {
c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeInvalidParams))
return
}
if req.ID == 0 {
c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeMissingID))
return
}
if req.Method == "" {
c.JSON(http.StatusBadRequest, NewErrorResponse(ErrCodeMissingMethod))
return
}
c.JSON(http.StatusOK, hub.callRPC(uid, body, req))
}
// callRPC 发起RPC调用并等待响应
func (h *Hub) callRPC(uid string, body []byte, req *RPCRequest) ResponseInfo {
h.uidMapMutex.RLock()
clientIDs, ok := h.uidMap[uid]
h.uidMapMutex.RUnlock()
if !ok || len(clientIDs) == 0 {
return NewErrorResponse(ErrCodeUIDNotOnline)
}
// RCP等待通道,唯一key:uid + _id + _method
rpcKey := fmt.Sprintf("%s_%d_%s", uid, req.ID, req.Method)
rpcChan := make(chan interface{}, 1)
h.rpcMutex.Lock()
h.rpcCalls[rpcKey] = rpcChan
h.rpcMutex.Unlock()
defer func() {
h.rpcMutex.Lock()
delete(h.rpcCalls, rpcKey)
h.rpcMutex.Unlock()
}()
sent := false
h.clientsMutex.RLock()
for _, clientID := range clientIDs {
client, clientExists := h.clients[clientID]
if !clientExists {
continue
}
select {
case client.Send <- body:
sent = true
default:
// 发送失败,可能是通道已满或连接已关闭
}
}
h.clientsMutex.RUnlock()
if !sent {
return NewErrorResponse(ErrCodeUnableToSendRPC)
}
// 等待响应或超时
select {
case result := <-rpcChan:
return NewSuccessResponse(result)
case <-time.After(10 * time.Second):
return NewErrorResponse(ErrCodeRPCTimeout)
}
}