start_live.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/interface/live/push-live/model"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. "go-common/library/sync/errgroup"
  10. "sync"
  11. "github.com/pkg/errors"
  12. )
  13. // LiveStartMessage 直播开播提醒推送消息
  14. func (s *Service) LiveStartMessage(ctx context.Context, msg *databus.Message) (err error) {
  15. defer msg.Commit()
  16. var total int
  17. // message
  18. m := new(model.StartLiveMessage)
  19. if err = json.Unmarshal(msg.Value, &m); err != nil {
  20. log.Error("[service.start_live|LiveStartMessage] json Unmarshal error(%v)", err)
  21. return
  22. }
  23. task := s.InitPushTask(m)
  24. midMap := s.GetMids(ctx, task)
  25. // do push
  26. total = s.Push(task, midMap)
  27. // create push task
  28. go s.CreatePushTask(task, total)
  29. log.Info("[service.push|LiveStartMessage] start live push done, total(%d), task(%v), model(%v), err(%v)",
  30. total, task, m, err)
  31. return
  32. }
  33. // InitPushTask 初始化开播提醒推送task
  34. func (s *Service) InitPushTask(m *model.StartLiveMessage) (task *model.ApPushTask) {
  35. s.mutex.RLock()
  36. currentPushTypes := s.pushTypes
  37. s.mutex.RUnlock()
  38. // push task model
  39. task = &model.ApPushTask{
  40. Type: model.LivePushType,
  41. TargetID: m.TargetID,
  42. AlertTitle: m.Uname,
  43. AlertBody: m.RoomTitle,
  44. MidSource: s.getSourceByTypes(currentPushTypes),
  45. LinkType: s.c.Push.LinkType,
  46. LinkValue: m.LinkValue,
  47. ExpireTime: m.ExpireTime,
  48. }
  49. return task
  50. }
  51. // GetMids 开播提醒,根据配置的策略从不同来源获取需要推送的用户id
  52. func (s *Service) GetMids(c context.Context, task *model.ApPushTask) map[int][]int64 {
  53. var (
  54. mutex sync.Mutex
  55. group = errgroup.Group{}
  56. fans = make(map[int64]bool)
  57. fansSP = make(map[int64]bool)
  58. midMap = make(map[int][]int64)
  59. midBlackList = make(map[int64]bool)
  60. )
  61. // 获取黑名单
  62. mb, err := s.dao.GetBlackList(c, task)
  63. if err != nil {
  64. log.Error("[service.start_live|GetMids] get black list error(%v), task(%+v)", err, task)
  65. } else {
  66. midBlackList = mb
  67. log.Info("[service.start_live|GetMids] get black list len(%d), task(%+v)", len(midBlackList), task)
  68. }
  69. // try get latest push options and expired time
  70. s.mutex.RLock()
  71. currentPushTypes := s.pushTypes
  72. s.mutex.RUnlock()
  73. // 开多个协程获取后求并集
  74. for _, t := range currentPushTypes {
  75. tp := string(t)
  76. group.Go(func() (e error) {
  77. var mFans, mSpe map[int64]bool
  78. switch tp {
  79. case model.StrategySwitch:
  80. // 直播开关
  81. mFans, mSpe, e = s.GetFansBySwitch(context.TODO(), task.TargetID)
  82. case model.StrategySpecial:
  83. // 只获取特别关注
  84. mFans, mSpe, e = s.dao.Fans(context.TODO(), task.TargetID, model.RelationSpecial)
  85. case model.StrategyFans:
  86. // 只获取普通关注
  87. mFans, mSpe, e = s.dao.Fans(context.TODO(), task.TargetID, model.RelationAttention)
  88. case model.StrategySwitchSpecial:
  89. // 只获取特别关注(直播开关中的特别关注)
  90. mFans, mSpe, e = s.GetFansBySwitchAndSpecial(context.TODO(), task.TargetID)
  91. default:
  92. log.Error("[service.mids|GetMids] strategy invalid, type(%s), task(%+v)", tp, task)
  93. e = fmt.Errorf("[service.mids|GetMids] strategy invalid, type(%s), task(%+v)", tp, task)
  94. return e
  95. }
  96. if e != nil {
  97. log.Error("[service.mids|GetMids] get mid error(%v), type(%s), task(%+v)", e, tp, task)
  98. return e
  99. }
  100. // 来源之间求并集,并过滤重复出现的id
  101. // filter by black list
  102. mutex.Lock()
  103. for fansID := range mFans {
  104. if _, ok := midBlackList[fansID]; !ok {
  105. fans[fansID] = true
  106. }
  107. }
  108. for fansID := range mSpe {
  109. if _, ok := midBlackList[fansID]; !ok {
  110. fansSP[fansID] = true
  111. }
  112. }
  113. mutex.Unlock()
  114. log.Info("[service.mids|GetMids] get mids by type(%s), task(%+v), common(%d), special(%d)",
  115. tp, task, len(mFans), len(mSpe))
  116. return e
  117. })
  118. }
  119. group.Wait()
  120. if len(fansSP) > 0 {
  121. midMap[model.RelationSpecial] = s.midFilter(fansSP, model.StartLiveBusiness, task)
  122. }
  123. if len(fans) > 0 {
  124. midMap[model.RelationAttention] = s.midFilter(fans, model.StartLiveBusiness, task)
  125. }
  126. return midMap
  127. }
  128. // GetFansBySwitch 开播提醒,获取开关mids
  129. func (s *Service) GetFansBySwitch(c context.Context, targetID int64) (fans map[int64]bool, fansSP map[int64]bool, err error) {
  130. // 获取直播侧开关数据(可能包含普通关注与特别关注)
  131. m, err := s.dao.GetFansBySwitch(c, targetID)
  132. if err != nil {
  133. err = errors.WithStack(err)
  134. log.Error("[service.mids|GetMidsBySwitch] get switch mids error(%v), targetID(%v)", err, targetID)
  135. return
  136. }
  137. // 区分普通关注与特别关注
  138. fans, fansSP, err = s.dao.SeparateFans(c, targetID, m)
  139. return
  140. }
  141. // GetFansBySwitchAndSpecial 开播提醒,获取开关用户与特别关注用户的交集
  142. func (s *Service) GetFansBySwitchAndSpecial(c context.Context, targetID int64) (fans map[int64]bool, fansSP map[int64]bool, err error) {
  143. // 获取直播侧开关数据(可能包含普通关注与特别关注)
  144. m, err := s.dao.GetFansBySwitch(c, targetID)
  145. if err != nil {
  146. err = errors.WithStack(err)
  147. log.Error("[service.mids|GetMidsBySwitch] get switch mids error(%v), targetID(%v)", err, targetID)
  148. return
  149. }
  150. // 从开关数据中获取到特别关注的部分
  151. _, fansSP, err = s.dao.SeparateFans(c, targetID, m)
  152. return
  153. }
  154. // getSourceByTypes 根据不同的推送策略构造Task.MidSource字段
  155. func (s *Service) getSourceByTypes(types []string) int {
  156. var source, midSource int
  157. for _, t := range types {
  158. switch t {
  159. case model.StrategySwitch:
  160. source = model.TaskSourceSwitch
  161. case model.StrategySpecial:
  162. source = model.TaskSourceSpecial
  163. case model.StrategyFans:
  164. source = model.TaskSourceFans
  165. case model.StrategySwitchSpecial:
  166. source = model.TaskSourceSwitchSpe
  167. default:
  168. source = 0
  169. }
  170. midSource = midSource ^ source
  171. }
  172. return midSource
  173. }