mirror of
https://github.com/m5stack/StackChan.git
synced 2026-04-27 19:12:40 +00:00
server code
This commit is contained in:
@@ -0,0 +1,830 @@
|
||||
/*
|
||||
SPDX-FileCopyrightText: 2026 M5Stack Technology CO LTD
|
||||
SPDX-License-Identifier: MIT
|
||||
*/
|
||||
|
||||
package web_socket
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"stackChan/internal/service"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/net/ghttp"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
const (
|
||||
Opus byte = 0x01
|
||||
Jpeg byte = 0x02
|
||||
ControlAvatar byte = 0x03
|
||||
ControlMotion byte = 0x04
|
||||
OnCamera byte = 0x05
|
||||
OffCamera byte = 0x06
|
||||
|
||||
TextMessage byte = 0x07
|
||||
RequestCall byte = 0x09
|
||||
RefuseCall byte = 0x0A
|
||||
AgreeCall byte = 0x0B
|
||||
HangupCall byte = 0x0C
|
||||
|
||||
UpdateDeviceName byte = 0x0D
|
||||
GetDeviceName byte = 0x0E
|
||||
|
||||
inCall byte = 0x0F
|
||||
|
||||
ping byte = 0x10
|
||||
pong byte = 0x11
|
||||
|
||||
OnPhoneScreen byte = 0x12
|
||||
OffPhoneScreen byte = 0x13
|
||||
Dance byte = 0x14
|
||||
GetAvatarPosture byte = 0x15
|
||||
|
||||
DeviceOffline byte = 0x16
|
||||
DeviceOnline byte = 0x17
|
||||
)
|
||||
|
||||
var (
|
||||
wsUpGrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) {
|
||||
logger.Errorf(r.Context(), "WebSocket Upgrade failed: %v", reason)
|
||||
},
|
||||
}
|
||||
logger = g.Log()
|
||||
stackChanClientPool = sync.Map{}
|
||||
appClientPool = sync.Map{}
|
||||
)
|
||||
|
||||
// AppClient indicates a WebSocket client connection on the App side
|
||||
type AppClient struct {
|
||||
Mac string
|
||||
Conn *websocket.Conn
|
||||
mu *sync.RWMutex
|
||||
DeviceId string
|
||||
LastTime time.Time
|
||||
}
|
||||
|
||||
// StackChanClient indicates a WebSocket client connection for the device end of a StackChan
|
||||
type StackChanClient struct {
|
||||
Mac string
|
||||
Conn *websocket.Conn
|
||||
mu *sync.RWMutex
|
||||
CameraSubscriptionList []*AppClient
|
||||
CallAppClient *AppClient
|
||||
phoneScreen bool
|
||||
LastTime time.Time
|
||||
}
|
||||
|
||||
func Handler(r *ghttp.Request) {
|
||||
ctx := r.Context()
|
||||
mac := r.Get("mac").String()
|
||||
deviceType := r.Get("deviceType").String()
|
||||
if mac == "" || deviceType == "" {
|
||||
r.Response.Write("The mac and deviceType parameters are empty.")
|
||||
return
|
||||
}
|
||||
|
||||
ws, err := wsUpGrader.Upgrade(r.Response.Writer, r.Request, nil)
|
||||
if err != nil {
|
||||
r.Response.Write(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if deviceType == "StackChan" {
|
||||
isHave := false
|
||||
var client *StackChanClient
|
||||
|
||||
stackChanClientPool.Range(func(key, value any) bool {
|
||||
macAddr := key.(string)
|
||||
stackChanClient := value.(*StackChanClient)
|
||||
|
||||
if macAddr == mac {
|
||||
isHave = true
|
||||
client = stackChanClient
|
||||
client.mu.Lock()
|
||||
client.Conn = ws
|
||||
if client.CallAppClient != nil {
|
||||
reconnectMsg := createStringMessage(TextMessage, "The equipment has been reconnected.")
|
||||
msgType := websocket.BinaryMessage
|
||||
forwardMessage(ctx, client.CallAppClient.Conn, &msgType, reconnectMsg, client.CallAppClient.mu)
|
||||
}
|
||||
if len(client.CameraSubscriptionList) > 0 {
|
||||
onMsg := createMessage(OnCamera, nil)
|
||||
onType := websocket.BinaryMessage
|
||||
forwardMessage(ctx, client.Conn, &onType, onMsg, client.mu)
|
||||
}
|
||||
client.LastTime = time.Now()
|
||||
client.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if !isHave {
|
||||
client = &StackChanClient{
|
||||
Mac: mac,
|
||||
Conn: ws,
|
||||
mu: &sync.RWMutex{},
|
||||
phoneScreen: false,
|
||||
LastTime: time.Now(),
|
||||
}
|
||||
addStackChenClient(ctx, client)
|
||||
} else {
|
||||
// notify app
|
||||
onlineMsg := createStringMessage(DeviceOnline, "Your StackChan has been launched.")
|
||||
msgType := websocket.BinaryMessage
|
||||
// Notify App
|
||||
appClients := getAppClients(client.Mac)
|
||||
for _, appClient := range appClients {
|
||||
forwardMessage(ctx, appClient.Conn, &msgType, onlineMsg, appClient.mu)
|
||||
}
|
||||
}
|
||||
logger.Info(ctx, "There is a StackChen connected to the service.", client.Mac)
|
||||
defer func() {
|
||||
logger.Info(ctx, "There is a StackChan that has disconnected.", mac, deviceType)
|
||||
}()
|
||||
for {
|
||||
messageType, msg, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
||||
logger.Infof(ctx, "StackChan Normal disconnection: mac=%s, deviceType=%s, Reason=%v", mac, deviceType, err)
|
||||
break
|
||||
}
|
||||
|
||||
var ne net.Error
|
||||
if errors.As(err, &ne) && ne.Temporary() {
|
||||
logger.Infof(ctx, "StackChan Temporary network error. Continue reading.: mac=%s,deviceType=%s,Error=%v", mac, deviceType, err)
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Errorf(ctx, "StackChan Abnormal disconnection: mac=%s, deviceType=%s, Error=%v", mac, deviceType, err)
|
||||
break
|
||||
}
|
||||
//logger.Infof(ctx, "收到StackChan端消息%d", len(msg))
|
||||
readStackChanMessage(ctx, client, &messageType, &msg)
|
||||
}
|
||||
} else if deviceType == "App" {
|
||||
deviceId := r.Get("deviceId").String()
|
||||
if deviceId == "" {
|
||||
r.Response.Write("The deviceId parameter in the App end is empty.")
|
||||
return
|
||||
}
|
||||
var client *AppClient
|
||||
found := false
|
||||
clients := getAppClients(mac)
|
||||
for _, appClient := range clients {
|
||||
if appClient.DeviceId == deviceId && appClient.Mac == mac {
|
||||
// Already available. Update the connection.
|
||||
client = appClient
|
||||
client.mu.Lock()
|
||||
client.Conn = ws
|
||||
client.mu.Unlock()
|
||||
client.LastTime = time.Now()
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
client = &AppClient{
|
||||
Mac: mac,
|
||||
Conn: ws,
|
||||
DeviceId: deviceId,
|
||||
mu: &sync.RWMutex{},
|
||||
LastTime: time.Now(),
|
||||
}
|
||||
addAppClient(client)
|
||||
}
|
||||
logger.Info(ctx, "There is an App connected to the service.", client.Mac)
|
||||
|
||||
// check StackChan status
|
||||
stackChanClient := getStackChanClient(client.Mac)
|
||||
if stackChanClient == nil {
|
||||
offlineMsg := createStringMessage(DeviceOffline, "Your StackChan is offline.")
|
||||
msgType := websocket.BinaryMessage
|
||||
forwardMessage(ctx, client.Conn, &msgType, offlineMsg, client.mu)
|
||||
} else {
|
||||
onlineMsg := createStringMessage(DeviceOnline, "Your StackChan has been launched.")
|
||||
msgType := websocket.BinaryMessage
|
||||
forwardMessage(ctx, client.Conn, &msgType, onlineMsg, client.mu)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
logger.Info(ctx, "There is an App that has disconnected.", mac, deviceType)
|
||||
}()
|
||||
for {
|
||||
messageType, msg, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
var ne net.Error
|
||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
||||
logger.Infof(ctx, "App Normal disconnection: mac=%s, deviceType=%s, Error=%v", mac, deviceType, err)
|
||||
break
|
||||
}
|
||||
if errors.As(err, &ne) && ne.Temporary() {
|
||||
logger.Infof(ctx, "App Temporary network error. Continue reading.: mac=%s,deviceType=%s,Error=%v", mac, deviceType, err)
|
||||
continue
|
||||
}
|
||||
if errors.As(err, &ne) && ne.Timeout() {
|
||||
logger.Infof(ctx, "App Timeout disconnection: mac=%s, deviceType=%s", mac, deviceType)
|
||||
break
|
||||
}
|
||||
logger.Errorf(ctx, "App Abnormal disconnection: mac=%s, deviceType=%s, Error=%v", mac, deviceType, err)
|
||||
break
|
||||
}
|
||||
client.LastTime = time.Now()
|
||||
readAppClientMessage(ctx, client, &messageType, &msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addStackChenClient adds a StackChan client to the connection pool and ensures the MAC is registered
|
||||
func addStackChenClient(ctx context.Context, c *StackChanClient) {
|
||||
stackChanClientPool.Store(c.Mac, c)
|
||||
_, _ = service.CreateMacIfNotExists(ctx, c.Mac)
|
||||
}
|
||||
|
||||
// addAppClient adds an App client to the App connection pool (multiple Apps per MAC allowed)
|
||||
func addAppClient(c *AppClient) {
|
||||
val, _ := appClientPool.Load(c.Mac)
|
||||
var clients []*AppClient
|
||||
if val == nil {
|
||||
clients = []*AppClient{c}
|
||||
} else {
|
||||
clients = val.([]*AppClient)
|
||||
clients = append(clients, c)
|
||||
}
|
||||
appClientPool.Store(c.Mac, clients)
|
||||
}
|
||||
|
||||
// getAppClients gets all App clients for the specified MAC address
|
||||
func getAppClients(mac string) []*AppClient {
|
||||
if val, ok := appClientPool.Load(mac); ok {
|
||||
return val.([]*AppClient)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getStackChanClient gets the StackChan client corresponding to the specified MAC address
|
||||
func getStackChanClient(mac string) *StackChanClient {
|
||||
if val, ok := stackChanClientPool.Load(mac); ok {
|
||||
return val.(*StackChanClient)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseBinaryMessage parses a custom binary protocol message, returns type, length, payload, and success status
|
||||
func parseBinaryMessage(ctx context.Context, msg *[]byte) (byte, int, []byte, bool) {
|
||||
if len(*msg) < 1+4 {
|
||||
logger.Warning(ctx, "Message too short, cannot parse header, message not forwarded")
|
||||
return 0, 0, nil, false
|
||||
}
|
||||
|
||||
msgType := (*msg)[0]
|
||||
dataLen := int(binary.BigEndian.Uint32((*msg)[1:5]))
|
||||
payload := (*msg)[5 : 5+dataLen]
|
||||
|
||||
if len(*msg)-5 != dataLen {
|
||||
logger.Warningf(ctx, "Length mismatch: header says %d, actual is %d, message not forwarded", dataLen, len(*msg)-5)
|
||||
return 0, 0, nil, false
|
||||
}
|
||||
|
||||
return msgType, dataLen, payload, true
|
||||
}
|
||||
|
||||
// StartPingTime sends Ping messages to all connected clients for heartbeat detection
|
||||
func StartPingTime(ctx context.Context) {
|
||||
message := createMessage(ping, nil)
|
||||
messageType := websocket.BinaryMessage
|
||||
|
||||
// Iterate over StackChanClientPool
|
||||
stackChanClientPool.Range(func(_, value any) bool {
|
||||
client := value.(*StackChanClient)
|
||||
forwardMessage(ctx, client.Conn, &messageType, message, client.mu)
|
||||
return true // continue iteration
|
||||
})
|
||||
|
||||
// Iterate over AppClientPool
|
||||
appClientPool.Range(func(_, value any) bool {
|
||||
clients := value.([]*AppClient)
|
||||
for _, client := range clients {
|
||||
forwardMessage(ctx, client.Conn, &messageType, message, client.mu)
|
||||
}
|
||||
return true // continue iteration
|
||||
})
|
||||
}
|
||||
|
||||
// CheckExpiredLinks checks and cleans up App client connections that have been inactive for over 60 seconds
|
||||
func CheckExpiredLinks(ctx context.Context) {
|
||||
now := time.Now()
|
||||
var expiredClients []*AppClient
|
||||
|
||||
// First, iterate over AppClientPool
|
||||
appClientPool.Range(func(mac, value any) bool {
|
||||
clients := value.([]*AppClient)
|
||||
newClients := clients[:0]
|
||||
for _, client := range clients {
|
||||
if now.Sub(client.LastTime) > time.Second*15 {
|
||||
// Found expired client
|
||||
// Iterate over StackChanClientPool to clean up CallAppClient and CameraSubscriptionList
|
||||
stackChanClientPool.Range(func(_, scValue any) bool {
|
||||
stackChanClient, ok := scValue.(*StackChanClient)
|
||||
stackChanClient.mu.Lock()
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
// Clean up CallAppClient
|
||||
if stackChanClient.CallAppClient == client {
|
||||
stackChanClient.CallAppClient = nil
|
||||
}
|
||||
|
||||
// Update camera subscription list
|
||||
newCamera := stackChanClient.CameraSubscriptionList[:0]
|
||||
removedCamera := false
|
||||
for _, sub := range stackChanClient.CameraSubscriptionList {
|
||||
if sub != client {
|
||||
newCamera = append(newCamera, sub)
|
||||
} else {
|
||||
removedCamera = true
|
||||
}
|
||||
}
|
||||
stackChanClient.CameraSubscriptionList = newCamera
|
||||
stackChanClient.mu.Unlock()
|
||||
if removedCamera && len(newCamera) == 0 {
|
||||
msg := createMessage(OffCamera, nil)
|
||||
msgType := websocket.BinaryMessage
|
||||
forwardMessage(ctx, stackChanClient.Conn, &msgType, msg, stackChanClient.mu)
|
||||
}
|
||||
return true
|
||||
})
|
||||
expiredClients = append(expiredClients, client)
|
||||
} else {
|
||||
newClients = append(newClients, client)
|
||||
}
|
||||
}
|
||||
if len(newClients) == 0 {
|
||||
appClientPool.Delete(mac)
|
||||
} else {
|
||||
appClientPool.Store(mac, newClients)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
for _, client := range expiredClients {
|
||||
logger.Infof(ctx, "Kicked out an expired App client: %s", client.Mac)
|
||||
err := client.Conn.Close()
|
||||
if err != nil {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// readStackChanMessage handles messages from the StackChan device side
|
||||
func readStackChanMessage(ctx context.Context, client *StackChanClient, messageType *int, msg *[]byte) {
|
||||
if *messageType == websocket.BinaryMessage {
|
||||
msgType, _, _, ok := parseBinaryMessage(ctx, msg)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
switch msgType {
|
||||
case pong:
|
||||
break
|
||||
case ControlAvatar, ControlMotion, OnCamera, OffCamera:
|
||||
break
|
||||
case RefuseCall:
|
||||
// Refused call, remove and notify appClient
|
||||
appClient := client.CallAppClient
|
||||
if appClient != nil {
|
||||
forwardMessage(ctx, appClient.Conn, messageType, msg, appClient.mu)
|
||||
client.mu.Lock()
|
||||
client.CallAppClient = nil
|
||||
client.mu.Unlock()
|
||||
}
|
||||
break
|
||||
case AgreeCall:
|
||||
// Agreed to call
|
||||
appClient := client.CallAppClient
|
||||
if appClient != nil {
|
||||
forwardMessage(ctx, appClient.Conn, messageType, msg, appClient.mu)
|
||||
client.mu.Lock()
|
||||
client.CameraSubscriptionList = append(client.CameraSubscriptionList, appClient)
|
||||
client.mu.Unlock()
|
||||
if len(client.CameraSubscriptionList) == 1 {
|
||||
onMsg := createMessage(OnCamera, nil)
|
||||
onType := websocket.BinaryMessage
|
||||
forwardMessage(ctx, client.Conn, &onType, onMsg, client.mu)
|
||||
}
|
||||
}
|
||||
break
|
||||
case HangupCall:
|
||||
// Hang up call
|
||||
appClient := client.CallAppClient
|
||||
if appClient != nil {
|
||||
forwardMessage(ctx, appClient.Conn, messageType, msg, appClient.mu)
|
||||
// Remove the client from the subscription list
|
||||
newList := client.CameraSubscriptionList[:0]
|
||||
for _, subClient := range client.CameraSubscriptionList {
|
||||
if subClient != appClient {
|
||||
newList = append(newList, subClient)
|
||||
}
|
||||
}
|
||||
client.mu.Lock()
|
||||
client.CameraSubscriptionList = newList
|
||||
client.mu.Unlock()
|
||||
// If the subscription list is empty, notify to turn off the camera
|
||||
if len(client.CameraSubscriptionList) == 0 {
|
||||
offMsg := createMessage(OffCamera, nil)
|
||||
offType := websocket.BinaryMessage
|
||||
forwardMessage(ctx, client.Conn, &offType, offMsg, client.mu)
|
||||
}
|
||||
}
|
||||
break
|
||||
case GetDeviceName:
|
||||
// Query device name
|
||||
name, err := service.GetDeviceName(ctx, client.Mac)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if name == "" {
|
||||
logger.Infof(ctx, "Queried device nickname is empty")
|
||||
return
|
||||
}
|
||||
newMsg := createStringMessage(GetDeviceName, name)
|
||||
forwardMessage(ctx, client.Conn, messageType, newMsg, client.mu)
|
||||
break
|
||||
case Opus:
|
||||
|
||||
break
|
||||
case Jpeg:
|
||||
subscribers := client.CameraSubscriptionList
|
||||
if len(subscribers) > 0 {
|
||||
var isAll = true
|
||||
for _, subClient := range subscribers {
|
||||
if subClient.Conn != nil {
|
||||
isAll = false
|
||||
}
|
||||
forwardMessage(ctx, subClient.Conn, messageType, msg, subClient.mu)
|
||||
}
|
||||
if isAll {
|
||||
msg = createMessage(OffCamera, nil)
|
||||
forwardMessage(ctx, client.Conn, messageType, msg, client.mu)
|
||||
}
|
||||
} else {
|
||||
msg = createMessage(OffCamera, nil)
|
||||
forwardMessage(ctx, client.Conn, messageType, msg, client.mu)
|
||||
}
|
||||
break
|
||||
case GetAvatarPosture:
|
||||
appClients := getAppClients(client.Mac)
|
||||
for _, appClient := range appClients {
|
||||
forwardMessage(ctx, appClient.Conn, messageType, msg, appClient.mu)
|
||||
}
|
||||
break
|
||||
default:
|
||||
logger.Infof(ctx, "Unknown binary msgType: %d", msgType)
|
||||
appClients := getAppClients(client.Mac)
|
||||
if appClients != nil {
|
||||
for _, appClient := range appClients {
|
||||
forwardMessage(ctx, appClient.Conn, messageType, msg, appClient.mu)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if *messageType == websocket.TextMessage {
|
||||
appClients := getAppClients(client.Mac)
|
||||
if appClients != nil {
|
||||
for _, appClient := range appClients {
|
||||
forwardMessage(ctx, appClient.Conn, messageType, msg, appClient.mu)
|
||||
}
|
||||
}
|
||||
} else if *messageType == websocket.PingMessage {
|
||||
logger.Info(ctx, "Received ping message from StackChan side")
|
||||
}
|
||||
}
|
||||
|
||||
// readAppClientMessage handles messages from the App side
|
||||
func readAppClientMessage(ctx context.Context, client *AppClient, messageType *int, msg *[]byte) {
|
||||
if *messageType == websocket.BinaryMessage {
|
||||
msgType, _, payload, ok := parseBinaryMessage(ctx, msg)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
switch msgType {
|
||||
case pong:
|
||||
break
|
||||
case GetDeviceName:
|
||||
// Query device name
|
||||
name, err := service.GetDeviceName(ctx, client.Mac)
|
||||
if err != nil {
|
||||
logger.Errorf(ctx, err.Error())
|
||||
return
|
||||
}
|
||||
if name == "" {
|
||||
logger.Infof(ctx, "Queried device nickname is empty")
|
||||
return
|
||||
}
|
||||
newMsg := createStringMessage(GetDeviceName, name)
|
||||
logger.Infof(ctx, "Device name found, returning: "+name)
|
||||
forwardMessage(ctx, client.Conn, messageType, newMsg, client.mu)
|
||||
break
|
||||
case UpdateDeviceName:
|
||||
stackChanClient := getStackChanClient(client.Mac)
|
||||
if stackChanClient != nil {
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
}
|
||||
appClients := getAppClients(client.Mac)
|
||||
for _, appClient := range appClients {
|
||||
forwardMessage(ctx, appClient.Conn, messageType, msg, appClient.mu)
|
||||
}
|
||||
break
|
||||
case Opus:
|
||||
break
|
||||
case Jpeg:
|
||||
if len(payload) < 12 {
|
||||
logger.Warningf(ctx, "Payload too short, cannot parse MAC address: %v", payload)
|
||||
return
|
||||
}
|
||||
macAddrBytes := payload[:12]
|
||||
data := payload[12:]
|
||||
macAddr := string(macAddrBytes)
|
||||
newMsg := createMessage(msgType, data)
|
||||
stackChanClient := getStackChanClient(macAddr)
|
||||
if stackChanClient != nil {
|
||||
if stackChanClient.phoneScreen {
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, newMsg, stackChanClient.mu)
|
||||
}
|
||||
}
|
||||
break
|
||||
case ControlAvatar, ControlMotion:
|
||||
if len(payload) < 12 {
|
||||
logger.Warningf(ctx, "Payload too short, cannot parse MAC address: %v", payload)
|
||||
return
|
||||
}
|
||||
macAddrBytes := payload[:12]
|
||||
data := payload[12:]
|
||||
macAddr := string(macAddrBytes)
|
||||
newMsg := createMessage(msgType, data)
|
||||
stackChanClient := getStackChanClient(macAddr)
|
||||
if stackChanClient != nil {
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, newMsg, stackChanClient.mu)
|
||||
} else {
|
||||
logger.Infof(ctx, "StackChan is currently offline")
|
||||
}
|
||||
break
|
||||
case TextMessage:
|
||||
if len(payload) < 12 {
|
||||
logger.Warningf(ctx, "Payload too short, cannot parse MAC address: %v", payload)
|
||||
return
|
||||
}
|
||||
macAddr := string(payload[:12])
|
||||
data := payload[12:]
|
||||
newMsg := createMessage(msgType, data)
|
||||
stackChanClient := getStackChanClient(macAddr)
|
||||
if stackChanClient != nil {
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, newMsg, stackChanClient.mu)
|
||||
}
|
||||
appClients := getAppClients(macAddr)
|
||||
if appClients != nil {
|
||||
for _, appClient := range appClients {
|
||||
forwardMessage(ctx, appClient.Conn, messageType, newMsg, appClient.mu)
|
||||
}
|
||||
}
|
||||
break
|
||||
case RequestCall:
|
||||
// Request call
|
||||
if len(payload) < 12 {
|
||||
logger.Warningf(ctx, "Payload too short, cannot parse MAC address: %v", payload)
|
||||
return
|
||||
}
|
||||
macAddr := string(payload[:12])
|
||||
data := payload[12:]
|
||||
stackChanClient := getStackChanClient(macAddr)
|
||||
if stackChanClient != nil {
|
||||
stackChanClient.mu.Lock()
|
||||
if stackChanClient.CallAppClient == nil || stackChanClient.CallAppClient == client {
|
||||
stackChanClient.CallAppClient = client
|
||||
stackChanClient.mu.Unlock()
|
||||
newMsg := createMessage(msgType, data)
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, newMsg, stackChanClient.mu)
|
||||
} else {
|
||||
stackChanClient.mu.Unlock()
|
||||
// Notify App that the other side is already in a call
|
||||
newMsg := createStringMessage(inCall, "The other party is currently in a call")
|
||||
forwardMessage(ctx, client.Conn, messageType, newMsg, client.mu)
|
||||
}
|
||||
}
|
||||
break
|
||||
case HangupCall:
|
||||
stackChanClientPool.Range(func(_, value any) bool {
|
||||
stackChanClient := value.(*StackChanClient)
|
||||
if stackChanClient.CallAppClient == client {
|
||||
// Found corresponding call
|
||||
stackChanClient.mu.Lock()
|
||||
stackChanClient.CallAppClient = nil
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
|
||||
newList := stackChanClient.CameraSubscriptionList[:0]
|
||||
for _, sub := range stackChanClient.CameraSubscriptionList {
|
||||
if sub != client {
|
||||
newList = append(newList, sub)
|
||||
}
|
||||
}
|
||||
stackChanClient.CameraSubscriptionList = newList
|
||||
stackChanClient.mu.Unlock()
|
||||
if len(stackChanClient.CameraSubscriptionList) == 0 {
|
||||
offMsg := createMessage(OffCamera, nil)
|
||||
offType := websocket.BinaryMessage
|
||||
forwardMessage(ctx, stackChanClient.Conn, &offType, offMsg, stackChanClient.mu)
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
break
|
||||
case OnCamera:
|
||||
macAddr := string(payload)
|
||||
stackChanClient := getStackChanClient(macAddr)
|
||||
if stackChanClient != nil {
|
||||
stackChanClient.mu.Lock()
|
||||
alreadySubscribed := false
|
||||
for _, sub := range stackChanClient.CameraSubscriptionList {
|
||||
if sub == client {
|
||||
alreadySubscribed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !alreadySubscribed {
|
||||
stackChanClient.CameraSubscriptionList = append(stackChanClient.CameraSubscriptionList, client)
|
||||
stackChanClient.mu.Unlock()
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
} else {
|
||||
stackChanClient.mu.Unlock()
|
||||
}
|
||||
}
|
||||
break
|
||||
case OffCamera:
|
||||
macAddr := string(payload)
|
||||
stackChanClient := getStackChanClient(macAddr)
|
||||
if stackChanClient != nil {
|
||||
stackChanClient.mu.Lock()
|
||||
existed := false
|
||||
newList := stackChanClient.CameraSubscriptionList[:0]
|
||||
for _, subClient := range stackChanClient.CameraSubscriptionList {
|
||||
if subClient == client {
|
||||
existed = true
|
||||
} else {
|
||||
newList = append(newList, subClient)
|
||||
}
|
||||
}
|
||||
shouldNotify := existed && len(newList) == 0
|
||||
stackChanClient.CameraSubscriptionList = newList
|
||||
stackChanClient.mu.Unlock()
|
||||
if shouldNotify {
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
}
|
||||
}
|
||||
break
|
||||
case OnPhoneScreen:
|
||||
// Show phone screen
|
||||
macAddr := string(payload)
|
||||
stackChanClient := getStackChanClient(macAddr)
|
||||
if stackChanClient != nil {
|
||||
stackChanClient.mu.Lock()
|
||||
if stackChanClient.phoneScreen == false {
|
||||
stackChanClient.phoneScreen = true
|
||||
stackChanClient.mu.Unlock()
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
} else {
|
||||
stackChanClient.mu.Unlock()
|
||||
}
|
||||
}
|
||||
break
|
||||
case OffPhoneScreen:
|
||||
// Hide phone screen
|
||||
macAddr := string(payload)
|
||||
stackChanClient := getStackChanClient(macAddr)
|
||||
if stackChanClient != nil {
|
||||
stackChanClient.mu.Lock()
|
||||
if stackChanClient.phoneScreen == true {
|
||||
stackChanClient.phoneScreen = false
|
||||
stackChanClient.mu.Unlock()
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
} else {
|
||||
stackChanClient.mu.Unlock()
|
||||
}
|
||||
}
|
||||
break
|
||||
case Dance:
|
||||
// Dance message
|
||||
stackChanClient := getStackChanClient(client.Mac)
|
||||
if stackChanClient != nil {
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
}
|
||||
break
|
||||
case GetAvatarPosture:
|
||||
stackChanClient := getStackChanClient(client.Mac)
|
||||
if stackChanClient != nil {
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
}
|
||||
default:
|
||||
logger.Infof(ctx, "Unknown binary msgType: %d", msgType)
|
||||
stackChanClient := getStackChanClient(client.Mac)
|
||||
if stackChanClient != nil {
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
}
|
||||
}
|
||||
} else if *messageType == websocket.TextMessage {
|
||||
// Directly forward other message types
|
||||
stackChanClient := getStackChanClient(client.Mac)
|
||||
if stackChanClient != nil {
|
||||
forwardMessage(ctx, stackChanClient.Conn, messageType, msg, stackChanClient.mu)
|
||||
}
|
||||
} else if *messageType == websocket.PingMessage {
|
||||
logger.Info(ctx, "Received ping message from App side")
|
||||
}
|
||||
}
|
||||
|
||||
// forwardMessage forwards a message to the specified connection, with mutex for concurrency safety
|
||||
func forwardMessage(ctx context.Context, conn *websocket.Conn, messageType *int, msg *[]byte, mu *sync.RWMutex) {
|
||||
if conn == nil {
|
||||
logger.Infof(ctx, "StackChan is currently offline")
|
||||
return
|
||||
}
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
err := conn.WriteMessage(*messageType, *msg)
|
||||
if err != nil {
|
||||
//logger.Info(ctx, "Message forwarding failed: %v", err)
|
||||
} else {
|
||||
//logger.Info(ctx, "Message sent successfully")
|
||||
}
|
||||
}
|
||||
|
||||
// createMessage encapsulates a binary message according to custom protocol (type + length + data)
|
||||
func createMessage(msgType byte, data []byte) *[]byte {
|
||||
var dataLen int
|
||||
if data != nil {
|
||||
dataLen = len(data)
|
||||
} else {
|
||||
dataLen = 0
|
||||
}
|
||||
msg := make([]byte, 1+4+dataLen)
|
||||
msg[0] = msgType
|
||||
binary.BigEndian.PutUint32(msg[1:5], uint32(dataLen))
|
||||
if dataLen > 0 {
|
||||
copy(msg[5:], data)
|
||||
}
|
||||
return &msg
|
||||
}
|
||||
|
||||
// createStringMessage creates a binary message with a string payload
|
||||
func createStringMessage(msgType byte, data string) *[]byte {
|
||||
return createMessage(msgType, []byte(data))
|
||||
}
|
||||
|
||||
// GetRandomStackChanDevice get Random StackChan Device list
|
||||
func GetRandomStackChanDevice(userMac string, maxLength int) (list []string) {
|
||||
if maxLength <= 0 {
|
||||
return []string{}
|
||||
}
|
||||
var macs []string
|
||||
|
||||
stackChanClientPool.Range(func(key, value interface{}) bool {
|
||||
mac := key.(string)
|
||||
client := value.(*StackChanClient)
|
||||
|
||||
if mac == userMac {
|
||||
return true
|
||||
}
|
||||
|
||||
client.mu.RLock()
|
||||
online := client.Conn != nil
|
||||
client.mu.RUnlock()
|
||||
|
||||
if online {
|
||||
macs = append(macs, mac)
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
if len(macs) == 0 {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
r.Shuffle(len(macs), func(i, j int) {
|
||||
macs[i], macs[j] = macs[j], macs[i]
|
||||
})
|
||||
|
||||
if len(macs) > maxLength {
|
||||
macs = macs[:maxLength]
|
||||
}
|
||||
|
||||
return macs
|
||||
}
|
||||
Reference in New Issue
Block a user