Goroutines有高怠速唤醒电话

发布于 2025-01-23 07:40:23 字数 1739 浏览 1 评论 0 原文

我正在使用Golang同时使用Goroutines同时运行两个Websocket客户端(一个用于私有数据,一个用于公共数据)。从表面上看,一切似乎都很好。两个客户都会收到从WebSocket服务器传输的数据。我相信我可能已经设置了问题,但是,自从我检查活动监视器时,我的程序始终具有500-1500个空闲唤醒UPS,并且使用了我的CPU的200%。对于两个WebSocket客户端的简单内容,这似乎并不正常。

我将代码放入片段中,以便阅读较少的内容(希望可以更易于理解),但是如果您需要整个代码,我也可以发布该代码。这是我的主要功能中运行WS客户端的代码

comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup

wg.Add(1)
go pubSocket.PubListen(ctx, &wg, &activeSubs, testing)
wg.Add(1)
go privSocket.PrivListen(ctx, &wg, &activeSubs, testing)

<- comms
cancel()
wg.Wait()

,这是客户如何运行GO例程的代码

func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            log.Println("closing public socket")
            socket.Close()
            return
        default:
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
                pubJsonDecoder(message, testing)
                //tradesParser(message);
            }
        }
    }
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            log.Println("closing private socket")
            socket.Close()
            return
        default:
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
            }
        }
    }
}

。我应该使用多线程而不是并发吗?在此先感谢您的任何帮助!

I'm using GoLang to run two websocket clients (one for private and one for public data) simultaneously using goroutines. On the surface, everything seems to work fine. Both clients receive data transmitted from the websocket server. I believe I may have set something up wrong, however, since when I check activity monitor, my program consistently has between 500 - 1500 Idle Wake Ups and is using >200% of my CPU. This doesn't seem normal for something as simple as two websocket clients.

I've put the code in snippets so there's less to read (hopefully that makes it easier to understand), but if you need the entire code, I can post that as well. Here is the code in my main func that runs the ws clients

comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup

wg.Add(1)
go pubSocket.PubListen(ctx, &wg, &activeSubs, testing)
wg.Add(1)
go privSocket.PrivListen(ctx, &wg, &activeSubs, testing)

<- comms
cancel()
wg.Wait()

Here is the code for how the clients run the go routines

func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            log.Println("closing public socket")
            socket.Close()
            return
        default:
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
                pubJsonDecoder(message, testing)
                //tradesParser(message);
            }
        }
    }
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            log.Println("closing private socket")
            socket.Close()
            return
        default:
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
            }
        }
    }
}

Any ideas on why the Idle Wake Ups are so high? Should I be using multithreading instead of concurrency? Thanks in advance for any help!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

梦回旧景 2025-01-30 07:40:23

您在这里浪费CPU(多余的环路):

  for {
       // ...
        default:
        // High CPU usage here.
        }
    }

尝试这样的事情:

 func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    defer socket.Close()

    socket.OnTextMessage = func(message string, socket Socket) {
        log.Println(message)
        pubJsonDecoder(message, testing)
        //tradesParser(message);
    }

    <-ctx.Done()
    log.Println("closing public socket")
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    defer socket.Close()

    socket.OnTextMessage = func(message string, socket Socket) {
        log.Println(message)
    }

    <-ctx.Done()
    log.Println("closing private socket")
}

也可能会有所帮助:

You're wasting CPU here (superfluous loop):

  for {
       // ...
        default:
        // High CPU usage here.
        }
    }

Try something like this:

 func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    defer socket.Close()

    socket.OnTextMessage = func(message string, socket Socket) {
        log.Println(message)
        pubJsonDecoder(message, testing)
        //tradesParser(message);
    }

    <-ctx.Done()
    log.Println("closing public socket")
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    defer socket.Close()

    socket.OnTextMessage = func(message string, socket Socket) {
        log.Println(message)
    }

    <-ctx.Done()
    log.Println("closing private socket")
}

