Golang WebSockets开发

Golang做为这一两年内个人最喜欢的一门后端语言,一直没有写过关于它的什么文字,正好这次由于业务需求,在做一个demo给需要给用户汇报,demo中涉及到的功能有基于WebRTC方面的通讯,也有RTM也即是实时消息功能,这里自然选用golang来实现后端的功能,

框架选择

  1. 包管理器: 这个自然是go moduel咯
  2. Golang web框架: Gin
  3. Golang web sockets 框架: gorilla/websocket

Go Module如下:

module AgroalTokenApi
go 1.16
require (
    github.com/fvbock/endless v0.0.0-20170109170031-447134032cb6
    github.com/garyburd/redigo v1.6.2 // indirect
    github.com/gin-gonic/gin v1.6.3
    github.com/go-sql-driver/mysql v1.5.0
    github.com/gorilla/websocket v1.4.2
    github.com/kr/pretty v0.1.0 // indirect
    github.com/onsi/ginkgo v1.15.0 // indirect
    github.com/onsi/gomega v1.10.5 // indirect
    github.com/rs/cors v1.7.0
    github.com/satori/go.uuid v1.2.0
    gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a // indirect
    gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
    gopkg.in/redis.v4 v4.2.4
)

Gin启动和配置

Gin作为一个优秀的Web框架这里结合endless做启动

func main() {
// init Gin
gin.DisableBindValidation()
listenAddress := "0.0.0.0:1238"
route := initRoute()  // 初始化Router
initWsManager()  // 初始化Websocker相关

// 建立一个新的endless服务器设置监听和超时
server := endless.NewServer(listenAddress, route)
server.ReadTimeout = 3000 * time.Second
server.WriteTimeout = 3000 * time.Second
server.BeforeBegin = func(add string) {
    log.Printf(add)
}
err := server.ListenAndServe()
if err != nil {
    log.Println(err)
}
log.Println("Server stopped")
os.Exit(0)
}

路由的初始化:

func initRoute() *gin.Engine   {
    route := gin.Default()
    v1 := route.Group("/Api")
    // 支持跨域
    v1.Use(cors.New(cors.Options{
        AllowedMethods: []string{"PUT", "PATCH", "GET", "POST", "DELETE"},
        AllowedOrigins: []string{"*"},
        AllowedHeaders: []string{"content-type"},
        ExposedHeaders: []string{"X-Total-Count"}}))
        
    v1.POST("/RtcToken",getRtcToken)
    v1.POST("/Login",loginByUserNameAndPassword)
    // WebSocket相关路由
    // 由于WebSocket握手阶段是采用http协议的然后 uprade的,这里声明一个Web Sockets的client
    v1.GET("/ws",WebSocket.WebsocketManager.WsClient)

    return route
}

WebSocket相关模块

1.启动WebSockets manager

        func initWsManager()  {
            go WebSocket.WebsocketManager.Start()
            go WebSocket.WebsocketManager.SendService()
            go WebSocket.WebsocketManager.SendService()
            go WebSocket.WebsocketManager.SendGroupService()
            go WebSocket.WebsocketManager.SendGroupService()
            go WebSocket.WebsocketManager.SendAllService()
            //go WebSocket.TestSendGroup()
            //go WebSocket.TestSendAll()
        }
    

2.WebSocketss manger

首先的定一些结构体:

    type Manager struct {
        Group map[string]map[string]*Client
        groupCount, clientCount uint
        Lock sync.Mutex
        Register, UnRegister chan *Client
        Message    chan *MessageData
        GroupMessage chan *GroupMessageData
        BroadCastMessage chan *BroadCastMessageData
    }
    
    // Client 单个 websocket 信息
    type Client struct {
        Id, Group string
        Socket *websocket.Conn
        Message   chan []byte
    }
    // messageData 单个发送数据信息
    type MessageData struct {
        Id, Group string
        Message    []byte
    }
    // groupMessageData 组广播数据信息
    type GroupMessageData struct {
        Group string
        Message    []byte
    }
    // 广播发送数据信息
    type BroadCastMessageData struct {
        Message    []byte
        ClientId   string
    }

