dm.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/dm/model"
  5. "go-common/library/ecode"
  6. "go-common/library/log"
  7. )
  8. // flushTrimQueue 将数据库数据填充到redis中
  9. func (s *Service) flushTrimQueue(c context.Context, tp int32, oid int64) (err error) {
  10. var (
  11. dms []*model.DM
  12. trims []*model.Trim
  13. )
  14. if dms, err = s.dao.DMInfos(c, tp, oid); err != nil {
  15. return
  16. }
  17. for _, dm := range dms {
  18. // NOTE 只有普通弹幕会被顶掉
  19. if dm.Pool == model.PoolNormal && (dm.State == model.StateNormal || dm.State == model.StateMonitorAfter) {
  20. trim := &model.Trim{ID: dm.ID, Attr: dm.AttrVal(model.AttrProtect)}
  21. trims = append(trims, trim)
  22. }
  23. }
  24. return s.dao.FlushTrimCache(c, tp, oid, trims)
  25. }
  26. // addTrimQueue add dm index redis trim queue and return segment need flush.
  27. func (s *Service) addTrimQueue(c context.Context, tp int32, oid, maxlimit int64, dms ...*model.DM) (err error) {
  28. var (
  29. ok bool
  30. trimCnt, count int64
  31. trims []*model.Trim
  32. dmids []int64
  33. )
  34. for _, dm := range dms {
  35. // NOTE 只有普通弹幕并且弹幕状态处于正常或者先发后审状态的弹幕会被放入顶队列
  36. if dm.Pool == model.PoolNormal && dm.NeedDisplay() {
  37. trim := &model.Trim{ID: dm.ID, Attr: dm.AttrVal(model.AttrProtect)}
  38. trims = append(trims, trim)
  39. }
  40. }
  41. if len(trims) == 0 {
  42. return
  43. }
  44. if ok, err = s.dao.ExpireTrimQueue(c, tp, oid); err != nil {
  45. return
  46. }
  47. if !ok {
  48. if err = s.flushTrimQueue(c, tp, oid); err != nil {
  49. return
  50. }
  51. }
  52. if count, err = s.dao.AddTrimQueueCache(c, tp, oid, trims); err != nil {
  53. return
  54. }
  55. // NOTE 对于满弹幕的视频,始终保持两倍的候选弹幕集
  56. if trimCnt = count - 2*maxlimit; trimCnt > 0 {
  57. if dmids, err = s.dao.TrimCache(c, tp, oid, trimCnt); err != nil || len(trims) == 0 {
  58. return
  59. }
  60. if len(dmids) == 0 {
  61. return
  62. }
  63. if _, err = s.dao.UpdateDMStates(c, oid, dmids, model.StateHide); err != nil {
  64. return
  65. }
  66. if err = s.dao.DelIdxContentCaches(c, tp, oid, dmids...); err != nil {
  67. return
  68. }
  69. log.Info("oid:%d,trimCnt:%d,trims:%v", oid, len(dmids), dmids)
  70. }
  71. return
  72. }
  73. // recoverDM delete a dm and recover a hide state dm from db.
  74. func (s *Service) recoverDM(c context.Context, typ int32, oid, rcvCnt int64) (dms []*model.DM, err error) {
  75. if dms, err = s.dao.DMHides(c, typ, oid, rcvCnt); err != nil {
  76. return
  77. }
  78. if len(dms) > 0 {
  79. var dmids []int64
  80. for _, dm := range dms {
  81. dmids = append(dmids, dm.ID)
  82. dm.State = model.StateNormal
  83. }
  84. if _, err = s.dao.UpdateDMStates(c, oid, dmids, model.StateNormal); err != nil {
  85. return
  86. }
  87. log.Info("recoverDM oid:%d dmids:%v", oid, dmids)
  88. }
  89. return
  90. }
  91. func (s *Service) subject(c context.Context, tp int32, oid int64) (sub *model.Subject, err error) {
  92. var cache = true
  93. if sub, err = s.dao.SubjectCache(c, tp, oid); err != nil {
  94. err = nil
  95. cache = false
  96. }
  97. if sub == nil {
  98. if sub, err = s.dao.Subject(c, tp, oid); err != nil {
  99. return
  100. }
  101. if sub == nil {
  102. sub = &model.Subject{
  103. Type: tp,
  104. Oid: oid,
  105. }
  106. }
  107. if cache {
  108. s.cache.Do(c, func(ctx context.Context) {
  109. s.dao.SetSubjectCache(ctx, sub)
  110. })
  111. }
  112. }
  113. if sub.ID == 0 {
  114. err = ecode.NothingFound
  115. return
  116. }
  117. return
  118. }