Also this may help:
https://github.com/gorilla/websocket/blob/master/examples/chat/client.go

爱格式化 2025-01-30 07:40:23

TL/DR:Websocket很难:)

看起来您可能有几个旋转器。您是在a for -select语句的默认情况下为ontextMessage()分配处理程序函数。如果没有其他情况,默认情况总是执行。因为在默认情况下没有任何阻止,所以循环只是失控。这样的两种旋转旋转都可能会钉2个核心。 WebSocket是网络IO,这些Goroutines可能会并行运行。这就是为什么您看到200%使用的原因。

看看大猩猩/Websocket库。我不会说它比任何其他Websocket库更好或更糟,我对此有很多经验。

https://github.com/gorilla/websocket/websocket

以下是我多次使用过的实现。
设置的方式是收到某个消息时触发的注册处理程序功能。假设消息中的一个值是“类型”:“ start-job”,WebSocket服务器将调用您分配给“ start-Job” Websocket消息的处理程序。感觉就像为HTTP路由器编写端点。

package serverws

context.go

package serverws

import (
    "errors"
    "fmt"
    "strings"
    "sync"
)

// ConnContext is the connection context to track a connected websocket user
type ConnContext struct {
    specialKey  string
    supportGzip string
    UserID      string
    mu         sync.Mutex // Websockets are not thread safe, we'll use a mutex to lock writes.
}

// HashKeyAsCtx returns a ConnContext based on the hash provided
func HashKeyAsCtx(hashKey string) (*ConnContext, error) {
    values := strings.Split(hashKey, ":")
    if len(values) != 3 {
        return nil, errors.New("Invalid Key received: " + hashKey)
    }
    return &ConnContext{values[0], values[1], values[2], sync.Mutex{}}, nil
}

// AsHashKey returns the hash key for a given connection context ConnContext
func (ctx *ConnContext) AsHashKey() string {
    return strings.Join([]string{ctx.specialKey, ctx.supportGzip, ctx.UserID}, ":")
}

// String returns a string of the hash of a given connection context ConnContext
func (ctx *ConnContext) String() string {
    return fmt.Sprint("specialkey: ", ctx.specialKey, " gzip ", ctx.supportGzip, " auth ", ctx.UserID)
}

wshandler.go

package serverws

import (
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/gorilla/websocket"
    "github.com/rs/zerolog/log"
)

var (
    receiveFunctionMap = make(map[string]ReceiveObjectFunc)
    ctxHashMap         sync.Map
)

// ReceiveObjectFunc is a function signature for a websocket request handler
type ReceiveObjectFunc func(conn *websocket.Conn, ctx *ConnContext, t map[string]interface{})

// WebSocketHandler does what it says, handles WebSockets (makes them easier for us to deal with)
type WebSocketHandler struct {
    wsupgrader websocket.Upgrader
}

// WebSocketMessage that is sent over a websocket.   Messages must have a conversation type so the server and the client JS know
// what is being discussed and what signals to raise on the server and the client.
// The "Notification" message instructs the client to display an alert popup.
type WebSocketMessage struct {
    MessageType string      `json:"type"`
    Message     interface{} `json:"message"`
}

// NewWebSocketHandler sets up a new websocket.
func NewWebSocketHandler() *WebSocketHandler {
    wsh := new(WebSocketHandler)
    wsh.wsupgrader = websocket.Upgrader{
        ReadBufferSize:  4096,
        WriteBufferSize: 4096,
    }
    return wsh

}

// RegisterMessageType sets up an event bus for a message type.   When messages arrive from the client that match messageTypeName,
// the function you wrote to handle that message is then called.
func (wsh *WebSocketHandler) RegisterMessageType(messageTypeName string, f ReceiveObjectFunc) {
    receiveFunctionMap[messageTypeName] = f
}

