|
package main
import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"net/http"
"strconv"
"time"
"util"
"github.com/gorilla/websocket"
)
var (
ctxHashMap = util.NewConcurrentMap()
)
//用来升级http协议到ws协议
type WebSocketHandler struct {
wsupgrader websocket.Upgrader
}
func (wsh *WebSocketHandler) NewWebSocketHandler() {
wsh.wsupgrader = websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
}
}
func (wsh *WebSocketHandler) onMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) {
//处理文本消息 或者 2进制消息 2进制通常是些 gzip的文本 语音或者图片视频之类的一般会用其他云服务不然带宽会爆掉
if msgType == websocket.TextMessage {
wsh.processIncomingTextMsg(conn, ctx, msg)
}
if msgType == websocket.BinaryMessage {
}
}
func (wsh *WebSocketHandler) onOpen(conn *websocket.Conn, r *http.Request) (ctx *ConnContext, err error) {
if err := r.ParseForm(); err != nil {
return nil, errors.New("参数校验错误")
}
specialKey := r.FormValue("specialKey")
supportGzip := r.FormValue("support_gzip")
ctx = &ConnContext{specialKey, supportGzip}
//用来标识一个tcp链接
keyString := ctx.AsHashKey()
if oldConn, ok := ctxHashMap.Get(keyString); ok {
wsh.onClose(oldConn.(*websocket.Conn), ctx)
oldConn.(*websocket.Conn).Close()
}
ctxHashMap.Set(keyString, conn)
return ctx, nil
}
func (wsh *WebSocketHandler) onClose(conn *websocket.Conn, ctx *ConnContext) {
logger.Info("client close itself as " + ctx.String())
wsh.closeConnWithCtx(ctx)
return
}
func (wsh *WebSocketHandler) onError(errMsg string) {
logger.Error(errMsg)
}
func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r *http.Request) {
wsh.wsupgrader.CheckOrigin = func(r *http.Request) bool { return true }
conn, err := wsh.wsupgrader.Upgrade(w, r, nil)
if err != nil {
logger.Error("Failed to set websocket upgrade: " + err.Error())
return
}
defer conn.Close()
if ctx, err := wsh.onOpen(conn, r); err != nil {
logger.Error("Open connection failed " + err.Error() + r.URL.RawQuery)
return
} else {
conn.SetPingHandler(func(message string) error {
conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
return nil
})
for {
t, msg, err := conn.ReadMessage()
if err != nil {
logger.Error("READ ERR FROM " + ctx.String() + " ERR " + err.Error())
wsh.onClose(conn, ctx)
return
}
switch t {
case websocket.TextMessage, websocket.BinaryMessage:
wsh.onMessage(conn, ctx, msg, t)
case websocket.CloseMessage:
wsh.onClose(conn, ctx)
return
case websocket.PingMessage:
case websocket.PongMessage:
}
}
}
}
func (wsh *WebSocketHandler) closeConnWithCtx(ctx *ConnContext) {
keyString := ctx.AsHashKey()
ctxHashMap.Remove(keyString)
return
}
func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) {
logger.Debug("CLIENT SAID " + string(msg))
sendMessageToAll(msg)
}
func (wsh *WebSocketHandler) sendMessageToAll(msg []byte]) {
var gzMsg bytes.Buffer
gzWriter := gzip.NewWriter(&gzMsg)
gzWriter.Write(msg)
gzWriter.Flush()
gzWriter.Close()
for key, conn := range ctxHashMap.Items() {
if ctx, err := HashKeyAsCtx(key.(string)); err != nil {
wsh.onError(err.Error())
} else {
if ctx.supportGzip == "1" {
err = conn.(*websocket.Conn).WriteMessage(websocket.BinaryMessage, gzMsg.Bytes())
logger.Debug("send binary msg to " + ctx.String())
} else {
err = conn.(*websocket.Conn).WriteMessage(websocket.TextMessage, []byte(msg))
logger.Debug("send text msg to " + ctx.String())
}
if err != nil {
wsh.onClose(conn.(*websocket.Conn), ctx)
conn.(*websocket.Conn).Close()
wsh.onError("WRITE ERR TO " + key.(string) + " ERR:" + err.Error())
}
}
}
}
|
|
|