online.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. package online
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/gorilla/websocket"
  7. "github.com/patrickmn/go-cache"
  8. "github.com/sirupsen/logrus"
  9. "gogs.daxia.dev/huanan/pkg.daxia.dev/event2"
  10. "google.golang.org/protobuf/proto"
  11. "nn.daxia.dev/gameproto"
  12. "nn.daxia.dev/model"
  13. )
  // Msg is one notification queued for delivery to a connected user.
  type Msg struct {
  	// RoomID is the room identifier carried with the notification.
  	RoomID uint32
  	// Offset is the per-user sequence number; the listener installed by
  	// OnlineUser.Start overwrites it with the user's running message count.
  	Offset int
  	// NotifyType is converted to gameproto.NotifyTypeEnum when the message
  	// is written to the connection.
  	NotifyType uint32
  	// Data is the payload forwarded unchanged to SendMsg.
  	Data string
  }
  20. type EventMsg struct {
  21. Msg Msg
  22. }
  23. type EventConnect struct {
  24. UserModel *model.User
  25. Conn *websocket.Conn
  26. }
  27. type EventDisconnect struct {
  28. UserModel *model.User
  29. Conn *websocket.Conn
  30. }
  31. type EventPing struct {
  32. UserModel *model.User
  33. Conn *websocket.Conn
  34. }
  35. type OnlineUser struct {
  36. UserID uint
  37. RoomID uint32
  38. ChairID uint32
  39. Conn *websocket.Conn
  40. Locker sync.Mutex
  41. IsRunning bool
  42. Ready bool
  43. Event chan bool
  44. MsgList []Msg
  45. MsgIndex int
  46. }
  47. func NewOnlineUser(userID uint, roomID, chairID uint32, conn *websocket.Conn) *OnlineUser {
  48. return &OnlineUser{
  49. UserID: userID,
  50. RoomID: roomID,
  51. ChairID: chairID,
  52. Conn: conn,
  53. Locker: sync.Mutex{},
  54. IsRunning: false,
  55. Ready: false,
  56. Event: make(chan bool),
  57. MsgList: make([]Msg, 0),
  58. MsgIndex: 0,
  59. }
  60. }
  61. func (p *OnlineUser) Start() {
  62. p.IsRunning = true
  63. //监听消息
  64. evnetUser := model.EventNotifyMsg + fmt.Sprintf("user_%d", p.UserID)
  65. event2.On[EventMsg](evnetUser, event2.ListenerFunc[EventMsg](func(e event2.Event[EventMsg]) error {
  66. p.Locker.Lock()
  67. defer p.Locker.Unlock()
  68. if !p.IsRunning {
  69. return nil
  70. }
  71. msg := e.Data().Msg
  72. p.MsgIndex = p.MsgIndex + 1
  73. msg.Offset = p.MsgIndex
  74. p.MsgList = append(p.MsgList, msg)
  75. logrus.Debugf("event ready send: %v", evnetUser)
  76. go func() {
  77. defer func() {
  78. recover()
  79. }()
  80. p.Event <- true
  81. }()
  82. return nil
  83. }))
  84. //退出则清理一下
  85. defer func() {
  86. //cancel event listener
  87. event2.On[EventMsg](evnetUser, event2.ListenerFunc[EventMsg](func(e event2.Event[EventMsg]) error {
  88. return nil
  89. }))
  90. p.Stop()
  91. }()
  92. for {
  93. if !p.IsRunning {
  94. break
  95. }
  96. <-p.Event
  97. if !p.Ready {
  98. continue
  99. }
  100. //遍历,发送通知
  101. sendMsgList := func() []Msg {
  102. p.Locker.Lock()
  103. defer p.Locker.Unlock()
  104. msgTmp := make([]Msg, len(p.MsgList))
  105. copy(msgTmp, p.MsgList)
  106. p.MsgList = make([]Msg, 0)
  107. return msgTmp
  108. }()
  109. for _, item := range sendMsgList {
  110. err := SendMsg(p.Conn, EmitData{
  111. UserID: p.UserID,
  112. NotifyType: gameproto.NotifyTypeEnum(item.NotifyType),
  113. Encrypt: false,
  114. Data: item.Data,
  115. })
  116. if err != nil {
  117. logrus.Error(err)
  118. return
  119. }
  120. }
  121. fmt.Println("running")
  122. }
  123. }
  124. func (p *OnlineUser) SetReady() {
  125. if !p.IsRunning {
  126. return
  127. }
  128. p.Locker.Lock()
  129. defer p.Locker.Unlock()
  130. p.Ready = true
  131. }
  132. func (p *OnlineUser) Stop() {
  133. if !p.IsRunning {
  134. return
  135. }
  136. p.Locker.Lock()
  137. defer p.Locker.Unlock()
  138. p.IsRunning = false
  139. go func() {
  140. p.Event <- true
  141. close(p.Event)
  142. }()
  143. p.Conn.Close()
  144. }
  145. var OnlineUserCache = cache.New(time.Minute*3, 10*time.Minute)
  146. var locker = sync.Mutex{}
  147. func Init() {
  148. //删除的时候,取消
  149. OnlineUserCache.OnEvicted(func(key string, value interface{}) {
  150. onlineUser := value.(*OnlineUser)
  151. onlineUser.Stop()
  152. })
  153. event2.On[EventConnect](model.EventConnect, event2.ListenerFunc[EventConnect](func(e event2.Event[EventConnect]) error {
  154. userModel := e.Data().UserModel
  155. conn := e.Data().Conn
  156. Active(userModel.ID, conn)
  157. return nil
  158. }))
  159. event2.On[EventPing](model.EventPing, event2.ListenerFunc[EventPing](func(e event2.Event[EventPing]) error {
  160. userModel := e.Data().UserModel
  161. conn := e.Data().Conn
  162. Active(userModel.ID, conn)
  163. return nil
  164. }))
  165. event2.On[EventDisconnect](model.EventDisconnect, event2.ListenerFunc[EventDisconnect](func(e event2.Event[EventDisconnect]) error {
  166. userModel := e.Data().UserModel
  167. conn := e.Data().Conn
  168. UnActive(userModel.ID, conn)
  169. return nil
  170. }))
  171. }
  172. func Active(userID uint, conn *websocket.Conn) {
  173. //logrus.Infof("active user id:%d", userID)
  174. locker.Lock()
  175. defer locker.Unlock()
  176. var onlineUser *OnlineUser
  177. userKey := GetKey(userID)
  178. user, exists := OnlineUserCache.Get(userKey)
  179. if !exists {
  180. onlineUser = &OnlineUser{
  181. UserID: userID,
  182. RoomID: 0,
  183. ChairID: 0,
  184. Conn: conn,
  185. Locker: sync.Mutex{},
  186. IsRunning: false,
  187. Event: make(chan bool),
  188. MsgList: make([]Msg, 0),
  189. MsgIndex: 0,
  190. }
  191. go onlineUser.Start()
  192. OnlineUserCache.Set(userKey, onlineUser, time.Minute*3)
  193. } else {
  194. currConn := user.(*OnlineUser).Conn
  195. if currConn != conn {
  196. //TODO:发送掉线信息
  197. data, err := proto.Marshal(&gameproto.Disconnect{
  198. UserID: uint32(userID),
  199. })
  200. if err != nil {
  201. logrus.Error(err)
  202. return
  203. }
  204. SendMsg(currConn, EmitData{
  205. UserID: userID,
  206. NotifyType: gameproto.NotifyTypeEnum_NotifyTypeDisconnect,
  207. Encrypt: false,
  208. Data: string(data),
  209. })
  210. currConn.Close()
  211. user.(*OnlineUser).Conn = conn
  212. }
  213. OnlineUserCache.Set(userKey, user, time.Minute*3)
  214. }
  215. }
  216. func SetReady(userID uint) {
  217. locker.Lock()
  218. defer locker.Unlock()
  219. userKey := GetKey(userID)
  220. userObj, exists := OnlineUserCache.Get(userKey)
  221. if !exists {
  222. logrus.Error("用户不存在:", userID)
  223. return
  224. }
  225. user := userObj.(*OnlineUser)
  226. user.SetReady()
  227. }
  228. func IsActive(userID uint) bool {
  229. locker.Lock()
  230. defer locker.Unlock()
  231. userKey := GetKey(userID)
  232. _, exists := OnlineUserCache.Get(userKey)
  233. return exists
  234. }
  235. func UnActive(userID uint, conn *websocket.Conn) {
  236. locker.Lock()
  237. defer locker.Unlock()
  238. userKey := GetKey(userID)
  239. user, exists := OnlineUserCache.Get(userKey)
  240. if !exists {
  241. return
  242. }
  243. currConn := user.(*OnlineUser).Conn
  244. if currConn != conn {
  245. return
  246. }
  247. OnlineUserCache.Delete(userKey)
  248. }
  249. func GetUserByRoom(roomID uint32) []uint32 {
  250. onlineUserIDList := make([]uint32, 0)
  251. roomModel := model.Room{}
  252. userList, err := roomModel.GetRoomUserList(roomID)
  253. if err != nil {
  254. logrus.Error(err)
  255. return onlineUserIDList
  256. }
  257. return userList
  258. }
  259. func GetUserByNotInRoom(roomID uint32) []uint32 {
  260. onlineUserIDList := make([]uint32, 0)
  261. locker.Lock()
  262. defer locker.Unlock()
  263. items := OnlineUserCache.Items()
  264. for _, item := range items {
  265. onlineUser := item.Object.(*OnlineUser)
  266. if onlineUser.RoomID == roomID {
  267. continue
  268. }
  269. onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID))
  270. }
  271. return onlineUserIDList
  272. }
  273. func GetAllUser() []uint32 {
  274. onlineUserIDList := make([]uint32, 0)
  275. locker.Lock()
  276. defer locker.Unlock()
  277. items := OnlineUserCache.Items()
  278. for _, item := range items {
  279. onlineUser := item.Object.(*OnlineUser)
  280. onlineUserIDList = append(onlineUserIDList, uint32(onlineUser.UserID))
  281. }
  282. return onlineUserIDList
  283. }
  // GetKey returns the OnlineUserCache key for userID: the decimal user ID
  // prefixed with "online_user_".
  func GetKey(userID uint) string {
  	const keyFormat = "online_user_%d"
  	return fmt.Sprintf(keyFormat, userID)
  }