// onMessage triggers when the underlying websocket has received a message.
func (wsh *WebSocketHandler) onMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) {
    //  Handling text messages or binary messages. Binary is usually some gzip text.
    if msgType == websocket.TextMessage {
        wsh.processIncomingTextMsg(conn, ctx, msg)
    }
    if msgType == websocket.BinaryMessage {

    }
}

// onOpen triggers when the underlying websocket has established a connection.
func (wsh *WebSocketHandler) onOpen(conn *websocket.Conn, r *http.Request) (ctx *ConnContext, err error) {
    //user, err := gothic.GetFromSession("ID", r)
    user := "TestUser"
    if err := r.ParseForm(); err != nil {
        return nil, errors.New("parameter check error")
    }

    specialKey := r.FormValue("specialKey")
    supportGzip := r.FormValue("support_gzip")

    if user != "" && err == nil {
        ctx = &ConnContext{specialKey, supportGzip, user, sync.Mutex{}}
    } else {
        ctx = &ConnContext{specialKey, supportGzip, "", sync.Mutex{}}
    }

    keyString := ctx.AsHashKey()

    if oldConn, ok := ctxHashMap.Load(keyString); ok {
        wsh.onClose(oldConn.(*websocket.Conn), ctx)
        oldConn.(*websocket.Conn).Close()
    }
    ctxHashMap.Store(keyString, conn)
    return ctx, nil
}

// onClose triggers when the underlying websocket has been closed down
func (wsh *WebSocketHandler) onClose(conn *websocket.Conn, ctx *ConnContext) {
    //log.Info().Msg(("client close itself as " + ctx.String()))
    wsh.closeConnWithCtx(ctx)
}

// onError triggers when a websocket connection breaks
func (wsh *WebSocketHandler) onError(errMsg string) {
    //log.Error().Msg(errMsg)
}

// HandleConn happens when a user connects to us at the listening point.  We ask
// the user to authenticate and then send the required HTTP Upgrade return code.
func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r *http.Request) {

    user := ""
    if r.URL.Path == "/websocket" {
        user = "TestUser" // authenticate however you want
        if user == "" {
            fmt.Println("UNAUTHENTICATED USER TRIED TO CONNECT TO WEBSOCKET FROM ", r.Header.Get("X-Forwarded-For"))
            return
        }
    }
    // don't do this.  You need to check the origin, but this is here as a place holder
    wsh.wsupgrader.CheckOrigin = func(r *http.Request) bool {
        return true
    }

    conn, err := wsh.wsupgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Error().Msg("Failed to set websocket upgrade: " + err.Error())
        return
    }
    defer conn.Close()

    ctx, err := wsh.onOpen(conn, r)
    if err != nil {
        log.Error().Msg("Open connection failed " + err.Error() + r.URL.RawQuery)
        if user != "" {
            ctx.UserID = user
        }
        return
    }

    if user != "" {
        ctx.UserID = user
    }
    conn.SetPingHandler(func(message string) error {
        conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
        return nil
    })

    // Message pump for the underlying websocket connection
    for {
        t, msg, err := conn.ReadMessage()
        if err != nil {
            // Read errors are when the user closes the tab. Ignore.
            wsh.onClose(conn, ctx)
            return
        }

        switch t {
        case websocket.TextMessage, websocket.BinaryMessage:
            wsh.onMessage(conn, ctx, msg, t)
        case websocket.CloseMessage:
            wsh.onClose(conn, ctx)
            return
        case websocket.PingMessage:
        case websocket.PongMessage:
        }

    }

}

func (wsh *WebSocketHandler) closeConnWithCtx(ctx *ConnContext) {
    keyString := ctx.AsHashKey()
    ctxHashMap.Delete(keyString)
}

