history.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package history
  2. import (
  3. "context"
  4. hismodel "go-common/app/interface/main/history/model"
  5. "go-common/app/interface/main/tv/model/history"
  6. "go-common/library/log"
  7. "go-common/library/sync/errgroup"
  8. )
  9. const (
  10. _videoPGC = "pgc"
  11. _videoUGC = "archive"
  12. _typePGC = 1
  13. _typeUGC = 2
  14. )
  15. // pick history from cursor and cache, then compare to tell whether we could use cache or not
  16. func (s *Service) cacheHis(c context.Context, mid int64) (resp *history.RespCacheHis, err error) {
  17. var (
  18. cfg = s.conf.Cfg.HisCfg
  19. hismc *history.HisMC
  20. )
  21. resp = &history.RespCacheHis{
  22. UseCache: true,
  23. }
  24. if resp.Res, err = s.dao.Cursor(c, mid, 0, cfg.Pagesize, 0, cfg.Businesses); err != nil {
  25. log.Error("history dao.Cursor Mid %d, Err %v", mid, err)
  26. return
  27. }
  28. if len(resp.Res) == 0 {
  29. log.Info("Mid %d, No history", mid)
  30. return
  31. }
  32. if hismc, err = s.dao.HisCache(c, mid); err != nil {
  33. log.Error("history dao.HisCache Mid %d, Err %v", mid, err)
  34. return
  35. }
  36. if hismc != nil { // if the first item in cache and from cursor is the same, return with cache
  37. if resp.Res[0].Unix == hismc.LastViewAt {
  38. resp.Filtered = hismc.Res
  39. return
  40. }
  41. }
  42. resp.UseCache = false
  43. return
  44. }
  45. func (s *Service) combineHis(c context.Context, req *history.ReqCombineHis) (filtered []*history.HisRes) {
  46. var (
  47. durs = make(map[int64]int64)
  48. pgcRes, ugcRes []*hismodel.Resource
  49. pgcMap, ugcMap map[int64]*history.HisRes
  50. )
  51. g, _ := errgroup.WithContext(c)
  52. for _, v := range req.OriRes { // combine pgc & ugc data
  53. if v.Business == _videoPGC { // combine pgc history data
  54. if _, ok := req.OkSids[v.Sid]; !ok {
  55. continue
  56. }
  57. pgcRes = append(pgcRes, v)
  58. } else if v.Business == _videoUGC { // combine ugc history data
  59. if _, ok := req.OkAids[v.Oid]; !ok {
  60. continue
  61. }
  62. ugcRes = append(ugcRes, v)
  63. } else {
  64. continue
  65. }
  66. }
  67. okRes := mergeRes(pgcRes, ugcRes)
  68. g.Go(func() (err error) { // get pgc info
  69. pgcMap, err = s.pgcHisRes(context.Background(), pgcRes)
  70. return
  71. })
  72. g.Go(func() (err error) { // get ugc info
  73. ugcMap, err = s.ugcHisRes(context.Background(), ugcRes)
  74. return
  75. })
  76. g.Go(func() (err error) { // get duration info
  77. durs = s.getDuration(context.Background(), okRes)
  78. return nil
  79. })
  80. if err := g.Wait(); err != nil { // wait history combine media info
  81. log.Error("getHistory For Mid %d, Err %v", req.Mid, err)
  82. }
  83. for _, v := range okRes {
  84. var resrc *history.HisRes
  85. if v.Business == _videoPGC {
  86. if res, ok := pgcMap[v.Sid]; ok {
  87. resrc = res
  88. }
  89. } else if v.Business == _videoUGC {
  90. if res, ok := ugcMap[v.Oid]; ok {
  91. resrc = res
  92. }
  93. }
  94. if resrc == nil {
  95. log.Error("okRes Business %s, CID %d, %d, Empty", v.Business, v.Sid, v.Oid)
  96. continue
  97. }
  98. if dur, ok := durs[v.Oid]; ok { // duration
  99. resrc.PageDuration = dur
  100. }
  101. filtered = append(filtered, resrc)
  102. }
  103. return
  104. }
  105. // GetHistory picks history from rpc and combine the media data from Cache & DB
  106. func (s *Service) GetHistory(c context.Context, mid int64) (filtered []*history.HisRes, err error) {
  107. var respCache *history.RespCacheHis
  108. if respCache, err = s.cacheHis(c, mid); err != nil {
  109. return
  110. }
  111. if respCache.UseCache {
  112. return respCache.Filtered, nil
  113. }
  114. okSids, okAids := s.filterIDs(c, mid, respCache.Res)
  115. filtered = s.combineHis(c, &history.ReqCombineHis{
  116. Mid: mid,
  117. OkAids: okAids,
  118. OkSids: okSids,
  119. OriRes: respCache.Res,
  120. })
  121. s.dao.SaveHisCache(c, filtered)
  122. log.Info("Mid %d, OriLen %d, Filtered %d", mid, len(respCache.Res), len(filtered))
  123. return
  124. }
  125. // filterIDs picks the original history resource, arrange them into pgc and ugc and then filter by DAO
  126. func (s *Service) filterIDs(ctx context.Context, mid int64, res []*hismodel.Resource) (okSids, okAids map[int64]int) {
  127. var ugcAIDs, pgcSIDs []int64
  128. for _, v := range res { // we pick only pgc & archive from History rpc
  129. if v.Business == _videoPGC {
  130. pgcSIDs = append(pgcSIDs, v.Sid)
  131. } else if v.Business == _videoUGC {
  132. ugcAIDs = append(ugcAIDs, v.Oid)
  133. }
  134. }
  135. okSids, okAids = s.cmsDao.MixedFilter(ctx, pgcSIDs, ugcAIDs) // we filter the okSids and okAids
  136. log.Info("Mid %d, okSids %v, okAids %v", mid, okSids, okAids)
  137. return
  138. }
  139. // mergeRes merges two slices and return a new slice
  140. func mergeRes(s1 []*hismodel.Resource, s2 []*hismodel.Resource) []*hismodel.Resource {
  141. slice := make([]*hismodel.Resource, len(s1)+len(s2))
  142. copy(slice, s1)
  143. copy(slice[len(s1):], s2)
  144. return slice
  145. }