package online import ( "fmt" "sync" "time" "github.com/gookit/event" "github.com/gorilla/websocket" "github.com/patrickmn/go-cache" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" "nn.daxia.dev/gameproto" "nn.daxia.dev/model" ) type Msg struct { RoomID uint32 Offset int NotifyType uint32 Data string } type OnlineUser struct { UserID uint RoomID uint32 ChairID uint32 Conn *websocket.Conn Locker sync.Mutex IsRunning bool Event chan bool MsgList []Msg MsgIndex int } func NewOnlineUser(userID uint, roomID, chairID uint32, conn *websocket.Conn) *OnlineUser { return &OnlineUser{ UserID: userID, RoomID: roomID, ChairID: chairID, Conn: conn, Locker: sync.Mutex{}, IsRunning: false, Event: make(chan bool), MsgList: make([]Msg, 0), MsgIndex: 0, } } func (p *OnlineUser) Start() { p.IsRunning = true //监听消息 evnetUser := model.EventNotifyMsg + fmt.Sprintf("user_%d", p.UserID) event.On(evnetUser, event.ListenerFunc(func(e event.Event) error { p.Locker.Lock() defer p.Locker.Unlock() if !p.IsRunning { return nil } msg := e.Get("msg").(Msg) p.MsgIndex = p.MsgIndex + 1 msg.Offset = p.MsgIndex p.MsgList = append(p.MsgList, msg) logrus.Debugf("event ready send: %v", evnetUser) go func() { defer func() { recover() }() p.Event <- true }() return nil })) //退出则清理一下 defer func() { //cancel event listener event.On(evnetUser, event.ListenerFunc(func(e event.Event) error { return nil })) p.Stop() }() for { if !p.IsRunning { break } <-p.Event //遍历,发送通知 sendMsgList := func() []Msg { p.Locker.Lock() defer p.Locker.Unlock() msgTmp := make([]Msg, len(p.MsgList)) copy(msgTmp, p.MsgList) p.MsgList = make([]Msg, 0) return msgTmp }() for _, item := range sendMsgList { err := SendMsg(p.Conn, EmitData{ UserID: p.UserID, NotifyType: gameproto.NotifyTypeEnum(item.NotifyType), Encrypt: false, Data: item.Data, }) if err != nil { logrus.Error(err) return } } fmt.Println("running") } } func (p *OnlineUser) Stop() { if !p.IsRunning { return } p.Locker.Lock() defer p.Locker.Unlock() p.IsRunning = false go func() { p.Event <- true close(p.Event) }() p.Conn.Close() } var OnlineUserCache = cache.New(time.Minute*3, 10*time.Minute) var locker = sync.Mutex{} func Init() { //删除的时候,取消 OnlineUserCache.OnEvicted(func(key string, value interface{}) { onlineUser := value.(*OnlineUser) onlineUser.Stop() }) event.On(model.EventConnect, event.ListenerFunc(func(e event.Event) error { userModel := e.Get("user_model").(*model.User) conn := e.Get("conn").(*websocket.Conn) Active(userModel.ID, conn) return nil })) event.On(model.EventPing, event.ListenerFunc(func(e event.Event) error { userModel := e.Get("user_model").(*model.User) conn := e.Get("conn").(*websocket.Conn) Active(userModel.ID, conn) return nil })) event.On(model.EventDisconnect, event.ListenerFunc(func(e event.Event) error { userModel := e.Get("user_model").(*model.User) conn := e.Get("conn").(*websocket.Conn) UnActive(userModel.ID, conn) return nil })) } func Active(userID uint, conn *websocket.Conn) { //logrus.Infof("active user id:%d", userID) locker.Lock() defer locker.Unlock() var onlineUser *OnlineUser userKey := GetKey(userID) user, exists := OnlineUserCache.Get(userKey) if !exists { onlineUser = &OnlineUser{ UserID: userID, RoomID: 0, ChairID: 0, Conn: conn, Locker: sync.Mutex{}, IsRunning: false, Event: make(chan bool), MsgList: make([]Msg, 0), MsgIndex: 0, } go onlineUser.Start() OnlineUserCache.Set(userKey, onlineUser, time.Minute*3) } else { currConn := user.(*OnlineUser).Conn if currConn != conn { //TODO:发送掉线信息 data, err := proto.Marshal(&gameproto.Disconnect{ UserID: uint32(userID), }) if err != nil { logrus.Error(err) return } SendMsg(currConn, EmitData{ UserID: userID, NotifyType: gameproto.NotifyTypeEnum_NotifyTypeDisconnect, Encrypt: false, Data: string(data), }) currConn.Close() user.(*OnlineUser).Conn = conn } OnlineUserCache.Set(userKey, user, time.Minute*3) } } func IsActive(userID uint) bool { locker.Lock() defer locker.Unlock() userKey := GetKey(userID) _, exists := OnlineUserCache.Get(userKey) return exists } func UnActive(userID uint, conn *websocket.Conn) { locker.Lock() defer locker.Unlock() userKey := GetKey(userID) user, exists := OnlineUserCache.Get(userKey) if !exists { return } currConn := user.(*OnlineUser).Conn if currConn != conn { return } OnlineUserCache.Delete(userKey) } func GetUserByRoom(roomID uint32) []uint32 { onlineUserIDList := make([]uint32, 0) roomModel := model.Room{} userList, err := roomModel.GetRoomUserList(roomID) if err != nil { logrus.Error(err) return onlineUserIDList } return userList } func GetUserByNotInRoom(roomID uint32) []uint32 { onlineUserIDList := make([]uint32, 0) locker.Lock() defer locker.Unlock() items := OnlineUserCache.Items() for _, item := range items { onlineUser := item.Object.(*OnlineUser) if onlineUser.RoomID == roomID { continue } onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID)) } return onlineUserIDList } func GetAllUser() []uint32 { onlineUserIDList := make([]uint32, 0) locker.Lock() defer locker.Unlock() items := OnlineUserCache.Items() for _, item := range items { onlineUser := item.Object.(*OnlineUser) onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID)) } return onlineUserIDList } func GetKey(userID uint) string { return fmt.Sprintf("online_user_%d", userID) }