func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) {
    //log.Debug().Msg("CLIENT SAID " + string(msg))
    data := WebSocketMessage{}

    // try to turn this into data
    err := json.Unmarshal(msg, &data)

    // And try to get at the data underneath
    var raw = make(map[string]interface{})
    terr := json.Unmarshal(msg, &raw)

    if err == nil {
        // What kind of message is this?
        if receiveFunctionMap[data.MessageType] != nil {
            // We'll try to cast this message and call the handler for it
            if terr == nil {
                if v, ok := raw["message"].(map[string]interface{}); ok {
                    receiveFunctionMap[data.MessageType](conn, ctx, v)
                } else {
                    log.Debug().Msg("Nonsense sent over the websocket.")
                }
            } else {
                log.Debug().Msg("Nonsense sent over the websocket.")
            }
        }
    } else {
        // Received garbage from the transmitter.
    }
}

// SendJSONToSocket sends a specific message to a specific websocket
func (wsh *WebSocketHandler) SendJSONToSocket(socketID string, msg interface{}) {
    fields := strings.Split(socketID, ":")
    message, _ := json.Marshal(msg)

    ctxHashMap.Range(func(key interface{}, value interface{}) bool {
        if ctx, err := HashKeyAsCtx(key.(string)); err != nil {
            wsh.onError(err.Error())
        } else {
            if ctx.specialKey == fields[0] {
                ctx.mu.Lock()
                if value != nil {
                    err = value.(*websocket.Conn).WriteMessage(websocket.TextMessage, message)
                }
                ctx.mu.Unlock()
            }
            if err != nil {
                ctx.mu.Lock() // We'll lock here even though we're going to destroy this
                wsh.onClose(value.(*websocket.Conn), ctx)
                value.(*websocket.Conn).Close()
                ctxHashMap.Delete(key) // Remove the websocket immediately
                //wsh.onError("WRITE ERR TO USER " + key.(string) + " ERR: " + err.Error())
            }
        }
        return true
    })
}

package wsocket

types.go

package wsocket



// Acknowledgement is for ACKing simple messages and sending errors
type Acknowledgement struct {
    ResponseID string `json:"responseId"`
    Status     string `json:"status"`
    IPAddress  string `json:"ipaddress"`
    ErrorText  string `json:"errortext"`
}


wsocket.go

package wsocket

import (
    "fmt"
    server "project/serverws"
    "project/utils"
    "sync"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    // "github.com/mitchellh/mapstructure"
    "github.com/inconshreveable/log15"
)
var (
    WebSocket         *server.WebSocketHandler // So other packages can send out websocket messages
    WebSocketLocation string
    Log               log15.Logger = log15.New("package", "wsocket"
)

func SetupWebsockets(r *gin.Engine, socket *server.WebSocketHandler, debug_mode bool) {

    WebSocket = socket
    WebSocketLocation = "example.mydomain.com"
    //WebSocketLocation = "example.mydomain.com"
    r.GET("/websocket", func(c *gin.Context) {
        socket.HandleConn(c.Writer, c.Request)

    })

socket.RegisterMessageType("Hello", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {

        response := Acknowledgement{
            ResponseID: "Hello",
            Status:     fmt.Sprintf("OK/%v", ctx.AuthID),
            IPAddress:  conn.RemoteAddr().String(),
        }
        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in
        socket.SendJSONToSocket(ctx.AsHashKey(), &response)
    })

socket.RegisterMessageType("start-job", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {

        response := Acknowledgement{
            ResponseID: "starting_job",
            Status:     fmt.Sprintf("%s is being dialed.", data["did"]),
            IPAddress:  conn.RemoteAddr().String(),
        }
        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in to a struct.
        socket.SendJSONToSocket(ctx.AsHashKey(), &response)

    })

此实现适用于Web应用程序。这是JavaScript中客户端的简化版本。您可以处理与此实现的许多并发连接,并且您要做的就是定义包含响应ID的对象/结构,该响应ID与下面的开关中的情况匹配,它基本上是一个长开关语句,序列化并将其发送到另一侧,另一侧会ack。我有一些在几个生产环境中运行的版本。

Websocket.js

$(() => {

    function wsMessage(object) {
        switch (object.responseId) {
            case "Hello": // HELLO! :-)
                console.log("Heartbeat received, we're connected.");
                break;

            case "Notification":
                if (object.errortext != "") {
                    $.notify({
                        // options
                        message: '<center><B><i class="fas fa-exclamation-triangle"></i>  ' + object.errortext + '</B></center>',
                    }, {
                        // settings
                        type: 'danger',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                       
                    });

                } else {
                    $.notify({
                        // options
                        message: '<center><B>' + object.status + '</B></center>',
                    }, {
                        // settings
                        type: 'success',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                    });
                }
                break;
           
        }

    }



    $(document).ready(function () {

        function heartbeat() {
            if (!websocket) return;
            if (websocket.readyState !== 1) return;
            websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }");
            setTimeout(heartbeat, 24000);
        }
        //TODO: CHANGE TO WSS once tls is enabled.
        function wireUpWebsocket() {
            websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0');

            websocket.onopen = function (event) {
                console.log("Websocket connected.");

                heartbeat();
                //if it exists
                if (typeof (wsReady) !== 'undefined') {
                    //execute it
                    wsReady();
                }

            };

            websocket.onerror = function (event) {
                console.log("WEBSOCKET ERROR " + event.data);

            };

            websocket.onmessage = function (event) {
                wsMessage(JSON.parse(event.data));
            };

            websocket.onclose = function () {
                // Don't close!
                // Replace key
                console.log("WEBSOCKET CLOSED");
                WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
                websocketreconnects++;
                if (websocketreconnects > 30) { // Too much, time to bounce
                    // location.reload(); Don't reload the page anymore, just re-connect.
                }
                setTimeout(function () { wireUpWebsocket(); }, 3000);
            };
        }

        wireUpWebsocket();
    });

});

function getCookie(name) {
    var value = "; " + document.cookie;
    var parts = value.split("; " + name + "=");
    if (parts.length == 2) return parts.pop().split(";").shift();
}

function setCookie(cname, cvalue, exdays) {
    var d = new Date();
    d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000));
    var expires = "expires=" + d.toUTCString();
    document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";
}

