Golang做为这一两年内个人最喜欢的一门后端语言,一直没有写过关于它的什么文字,正好这次由于业务需求,在做一个demo给需要给用户汇报,demo中涉及到的功能有基于WebRTC方面的通讯,也有RTM也即是实时消息功能,这里自然选用golang来实现后端的功能,
Go Module如下:
module AgroalTokenApi
go 1.16
require (
github.com/fvbock/endless v0.0.0-20170109170031-447134032cb6
github.com/garyburd/redigo v1.6.2 // indirect
github.com/gin-gonic/gin v1.6.3
github.com/go-sql-driver/mysql v1.5.0
github.com/gorilla/websocket v1.4.2
github.com/kr/pretty v0.1.0 // indirect
github.com/onsi/ginkgo v1.15.0 // indirect
github.com/onsi/gomega v1.10.5 // indirect
github.com/rs/cors v1.7.0
github.com/satori/go.uuid v1.2.0
gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/redis.v4 v4.2.4
)
Gin作为一个优秀的Web框架这里结合endless做启动
func main() {
// init Gin
gin.DisableBindValidation()
listenAddress := "0.0.0.0:1238"
route := initRoute() // 初始化Router
initWsManager() // 初始化Websocker相关
// 建立一个新的endless服务器设置监听和超时
server := endless.NewServer(listenAddress, route)
server.ReadTimeout = 3000 * time.Second
server.WriteTimeout = 3000 * time.Second
server.BeforeBegin = func(add string) {
log.Printf(add)
}
err := server.ListenAndServe()
if err != nil {
log.Println(err)
}
log.Println("Server stopped")
os.Exit(0)
}
路由的初始化:
func initRoute() *gin.Engine {
route := gin.Default()
v1 := route.Group("/Api")
// 支持跨域
v1.Use(cors.New(cors.Options{
AllowedMethods: []string{"PUT", "PATCH", "GET", "POST", "DELETE"},
AllowedOrigins: []string{"*"},
AllowedHeaders: []string{"content-type"},
ExposedHeaders: []string{"X-Total-Count"}}))
v1.POST("/RtcToken",getRtcToken)
v1.POST("/Login",loginByUserNameAndPassword)
// WebSocket相关路由
// 由于WebSocket握手阶段是采用http协议的然后 uprade的,这里声明一个Web Sockets的client
v1.GET("/ws",WebSocket.WebsocketManager.WsClient)
return route
}
func initWsManager() {
go WebSocket.WebsocketManager.Start()
go WebSocket.WebsocketManager.SendService()
go WebSocket.WebsocketManager.SendService()
go WebSocket.WebsocketManager.SendGroupService()
go WebSocket.WebsocketManager.SendGroupService()
go WebSocket.WebsocketManager.SendAllService()
//go WebSocket.TestSendGroup()
//go WebSocket.TestSendAll()
}
首先的定一些结构体:
type Manager struct {
Group map[string]map[string]*Client
groupCount, clientCount uint
Lock sync.Mutex
Register, UnRegister chan *Client
Message chan *MessageData
GroupMessage chan *GroupMessageData
BroadCastMessage chan *BroadCastMessageData
}
// Client 单个 websocket 信息
type Client struct {
Id, Group string
Socket *websocket.Conn
Message chan []byte
}
// messageData 单个发送数据信息
type MessageData struct {
Id, Group string
Message []byte
}
// groupMessageData 组广播数据信息
type GroupMessageData struct {
Group string
Message []byte
}
// 广播发送数据信息
type BroadCastMessageData struct {
Message []byte
ClientId string
}
func (manager *Manager) WsClient(ctx *gin.Context) {
upGrader := websocket.Upgrader{
// cross origin domain
CheckOrigin: func(r *http.Request) bool {
return true
},
// 处理 Sec-WebSocket-Protocol Header
Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
}
conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
log.Printf("websocket connect error: %s", ctx.Param("channel"))
return
}
clientId := ctx.DefaultQuery("clientId",uuid.NewV4().String())
channel := ctx.DefaultQuery("channel","")
client := &Client{
Id: clientId,
Group: channel,
Socket: conn,
Message: make(chan []byte, 4096),
}
manager.RegisterClient(client)
go client.Read()
go client.Write()
//time.Sleep(time.Second * 15)
// 测试单个 client 发送数据
//manager.Send(client.Id, client.Group, []byte("Send message ----" + time.Now().Format("2006-01-02 15:04:05")))
}
由于代码多,这里展现一个读取和发送的流程
读取:
// 读信息,从 websocket 连接直接读取数据
func (c *Client) Read() {
defer func() {
WebsocketManager.UnRegister <- c
log.Printf("client [%s] disconnect", c.Id)
if err := c.Socket.Close(); err != nil {
log.Printf("client [%s] disconnect err: %s", c.Id, err)
}
}()
for {
messageType, message, err := c.Socket.ReadMessage()
if err != nil || messageType == websocket.CloseMessage {
break
}
log.Printf("client [%s] receive message: %s", c.Id, string(message))
c.Message <- message
WebsocketManager.SendAllWithoutClientId(message,c.Id)
}
}
发送:
func (c *Client) Write() {
defer func() {
log.Printf("client [%s] disconnect", c.Id)
if err := c.Socket.Close(); err != nil {
log.Printf("client [%s] disconnect err: %s", c.Id, err)
}
}()
for {
select {
case message, ok := <-c.Message:
if !ok {
_ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
return
}
log.Printf("client [%s] write message: %s", c.Id, string(message))
err := c.Socket.WriteMessage(websocket.BinaryMessage, message)
if err != nil {
log.Printf("client [%s] writemessage err: %s", c.Id, err)
}
}
}
}
由于需要在nginx后运行,ws和wss的转发需要做如下配置,否则nginx后 websockets是不正常的
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
upstream websocket {
server localhost:1238; # 后端真实服务地址
}
server {
listen 443 ssl http2;
server_name server.name;
# ssl on;
ssl_certificate /usr/local/nginx/conf/test.pem;
ssl_certificate_key /usr/local/nginx/conf/test.key;
ssl_session_timeout 5m;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
ssl_prefer_server_ciphers on;
# 普通webapi
location /Api/ {
proxy_pass http://127.0.0.1:1238;
}
# websockets 相关
location /Api/ws {
proxy_pass http://websocket;
proxy_read_timeout 300s;
proxy_send_timeout 300s;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
# 普通网页
location {
root /test/;
}
}
本文为Lokie.Wang原创文章,转载无需和我联系,但请注明来自lokie博客http://lokie.wang