3.gin 处理 websocket handler

    func (manager *Manager) WsClient(ctx *gin.Context) {
        upGrader := websocket.Upgrader{
            // cross origin domain
            CheckOrigin: func(r *http.Request) bool {
                return true
            },
            // 处理 Sec-WebSocket-Protocol Header
            Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
        }
        conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
        if err != nil {
            log.Printf("websocket connect error: %s", ctx.Param("channel"))
            return
        }
        clientId := ctx.DefaultQuery("clientId",uuid.NewV4().String())
        channel := ctx.DefaultQuery("channel","")
        client := &Client{
            Id:     clientId,
            Group:  channel,
            Socket: conn,
            Message:   make(chan []byte, 4096),
        }

        manager.RegisterClient(client)
        go client.Read()
        go client.Write()
        //time.Sleep(time.Second * 15)
        // 测试单个 client 发送数据
        //manager.Send(client.Id, client.Group, []byte("Send message ----" + time.Now().Format("2006-01-02 15:04:05")))
        }
        

4.读取和发送

由于代码多,这里展现一个读取和发送的流程
读取:

    // 读信息,从 websocket 连接直接读取数据
    func (c *Client) Read() {
defer func() {
    WebsocketManager.UnRegister <- c
    log.Printf("client [%s] disconnect", c.Id)
    if err := c.Socket.Close(); err != nil {
        log.Printf("client [%s] disconnect err: %s", c.Id, err)
    }
}()
for {
    messageType, message, err := c.Socket.ReadMessage()
    if err != nil || messageType == websocket.CloseMessage {
        break
    }
    log.Printf("client [%s] receive message: %s", c.Id, string(message))
    c.Message <- message
    WebsocketManager.SendAllWithoutClientId(message,c.Id)
}
}

发送:

    func (c *Client) Write() {
        defer func() {
            log.Printf("client [%s] disconnect", c.Id)
            if err := c.Socket.Close(); err != nil {
                log.Printf("client [%s] disconnect err: %s", c.Id, err)
            }
        }()

        for {
            select {
            case message, ok := <-c.Message:
                if !ok {
                    _ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
                    return
                }
                log.Printf("client [%s] write message: %s", c.Id, string(message))
                err := c.Socket.WriteMessage(websocket.BinaryMessage, message)
                if err != nil {
                    log.Printf("client [%s] writemessage err: %s", c.Id, err)
                }
            }
        }
    }
    

Nginx配置

由于需要在nginx后运行,ws和wss的转发需要做如下配置,否则nginx后 websockets是不正常的

 map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

upstream websocket {
    server localhost:1238; # 后端真实服务地址
}

 server {
    listen 443 ssl http2;
    server_name server.name;
    # ssl on;
    ssl_certificate /usr/local/nginx/conf/test.pem;
    ssl_certificate_key /usr/local/nginx/conf/test.key;

    ssl_session_timeout 5m;
    ssl_protocols  TLSv1 TLSv1.1 TLSv1.2;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
    ssl_prefer_server_ciphers on;
    # 普通webapi
    location /Api/ {
        proxy_pass http://127.0.0.1:1238; 
    }
    # websockets 相关
    location /Api/ws {
        proxy_pass http://websocket;
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;
     
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
     
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
    }
    # 普通网页
    location {
        root /test/; 
    }
}

Lokie博客
请先登录后发表评论
  • 最新评论
  • 总共0条评论
  • 本博客使用免费开源的 laravel-bjyblog v5.5.1.1 搭建 © 2014-2018 lokie.wang 版权所有 ICP证:沪ICP备18016993号
  • 联系邮箱:kitche1985@hotmail.com