在无限循环中一遍又一遍地分配处理程序函数绝对不会工作。

https://github.com/gorilla/gorilla/websocket

tl/dr: websockets are hard :)

It looks like you might have a couple of spinners. You are assigning the handler function for OnTextMessage() in the default case of a for - select statement. The default case always executes if no other cases are ready. Because there is nothing that blocks in the default case, that for loop just spins out of control. Both goroutines spinning like this will likely peg 2 cores. Websockets are network IO and those goroutines are likely to run in parallel. This is why you are seeing 200% utilization.

Take a look at the gorilla/websocket library. I'm not going to say that it is better or worse than any other websocket library, I have a lot of experience with it.

https://github.com/gorilla/websocket

Below is an implementation that I have used many times.
The way it is set up is you register handler functions that are triggered when a certain message is received. Say one of the values in your message was "type" : "start-job", the websocket server will call the handler you assigned to the "start-job" websocket message. It feels like writing endpoints for an http router.

Package serverws

context.go

package serverws

import (
    "errors"
    "fmt"
    "strings"
    "sync"
)

// ConnContext is the connection context to track a connected websocket user
type ConnContext struct {
    specialKey  string
    supportGzip string
    UserID      string
    mu         sync.Mutex // Websockets are not thread safe, we'll use a mutex to lock writes.
}

// HashKeyAsCtx returns a ConnContext based on the hash provided
func HashKeyAsCtx(hashKey string) (*ConnContext, error) {
    values := strings.Split(hashKey, ":")
    if len(values) != 3 {
        return nil, errors.New("Invalid Key received: " + hashKey)
    }
    return &ConnContext{values[0], values[1], values[2], sync.Mutex{}}, nil
}

// AsHashKey returns the hash key for a given connection context ConnContext
func (ctx *ConnContext) AsHashKey() string {
    return strings.Join([]string{ctx.specialKey, ctx.supportGzip, ctx.UserID}, ":")
}

