123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- package online
- import (
- "fmt"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- "github.com/patrickmn/go-cache"
- "github.com/sirupsen/logrus"
- "gogs.daxia.dev/huanan/pkg.daxia.dev/event2"
- "google.golang.org/protobuf/proto"
- "nn.daxia.dev/gameproto"
- "nn.daxia.dev/model"
- )
type (
	// Msg is one notification queued for delivery to a single online user.
	Msg struct {
		RoomID     uint32 // room identifier carried with the notification
		Offset     int    // per-user sequence number, stamped when the message is queued
		NotifyType uint32 // converted to gameproto.NotifyTypeEnum at send time
		Data       string // payload handed to SendMsg unchanged
	}

	// EventMsg is the event payload that carries a Msg to a user's listener.
	EventMsg struct {
		Msg Msg
	}
)
- type EventConnect struct {
- UserModel *model.User
- Conn *websocket.Conn
- }
- type EventDisconnect struct {
- UserModel *model.User
- Conn *websocket.Conn
- }
- type EventPing struct {
- UserModel *model.User
- Conn *websocket.Conn
- }
- type OnlineUser struct {
- UserID uint
- RoomID uint32
- ChairID uint32
- Conn *websocket.Conn
- Locker sync.Mutex
- IsRunning bool
- Ready bool
- Event chan bool
- MsgList []Msg
- MsgIndex int
- }
- func NewOnlineUser(userID uint, roomID, chairID uint32, conn *websocket.Conn) *OnlineUser {
- return &OnlineUser{
- UserID: userID,
- RoomID: roomID,
- ChairID: chairID,
- Conn: conn,
- Locker: sync.Mutex{},
- IsRunning: false,
- Ready: false,
- Event: make(chan bool),
- MsgList: make([]Msg, 0),
- MsgIndex: 0,
- }
- }
- func (p *OnlineUser) Start() {
- p.IsRunning = true
- //监听消息
- evnetUser := model.EventNotifyMsg + fmt.Sprintf("user_%d", p.UserID)
- event2.On[EventMsg](evnetUser, event2.ListenerFunc[EventMsg](func(e event2.Event[EventMsg]) error {
- p.Locker.Lock()
- defer p.Locker.Unlock()
- if !p.IsRunning {
- return nil
- }
- msg := e.Data().Msg
- p.MsgIndex = p.MsgIndex + 1
- msg.Offset = p.MsgIndex
- p.MsgList = append(p.MsgList, msg)
- logrus.Debugf("event ready send: %v", evnetUser)
- go func() {
- defer func() {
- recover()
- }()
- p.Event <- true
- }()
- return nil
- }))
- //退出则清理一下
- defer func() {
- //cancel event listener
- event2.On[EventMsg](evnetUser, event2.ListenerFunc[EventMsg](func(e event2.Event[EventMsg]) error {
- return nil
- }))
- p.Stop()
- }()
- for {
- if !p.IsRunning {
- break
- }
- <-p.Event
- if !p.Ready {
- continue
- }
- //遍历,发送通知
- sendMsgList := func() []Msg {
- p.Locker.Lock()
- defer p.Locker.Unlock()
- msgTmp := make([]Msg, len(p.MsgList))
- copy(msgTmp, p.MsgList)
- p.MsgList = make([]Msg, 0)
- return msgTmp
- }()
- for _, item := range sendMsgList {
- err := SendMsg(p.Conn, EmitData{
- UserID: p.UserID,
- NotifyType: gameproto.NotifyTypeEnum(item.NotifyType),
- Encrypt: false,
- Data: item.Data,
- })
- if err != nil {
- logrus.Error(err)
- return
- }
- }
- fmt.Println("running")
- }
- }
- func (p *OnlineUser) SetReady() {
- if !p.IsRunning {
- return
- }
- p.Locker.Lock()
- defer p.Locker.Unlock()
- p.Ready = true
- }
- func (p *OnlineUser) Stop() {
- if !p.IsRunning {
- return
- }
- p.Locker.Lock()
- defer p.Locker.Unlock()
- p.IsRunning = false
- go func() {
- p.Event <- true
- close(p.Event)
- }()
- p.Conn.Close()
- }
- var OnlineUserCache = cache.New(time.Minute*3, 10*time.Minute)
- var locker = sync.Mutex{}
- func Init() {
- //删除的时候,取消
- OnlineUserCache.OnEvicted(func(key string, value interface{}) {
- onlineUser := value.(*OnlineUser)
- onlineUser.Stop()
- })
- event2.On[EventConnect](model.EventConnect, event2.ListenerFunc[EventConnect](func(e event2.Event[EventConnect]) error {
- userModel := e.Data().UserModel
- conn := e.Data().Conn
- Active(userModel.ID, conn)
- return nil
- }))
- event2.On[EventPing](model.EventPing, event2.ListenerFunc[EventPing](func(e event2.Event[EventPing]) error {
- userModel := e.Data().UserModel
- conn := e.Data().Conn
- Active(userModel.ID, conn)
- return nil
- }))
- event2.On[EventDisconnect](model.EventDisconnect, event2.ListenerFunc[EventDisconnect](func(e event2.Event[EventDisconnect]) error {
- userModel := e.Data().UserModel
- conn := e.Data().Conn
- UnActive(userModel.ID, conn)
- return nil
- }))
- }
- func Active(userID uint, conn *websocket.Conn) {
- //logrus.Infof("active user id:%d", userID)
- locker.Lock()
- defer locker.Unlock()
- var onlineUser *OnlineUser
- userKey := GetKey(userID)
- user, exists := OnlineUserCache.Get(userKey)
- if !exists {
- onlineUser = &OnlineUser{
- UserID: userID,
- RoomID: 0,
- ChairID: 0,
- Conn: conn,
- Locker: sync.Mutex{},
- IsRunning: false,
- Event: make(chan bool),
- MsgList: make([]Msg, 0),
- MsgIndex: 0,
- }
- go onlineUser.Start()
- OnlineUserCache.Set(userKey, onlineUser, time.Minute*3)
- } else {
- currConn := user.(*OnlineUser).Conn
- if currConn != conn {
- //TODO:发送掉线信息
- data, err := proto.Marshal(&gameproto.Disconnect{
- UserID: uint32(userID),
- })
- if err != nil {
- logrus.Error(err)
- return
- }
- SendMsg(currConn, EmitData{
- UserID: userID,
- NotifyType: gameproto.NotifyTypeEnum_NotifyTypeDisconnect,
- Encrypt: false,
- Data: string(data),
- })
- currConn.Close()
- user.(*OnlineUser).Conn = conn
- }
- OnlineUserCache.Set(userKey, user, time.Minute*3)
- }
- }
- func SetReady(userID uint) {
- locker.Lock()
- defer locker.Unlock()
- userKey := GetKey(userID)
- userObj, exists := OnlineUserCache.Get(userKey)
- if !exists {
- logrus.Error("用户不存在:", userID)
- return
- }
- user := userObj.(*OnlineUser)
- user.SetReady()
- }
- func IsActive(userID uint) bool {
- locker.Lock()
- defer locker.Unlock()
- userKey := GetKey(userID)
- _, exists := OnlineUserCache.Get(userKey)
- return exists
- }
- func UnActive(userID uint, conn *websocket.Conn) {
- locker.Lock()
- defer locker.Unlock()
- userKey := GetKey(userID)
- user, exists := OnlineUserCache.Get(userKey)
- if !exists {
- return
- }
- currConn := user.(*OnlineUser).Conn
- if currConn != conn {
- return
- }
- OnlineUserCache.Delete(userKey)
- }
- func GetUserByRoom(roomID uint32) []uint32 {
- onlineUserIDList := make([]uint32, 0)
- roomModel := model.Room{}
- userList, err := roomModel.GetRoomUserList(roomID)
- if err != nil {
- logrus.Error(err)
- return onlineUserIDList
- }
- return userList
- }
- func GetUserByNotInRoom(roomID uint32) []uint32 {
- onlineUserIDList := make([]uint32, 0)
- locker.Lock()
- defer locker.Unlock()
- items := OnlineUserCache.Items()
- for _, item := range items {
- onlineUser := item.Object.(*OnlineUser)
- if onlineUser.RoomID == roomID {
- continue
- }
- onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID))
- }
- return onlineUserIDList
- }
- func GetAllUser() []uint32 {
- onlineUserIDList := make([]uint32, 0)
- locker.Lock()
- defer locker.Unlock()
- items := OnlineUserCache.Items()
- for _, item := range items {
- onlineUser := item.Object.(*OnlineUser)
- onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID))
- }
- return onlineUserIDList
- }
// GetKey returns the OnlineUserCache key for userID: the prefix
// "online_user_" followed by the ID in decimal.
func GetKey(userID uint) string {
	// Sprint adds no separator here because one of the operands is a string.
	return fmt.Sprint("online_user_", userID)
}
|