online.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package online
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/gookit/event"
  7. "github.com/gorilla/websocket"
  8. "github.com/patrickmn/go-cache"
  9. "github.com/sirupsen/logrus"
  10. "google.golang.org/protobuf/proto"
  11. "nn.daxia.dev/gameproto"
  12. "nn.daxia.dev/model"
  13. )
  // Msg is a single notification queued for delivery to an online user.
  type Msg struct {
  	// RoomID is the room the notification relates to.
  	RoomID uint32
  	// Offset is the per-user sequence number; it is overwritten with the
  	// receiving user's MsgIndex when the message is queued.
  	Offset int
  	// NotifyType is converted to gameproto.NotifyTypeEnum when the message
  	// is sent.
  	NotifyType uint32
  	// Data is the payload forwarded to the client as-is.
  	Data string
  }
  20. type OnlineUser struct {
  21. UserID uint
  22. RoomID uint32
  23. ChairID uint32
  24. Conn *websocket.Conn
  25. Locker sync.Mutex
  26. IsRunning bool
  27. Event chan bool
  28. MsgList []Msg
  29. MsgIndex int
  30. }
  31. func NewOnlineUser(userID uint, roomID, chairID uint32, conn *websocket.Conn) *OnlineUser {
  32. return &OnlineUser{
  33. UserID: userID,
  34. RoomID: roomID,
  35. ChairID: chairID,
  36. Conn: conn,
  37. Locker: sync.Mutex{},
  38. IsRunning: false,
  39. Event: make(chan bool),
  40. MsgList: make([]Msg, 0),
  41. MsgIndex: 0,
  42. }
  43. }
  44. func (p *OnlineUser) Start() {
  45. p.IsRunning = true
  46. //监听消息
  47. evnetUser := model.EventNotifyMsg + fmt.Sprintf("user_%d", p.UserID)
  48. event.On(evnetUser, event.ListenerFunc(func(e event.Event) error {
  49. p.Locker.Lock()
  50. defer p.Locker.Unlock()
  51. if !p.IsRunning {
  52. return nil
  53. }
  54. msg := e.Get("msg").(Msg)
  55. p.MsgIndex = p.MsgIndex + 1
  56. msg.Offset = p.MsgIndex
  57. p.MsgList = append(p.MsgList, msg)
  58. logrus.Debugf("event ready send: %v", evnetUser)
  59. go func() {
  60. defer func() {
  61. recover()
  62. }()
  63. p.Event <- true
  64. }()
  65. return nil
  66. }))
  67. //退出则清理一下
  68. defer func() {
  69. //cancel event listener
  70. event.On(evnetUser, event.ListenerFunc(func(e event.Event) error { return nil }))
  71. p.Stop()
  72. }()
  73. for {
  74. if !p.IsRunning {
  75. break
  76. }
  77. <-p.Event
  78. //遍历,发送通知
  79. sendMsgList := func() []Msg {
  80. p.Locker.Lock()
  81. defer p.Locker.Unlock()
  82. msgTmp := make([]Msg, len(p.MsgList))
  83. copy(msgTmp, p.MsgList)
  84. p.MsgList = make([]Msg, 0)
  85. return msgTmp
  86. }()
  87. for _, item := range sendMsgList {
  88. err := SendMsg(p.Conn, EmitData{
  89. UserID: p.UserID,
  90. NotifyType: gameproto.NotifyTypeEnum(item.NotifyType),
  91. Encrypt: false,
  92. Data: item.Data,
  93. })
  94. if err != nil {
  95. logrus.Error(err)
  96. return
  97. }
  98. }
  99. fmt.Println("running")
  100. }
  101. }
  102. func (p *OnlineUser) Stop() {
  103. if !p.IsRunning {
  104. return
  105. }
  106. p.Locker.Lock()
  107. defer p.Locker.Unlock()
  108. p.IsRunning = false
  109. go func() {
  110. p.Event <- true
  111. close(p.Event)
  112. }()
  113. p.Conn.Close()
  114. }
  115. var OnlineUserCache = cache.New(time.Minute*3, 10*time.Minute)
  116. var locker = sync.Mutex{}
  117. func Init() {
  118. //删除的时候,取消
  119. OnlineUserCache.OnEvicted(func(key string, value interface{}) {
  120. onlineUser := value.(*OnlineUser)
  121. onlineUser.Stop()
  122. })
  123. event.On(model.EventConnect, event.ListenerFunc(func(e event.Event) error {
  124. userModel := e.Get("user_model").(*model.User)
  125. conn := e.Get("conn").(*websocket.Conn)
  126. Active(userModel.ID, conn)
  127. return nil
  128. }))
  129. event.On(model.EventPing, event.ListenerFunc(func(e event.Event) error {
  130. userModel := e.Get("user_model").(*model.User)
  131. conn := e.Get("conn").(*websocket.Conn)
  132. Active(userModel.ID, conn)
  133. return nil
  134. }))
  135. event.On(model.EventDisconnect, event.ListenerFunc(func(e event.Event) error {
  136. userModel := e.Get("user_model").(*model.User)
  137. conn := e.Get("conn").(*websocket.Conn)
  138. UnActive(userModel.ID, conn)
  139. return nil
  140. }))
  141. }
  142. func Active(userID uint, conn *websocket.Conn) {
  143. //logrus.Infof("active user id:%d", userID)
  144. locker.Lock()
  145. defer locker.Unlock()
  146. var onlineUser *OnlineUser
  147. userKey := GetKey(userID)
  148. user, exists := OnlineUserCache.Get(userKey)
  149. if !exists {
  150. onlineUser = &OnlineUser{
  151. UserID: userID,
  152. RoomID: 0,
  153. ChairID: 0,
  154. Conn: conn,
  155. Locker: sync.Mutex{},
  156. IsRunning: false,
  157. Event: make(chan bool),
  158. MsgList: make([]Msg, 0),
  159. MsgIndex: 0,
  160. }
  161. go onlineUser.Start()
  162. OnlineUserCache.Set(userKey, onlineUser, time.Minute*3)
  163. } else {
  164. currConn := user.(*OnlineUser).Conn
  165. if currConn != conn {
  166. //TODO:发送掉线信息
  167. data, err := proto.Marshal(&gameproto.Disconnect{
  168. UserID: uint32(userID),
  169. })
  170. if err != nil {
  171. logrus.Error(err)
  172. return
  173. }
  174. SendMsg(currConn, EmitData{
  175. UserID: userID,
  176. NotifyType: gameproto.NotifyTypeEnum_NotifyTypeDisconnect,
  177. Encrypt: false,
  178. Data: string(data),
  179. })
  180. currConn.Close()
  181. user.(*OnlineUser).Conn = conn
  182. }
  183. OnlineUserCache.Set(userKey, user, time.Minute*3)
  184. }
  185. }
  186. func IsActive(userID uint) bool {
  187. locker.Lock()
  188. defer locker.Unlock()
  189. userKey := GetKey(userID)
  190. _, exists := OnlineUserCache.Get(userKey)
  191. return exists
  192. }
  193. func UnActive(userID uint, conn *websocket.Conn) {
  194. locker.Lock()
  195. defer locker.Unlock()
  196. userKey := GetKey(userID)
  197. user, exists := OnlineUserCache.Get(userKey)
  198. if !exists {
  199. return
  200. }
  201. currConn := user.(*OnlineUser).Conn
  202. if currConn != conn {
  203. return
  204. }
  205. OnlineUserCache.Delete(userKey)
  206. }
  207. func GetUserByRoom(roomID uint32) []uint32 {
  208. onlineUserIDList := make([]uint32, 0)
  209. roomModel := model.Room{}
  210. userList, err := roomModel.GetRoomUserList(roomID)
  211. if err != nil {
  212. logrus.Error(err)
  213. return onlineUserIDList
  214. }
  215. return userList
  216. }
  217. func GetUserByNotInRoom(roomID uint32) []uint32 {
  218. onlineUserIDList := make([]uint32, 0)
  219. locker.Lock()
  220. defer locker.Unlock()
  221. items := OnlineUserCache.Items()
  222. for _, item := range items {
  223. onlineUser := item.Object.(*OnlineUser)
  224. if onlineUser.RoomID == roomID {
  225. continue
  226. }
  227. onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID))
  228. }
  229. return onlineUserIDList
  230. }
  231. func GetAllUser() []uint32 {
  232. onlineUserIDList := make([]uint32, 0)
  233. locker.Lock()
  234. defer locker.Unlock()
  235. items := OnlineUserCache.Items()
  236. for _, item := range items {
  237. onlineUser := item.Object.(*OnlineUser)
  238. onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID))
  239. }
  240. return onlineUserIDList
  241. }
  // GetKey returns the OnlineUserCache key for userID, in the form
  // "online_user_<id>".
  func GetKey(userID uint) string {
  	const keyPrefix = "online_user_"
  	return fmt.Sprint(keyPrefix, userID)
  }