// String returns a string of the hash of a given connection context ConnContext
func (ctx *ConnContext) String() string {
    return fmt.Sprint("specialkey: ", ctx.specialKey, " gzip ", ctx.supportGzip, " auth ", ctx.UserID)
}

wshandler.go

package serverws

import (
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/gorilla/websocket"
    "github.com/rs/zerolog/log"
)

var (
    receiveFunctionMap = make(map[string]ReceiveObjectFunc)
    ctxHashMap         sync.Map
)

// ReceiveObjectFunc is a function signature for a websocket request handler
type ReceiveObjectFunc func(conn *websocket.Conn, ctx *ConnContext, t map[string]interface{})

// WebSocketHandler does what it says, handles WebSockets (makes them easier for us to deal with)
type WebSocketHandler struct {
    wsupgrader websocket.Upgrader
}

// WebSocketMessage that is sent over a websocket.   Messages must have a conversation type so the server and the client JS know
// what is being discussed and what signals to raise on the server and the client.
// The "Notification" message instructs the client to display an alert popup.
type WebSocketMessage struct {
    MessageType string      `json:"type"`
    Message     interface{} `json:"message"`
}

// NewWebSocketHandler sets up a new websocket.
func NewWebSocketHandler() *WebSocketHandler {
    wsh := new(WebSocketHandler)
    wsh.wsupgrader = websocket.Upgrader{
        ReadBufferSize:  4096,
        WriteBufferSize: 4096,
    }
    return wsh

}

// RegisterMessageType sets up an event bus for a message type.   When messages arrive from the client that match messageTypeName,
// the function you wrote to handle that message is then called.
func (wsh *WebSocketHandler) RegisterMessageType(messageTypeName string, f ReceiveObjectFunc) {
    receiveFunctionMap[messageTypeName] = f
}

// onMessage triggers when the underlying websocket has received a message.
func (wsh *WebSocketHandler) onMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) {
    //  Handling text messages or binary messages. Binary is usually some gzip text.
    if msgType == websocket.TextMessage {
        wsh.processIncomingTextMsg(conn, ctx, msg)
    }
    if msgType == websocket.BinaryMessage {

    }
}

// onOpen triggers when the underlying websocket has established a connection.
func (wsh *WebSocketHandler) onOpen(conn *websocket.Conn, r *http.Request) (ctx *ConnContext, err error) {
    //user, err := gothic.GetFromSession("ID", r)
    user := "TestUser"
    if err := r.ParseForm(); err != nil {
        return nil, errors.New("parameter check error")
    }

    specialKey := r.FormValue("specialKey")
    supportGzip := r.FormValue("support_gzip")

    if user != "" && err == nil {
        ctx = &ConnContext{specialKey, supportGzip, user, sync.Mutex{}}
    } else {
        ctx = &ConnContext{specialKey, supportGzip, "", sync.Mutex{}}
    }

    keyString := ctx.AsHashKey()

    if oldConn, ok := ctxHashMap.Load(keyString); ok {
        wsh.onClose(oldConn.(*websocket.Conn), ctx)
        oldConn.(*websocket.Conn).Close()
    }
    ctxHashMap.Store(keyString, conn)
    return ctx, nil
}

// onClose triggers when the underlying websocket has been closed down
func (wsh *WebSocketHandler) onClose(conn *websocket.Conn, ctx *ConnContext) {
    //log.Info().Msg(("client close itself as " + ctx.String()))
    wsh.closeConnWithCtx(ctx)
}

// onError triggers when a websocket connection breaks
func (wsh *WebSocketHandler) onError(errMsg string) {
    //log.Error().Msg(errMsg)
}

