online.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package online
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/gookit/event"
  7. "github.com/gorilla/websocket"
  8. "github.com/patrickmn/go-cache"
  9. "github.com/sirupsen/logrus"
  10. "google.golang.org/protobuf/proto"
  11. "nn.daxia.dev/gameproto"
  12. "nn.daxia.dev/model"
  13. )
  14. type Msg struct {
  15. RoomID uint32
  16. Offset int
  17. NotifyType uint32
  18. Data string
  19. }
  20. type OnlineUser struct {
  21. UserID uint
  22. RoomID uint32
  23. ChairID uint32
  24. Conn *websocket.Conn
  25. Locker sync.Mutex
  26. IsRunning bool
  27. Event chan bool
  28. MsgList []Msg
  29. MsgIndex int
  30. }
  31. func NewOnlineUser(userID uint, roomID, chairID uint32, conn *websocket.Conn) *OnlineUser {
  32. return &OnlineUser{
  33. UserID: userID,
  34. RoomID: roomID,
  35. ChairID: chairID,
  36. Conn: conn,
  37. Locker: sync.Mutex{},
  38. IsRunning: false,
  39. Event: make(chan bool),
  40. MsgList: make([]Msg, 0),
  41. MsgIndex: 0,
  42. }
  43. }
  44. func (p *OnlineUser) Start() {
  45. p.IsRunning = true
  46. //监听消息
  47. evnetUser := model.EventNotifyMsg + fmt.Sprintf("user_%d", p.UserID)
  48. event.On(evnetUser, event.ListenerFunc(func(e event.Event) error {
  49. p.Locker.Lock()
  50. defer p.Locker.Unlock()
  51. msg := e.Get("msg").(Msg)
  52. p.MsgIndex = p.MsgIndex + 1
  53. msg.Offset = p.MsgIndex
  54. p.MsgList = append(p.MsgList, msg)
  55. logrus.Debugf("event ready send: %v", evnetUser)
  56. go func() {
  57. p.Event <- true
  58. }()
  59. return nil
  60. }))
  61. //退出则清理一下
  62. defer func() {
  63. //cancel event listener
  64. event.On(evnetUser, event.ListenerFunc(func(e event.Event) error { return nil }))
  65. p.Stop()
  66. }()
  67. for {
  68. if !p.IsRunning {
  69. break
  70. }
  71. <-p.Event
  72. //遍历,发送通知
  73. sendMsgList := func() []Msg {
  74. p.Locker.Lock()
  75. defer p.Locker.Unlock()
  76. msgTmp := make([]Msg, len(p.MsgList))
  77. copy(msgTmp, p.MsgList)
  78. p.MsgList = make([]Msg, 0)
  79. return msgTmp
  80. }()
  81. for _, item := range sendMsgList {
  82. err := SendMsg(p.Conn, EmitData{
  83. UserID: p.UserID,
  84. NotifyType: gameproto.NotifyTypeEnum(item.NotifyType),
  85. Encrypt: false,
  86. Data: item.Data,
  87. })
  88. if err != nil {
  89. logrus.Error(err)
  90. return
  91. }
  92. }
  93. fmt.Println("running")
  94. }
  95. }
  96. func (p *OnlineUser) Stop() {
  97. if !p.IsRunning {
  98. return
  99. }
  100. p.Locker.Lock()
  101. defer p.Locker.Unlock()
  102. p.IsRunning = false
  103. go func() {
  104. p.Event <- true
  105. }()
  106. p.Conn.Close()
  107. }
  108. var OnlineUserCache = cache.New(time.Minute*3, 10*time.Minute)
  109. var locker = sync.Mutex{}
  110. func Init() {
  111. //删除的时候,取消
  112. OnlineUserCache.OnEvicted(func(key string, value interface{}) {
  113. onlineUser := value.(*OnlineUser)
  114. onlineUser.Stop()
  115. })
  116. event.On(model.EventConnect, event.ListenerFunc(func(e event.Event) error {
  117. userModel := e.Get("user_model").(*model.User)
  118. conn := e.Get("conn").(*websocket.Conn)
  119. Active(userModel.ID, conn)
  120. return nil
  121. }))
  122. event.On(model.EventPing, event.ListenerFunc(func(e event.Event) error {
  123. userModel := e.Get("user_model").(*model.User)
  124. conn := e.Get("conn").(*websocket.Conn)
  125. Active(userModel.ID, conn)
  126. return nil
  127. }))
  128. event.On(model.EventDisconnect, event.ListenerFunc(func(e event.Event) error {
  129. userModel := e.Get("user_model").(*model.User)
  130. conn := e.Get("conn").(*websocket.Conn)
  131. UnActive(userModel.ID, conn)
  132. return nil
  133. }))
  134. }
  135. func Active(userID uint, conn *websocket.Conn) {
  136. //logrus.Infof("active user id:%d", userID)
  137. locker.Lock()
  138. defer locker.Unlock()
  139. var onlineUser *OnlineUser
  140. userKey := GetKey(userID)
  141. user, exists := OnlineUserCache.Get(userKey)
  142. if !exists {
  143. onlineUser = &OnlineUser{
  144. UserID: userID,
  145. RoomID: 0,
  146. ChairID: 0,
  147. Conn: conn,
  148. Locker: sync.Mutex{},
  149. IsRunning: false,
  150. Event: make(chan bool),
  151. MsgList: make([]Msg, 0),
  152. MsgIndex: 0,
  153. }
  154. go onlineUser.Start()
  155. OnlineUserCache.Set(userKey, onlineUser, time.Minute*3)
  156. } else {
  157. currConn := user.(*OnlineUser).Conn
  158. if currConn != conn {
  159. //TODO:发送掉线信息
  160. data, err := proto.Marshal(&gameproto.Disconnect{
  161. UserID: uint32(userID),
  162. })
  163. if err != nil {
  164. logrus.Error(err)
  165. return
  166. }
  167. SendMsg(currConn, EmitData{
  168. UserID: userID,
  169. NotifyType: gameproto.NotifyTypeEnum_NotifyTypeDisconnect,
  170. Encrypt: false,
  171. Data: string(data),
  172. })
  173. currConn.Close()
  174. user.(*OnlineUser).Conn = conn
  175. }
  176. OnlineUserCache.Set(userKey, user, time.Minute*3)
  177. }
  178. }
  179. func IsActive(userID uint) bool {
  180. locker.Lock()
  181. defer locker.Unlock()
  182. userKey := GetKey(userID)
  183. _, exists := OnlineUserCache.Get(userKey)
  184. return exists
  185. }
  186. func UnActive(userID uint, conn *websocket.Conn) {
  187. locker.Lock()
  188. defer locker.Unlock()
  189. userKey := GetKey(userID)
  190. user, exists := OnlineUserCache.Get(userKey)
  191. if !exists {
  192. return
  193. }
  194. currConn := user.(*OnlineUser).Conn
  195. if currConn != conn {
  196. return
  197. }
  198. OnlineUserCache.Delete(userKey)
  199. }
  200. func GetUserByRoom(roomID uint32) []uint32 {
  201. onlineUserIDList := make([]uint32, 0)
  202. roomModel := model.Room{}
  203. userList, err := roomModel.GetRoomUserList(roomID)
  204. if err != nil {
  205. logrus.Error(err)
  206. return onlineUserIDList
  207. }
  208. return userList
  209. }
  210. func GetUserByNotInRoom(roomID uint32) []uint32 {
  211. onlineUserIDList := make([]uint32, 0)
  212. locker.Lock()
  213. defer locker.Unlock()
  214. items := OnlineUserCache.Items()
  215. for _, item := range items {
  216. onlineUser := item.Object.(*OnlineUser)
  217. if onlineUser.RoomID == roomID {
  218. continue
  219. }
  220. onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID))
  221. }
  222. return onlineUserIDList
  223. }
  224. func GetAllUser() []uint32 {
  225. onlineUserIDList := make([]uint32, 0)
  226. locker.Lock()
  227. defer locker.Unlock()
  228. items := OnlineUserCache.Items()
  229. for _, item := range items {
  230. onlineUser := item.Object.(*OnlineUser)
  231. onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID))
  232. }
  233. return onlineUserIDList
  234. }
  235. func GetKey(userID uint) string {
  236. return fmt.Sprintf("online_user_%d", userID)
  237. }