// HandleConn happens when a user connects to us at the listening point.  We ask
// the user to authenticate and then send the required HTTP Upgrade return code.
func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r *http.Request) {

    user := ""
    if r.URL.Path == "/websocket" {
        user = "TestUser" // authenticate however you want
        if user == "" {
            fmt.Println("UNAUTHENTICATED USER TRIED TO CONNECT TO WEBSOCKET FROM ", r.Header.Get("X-Forwarded-For"))
            return
        }
    }
    // don't do this.  You need to check the origin, but this is here as a place holder
    wsh.wsupgrader.CheckOrigin = func(r *http.Request) bool {
        return true
    }

    conn, err := wsh.wsupgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Error().Msg("Failed to set websocket upgrade: " + err.Error())
        return
    }
    defer conn.Close()

    ctx, err := wsh.onOpen(conn, r)
    if err != nil {
        log.Error().Msg("Open connection failed " + err.Error() + r.URL.RawQuery)
        if user != "" {
            ctx.UserID = user
        }
        return
    }

    if user != "" {
        ctx.UserID = user
    }
    conn.SetPingHandler(func(message string) error {
        conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
        return nil
    })

    // Message pump for the underlying websocket connection
    for {
        t, msg, err := conn.ReadMessage()
        if err != nil {
            // Read errors are when the user closes the tab. Ignore.
            wsh.onClose(conn, ctx)
            return
        }

        switch t {
        case websocket.TextMessage, websocket.BinaryMessage:
            wsh.onMessage(conn, ctx, msg, t)
        case websocket.CloseMessage:
            wsh.onClose(conn, ctx)
            return
        case websocket.PingMessage:
        case websocket.PongMessage:
        }

    }

}

func (wsh *WebSocketHandler) closeConnWithCtx(ctx *ConnContext) {
    keyString := ctx.AsHashKey()
    ctxHashMap.Delete(keyString)
}

func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) {
    //log.Debug().Msg("CLIENT SAID " + string(msg))
    data := WebSocketMessage{}

    // try to turn this into data
    err := json.Unmarshal(msg, &data)

    // And try to get at the data underneath
    var raw = make(map[string]interface{})
    terr := json.Unmarshal(msg, &raw)

    if err == nil {
        // What kind of message is this?
        if receiveFunctionMap[data.MessageType] != nil {
            // We'll try to cast this message and call the handler for it
            if terr == nil {
                if v, ok := raw["message"].(map[string]interface{}); ok {
                    receiveFunctionMap[data.MessageType](conn, ctx, v)
                } else {
                    log.Debug().Msg("Nonsense sent over the websocket.")
                }
            } else {
                log.Debug().Msg("Nonsense sent over the websocket.")
            }
        }
    } else {
        // Received garbage from the transmitter.
    }
}

// SendJSONToSocket sends a specific message to a specific websocket
func (wsh *WebSocketHandler) SendJSONToSocket(socketID string, msg interface{}) {
    fields := strings.Split(socketID, ":")
    message, _ := json.Marshal(msg)

    ctxHashMap.Range(func(key interface{}, value interface{}) bool {
        if ctx, err := HashKeyAsCtx(key.(string)); err != nil {
            wsh.onError(err.Error())
        } else {
            if ctx.specialKey == fields[0] {
                ctx.mu.Lock()
                if value != nil {
                    err = value.(*websocket.Conn).WriteMessage(websocket.TextMessage, message)
                }
                ctx.mu.Unlock()
            }
            if err != nil {
                ctx.mu.Lock() // We'll lock here even though we're going to destroy this
                wsh.onClose(value.(*websocket.Conn), ctx)
                value.(*websocket.Conn).Close()
                ctxHashMap.Delete(key) // Remove the websocket immediately
                //wsh.onError("WRITE ERR TO USER " + key.(string) + " ERR: " + err.Error())
            }
        }
        return true
    })
}

package wsocket

types.go

package wsocket



// Acknowledgement is for ACKing simple messages and sending errors
type Acknowledgement struct {
    ResponseID string `json:"responseId"`
    Status     string `json:"status"`
    IPAddress  string `json:"ipaddress"`
    ErrorText  string `json:"errortext"`
}


wsocket.go

package wsocket

import (
    "fmt"
    server "project/serverws"
    "project/utils"
    "sync"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    // "github.com/mitchellh/mapstructure"
    "github.com/inconshreveable/log15"
)
var (
    WebSocket         *server.WebSocketHandler // So other packages can send out websocket messages
    WebSocketLocation string
    Log               log15.Logger = log15.New("package", "wsocket"
)

func SetupWebsockets(r *gin.Engine, socket *server.WebSocketHandler, debug_mode bool) {

    WebSocket = socket
    WebSocketLocation = "example.mydomain.com"
    //WebSocketLocation = "example.mydomain.com"
    r.GET("/websocket", func(c *gin.Context) {
        socket.HandleConn(c.Writer, c.Request)

    })

socket.RegisterMessageType("Hello", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {

        response := Acknowledgement{
            ResponseID: "Hello",
            Status:     fmt.Sprintf("OK/%v", ctx.AuthID),
            IPAddress:  conn.RemoteAddr().String(),
        }
        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in
        socket.SendJSONToSocket(ctx.AsHashKey(), &response)
    })

socket.RegisterMessageType("start-job", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {

        response := Acknowledgement{
            ResponseID: "starting_job",
            Status:     fmt.Sprintf("%s is being dialed.", data["did"]),
            IPAddress:  conn.RemoteAddr().String(),
        }
        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in to a struct.
        socket.SendJSONToSocket(ctx.AsHashKey(), &response)

    })

This implementation was for a web application. This is a simplified version of the client side in javascript. You can handle many concurrent connections with this implementation and all you do to communicate is define objects/structs that contain a responseID that matches a case in the switch below, it is basically one long switch statement, serialize it and send it to the other side, and the other side will ack. I have some version of this running in several production environments.

websocket.js

$(() => {

    function wsMessage(object) {
        switch (object.responseId) {
            case "Hello": // HELLO! :-)
                console.log("Heartbeat received, we're connected.");
                break;

            case "Notification":
                if (object.errortext != "") {
                    $.notify({
                        // options
                        message: '<center><B><i class="fas fa-exclamation-triangle"></i>  ' + object.errortext + '</B></center>',
                    }, {
                        // settings
                        type: 'danger',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                       
                    });

                } else {
                    $.notify({
                        // options
                        message: '<center><B>' + object.status + '</B></center>',
                    }, {
                        // settings
                        type: 'success',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                    });
                }
                break;
           
        }

    }



    $(document).ready(function () {

        function heartbeat() {
            if (!websocket) return;
            if (websocket.readyState !== 1) return;
            websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }");
            setTimeout(heartbeat, 24000);
        }
        //TODO: CHANGE TO WSS once tls is enabled.
        function wireUpWebsocket() {
            websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0');

            websocket.onopen = function (event) {
                console.log("Websocket connected.");

                heartbeat();
                //if it exists
                if (typeof (wsReady) !== 'undefined') {
                    //execute it
                    wsReady();
                }

            };

            websocket.onerror = function (event) {
                console.log("WEBSOCKET ERROR " + event.data);

            };

            websocket.onmessage = function (event) {
                wsMessage(JSON.parse(event.data));
            };

            websocket.onclose = function () {
                // Don't close!
                // Replace key
                console.log("WEBSOCKET CLOSED");
                WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
                websocketreconnects++;
                if (websocketreconnects > 30) { // Too much, time to bounce
                    // location.reload(); Don't reload the page anymore, just re-connect.
                }
                setTimeout(function () { wireUpWebsocket(); }, 3000);
            };
        }

        wireUpWebsocket();
    });

});

function getCookie(name) {
    var value = "; " + document.cookie;
    var parts = value.split("; " + name + "=");
    if (parts.length == 2) return parts.pop().split(";").shift();
}

function setCookie(cname, cvalue, exdays) {
    var d = new Date();
    d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000));
    var expires = "expires=" + d.toUTCString();
    document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";
}

Assigning handler functions over and over again in an infinite loop is definitely not going to work.

https://github.com/gorilla/websocket

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文