sync_mc.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package pgc
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. appDao "go-common/app/job/main/tv/dao/app"
  9. model "go-common/app/job/main/tv/model/pgc"
  10. "go-common/library/ecode"
  11. "go-common/library/log"
  12. timex "go-common/library/time"
  13. )
  14. type cntFunc func(ctx context.Context) (count int, err error)
  15. type refreshFunc func(ctx context.Context, LastID int, nbData int) (myLast int, err error)
  16. type reqCachePro struct {
  17. cnt cntFunc
  18. proName string
  19. refresh refreshFunc
  20. ps int
  21. }
  22. func (s *Service) cacheProducer(ctx context.Context, req *reqCachePro) (err error) {
  23. var (
  24. count int
  25. pagesize = req.ps
  26. maxID = 0 // the max ID of the latest piece
  27. begin = time.Now()
  28. )
  29. if count, err = req.cnt(ctx); err != nil {
  30. log.Error("[%s] CountEP error [%v]", req.proName, err)
  31. return
  32. }
  33. nbPiece := appDao.NumPce(count, pagesize)
  34. log.Info("[%s] NumPiece %d, Pagesize %d", req.proName, nbPiece, pagesize)
  35. for i := 0; i < nbPiece; i++ {
  36. newMaxID, errR := req.refresh(ctx, maxID, pagesize)
  37. if errR != nil {
  38. log.Error("[%s] Pick Piece %d Error, Ignore it", req.proName, i)
  39. continue
  40. }
  41. if newMaxID > maxID {
  42. maxID = newMaxID
  43. } else { // fatal error
  44. log.Error("[%s] MaxID is not increasing! [%d,%d]", req.proName, newMaxID, maxID)
  45. return
  46. }
  47. time.Sleep(time.Duration(s.c.UgcSync.Frequency.ProducerFre)) // pause after each piece produced
  48. log.Info("[%s] Pagesize %d, Num of piece %d, Time Already %v", req.proName, pagesize, i, time.Since(begin))
  49. }
  50. log.Info("[%s] Finish! Pagesize %d, Num of piece %d, Time %v", req.proName, pagesize, nbPiece, time.Since(begin))
  51. return
  52. }
  53. // refreshCache refreshes the cache of ugc and pgc
  54. func (s *Service) refreshCache() {
  55. var (
  56. ctx = context.Background()
  57. begin = time.Now()
  58. pgcPS = s.c.PlayControl.PieceSize
  59. reqEp = &reqCachePro{
  60. cnt: s.dao.CountEP,
  61. proName: "epProducer",
  62. refresh: s.dao.RefreshEPMC,
  63. ps: pgcPS,
  64. }
  65. reqSn = &reqCachePro{
  66. cnt: s.dao.CountSeason,
  67. proName: "snProducer",
  68. refresh: s.dao.RefreshSnMC,
  69. ps: pgcPS,
  70. }
  71. )
  72. if err := s.cacheProducer(ctx, reqEp); err != nil {
  73. log.Error("reqEp Err %v", err)
  74. return
  75. }
  76. if err := s.cacheProducer(ctx, reqSn); err != nil {
  77. log.Error("reqSn Err %v", err)
  78. }
  79. log.Info("refreshCache Finish, Time %v", time.Since(begin))
  80. }
  81. // stock EP&Season auth info and intervention info in MC
  82. func (s *Service) stockContent(jsonstr json.RawMessage, tableName string) (err error) {
  83. // season stock in MC
  84. if tableName == "tv_ep_season" {
  85. sn := &model.DatabusSeason{}
  86. if err = json.Unmarshal(jsonstr, sn); err != nil {
  87. log.Error("json.Unmarshal(%s) error(%v)", jsonstr, err)
  88. return
  89. }
  90. if reflect.DeepEqual(sn.Old, sn.New) { // if media fields not modified, no need to update
  91. log.Info("SeasonID %d No need to update", sn.New.ID)
  92. return
  93. }
  94. return s.stockSeason(sn)
  95. // ep stock in MC
  96. } else if tableName == "tv_content" {
  97. ep := &model.DatabusEP{}
  98. if err = json.Unmarshal(jsonstr, ep); err != nil {
  99. log.Error("json.Unmarshal(%s) error(%v)", jsonstr, err)
  100. return
  101. }
  102. if reflect.DeepEqual(ep.Old, ep.New) { // if media fields not modified, no need to update
  103. log.Info("Epid %d No need to update", ep.New.EPID)
  104. return
  105. }
  106. return s.stockEP(ep)
  107. } else {
  108. return fmt.Errorf("Databus Msg (%s) - Incorrect Table (%s) ", jsonstr, tableName)
  109. }
  110. }
  111. func (s *Service) composeSnCMS(sn *model.MediaSn) *model.SeasonCMS {
  112. var (
  113. epid, order int
  114. err error
  115. playtime int64
  116. )
  117. if epid, order, err = s.dao.NewestOrder(ctx, sn.ID); err != nil {
  118. log.Warn("stockSeason NewestOrder Sid: %d, Err %v", sn.ID, err)
  119. }
  120. if playtime, err = appDao.TimeTrans(sn.Playtime); err != nil {
  121. log.Warn("stockSeason Playtime Sid: %d, Err %v", sn.ID, err)
  122. }
  123. return &model.SeasonCMS{
  124. SeasonID: int(sn.ID),
  125. Cover: sn.Cover,
  126. Desc: sn.Desc,
  127. Title: sn.Title,
  128. UpInfo: sn.UpInfo,
  129. Category: sn.Category,
  130. Area: sn.Area,
  131. Playtime: timex.Time(playtime),
  132. Role: sn.Role,
  133. Staff: sn.Staff,
  134. TotalNum: sn.TotalNum,
  135. Style: sn.Style,
  136. NewestOrder: order,
  137. NewestEPID: epid,
  138. PayStatus: sn.Status, // databus sn logic
  139. }
  140. }
  141. // treat the databus season msg, stock the auth & media info in MC
  142. func (s *Service) stockSeason(sn *model.DatabusSeason) (err error) {
  143. var (
  144. snSub *model.TVEpSeason
  145. snAuth = sn.New.ToSimple() // auth info in MC
  146. snMedia = s.composeSnCMS(sn.New) // media info in MC
  147. )
  148. s.batchFilter(ctx, []*model.SeasonCMS{snMedia}) // treat the newest NB logic
  149. if sn.New.Check == _seasonPassed && sn.Old.Check == _seasonPassed { // keep already passed logic
  150. if snSub, err = s.dao.Season(ctx, int(sn.New.ID)); err != nil {
  151. return
  152. }
  153. s.addRetrySn(snSub)
  154. }
  155. if err = s.dao.SetSeason(ctx, snAuth); err != nil { // auth
  156. log.Error("SetSeason error(%v)", snAuth, err)
  157. return
  158. }
  159. if err = s.dao.SetSnCMSCache(ctx, snMedia); err != nil { // media
  160. log.Error("SetSnCMSCache error(%v)", snMedia, err)
  161. return
  162. }
  163. if err = s.listMtn(sn.Old, sn.New); err != nil { // maintenance of the zone list in Redis
  164. log.Error("stockContent listMtn error(%v)", sn.New, err)
  165. }
  166. return
  167. }
  168. // treat the databus ep msg, stock the auth & media info in MC
  169. func (s *Service) stockEP(ep *model.DatabusEP) (err error) {
  170. var (
  171. epAuth = ep.New.ToSimple()
  172. epMedia = ep.New.ToCMS()
  173. epSub *model.Content
  174. )
  175. if ep.New.State == _epPassed && ep.Old.State == _epPassed { // keep already passed logic
  176. if epSub, err = s.dao.Cont(ctx, ep.New.EPID); err != nil {
  177. return
  178. }
  179. s.addRetryEp(epSub)
  180. }
  181. if err = s.dao.SetEP(ctx, epAuth); err != nil { // set ep auth MC
  182. return
  183. }
  184. if err = s.dao.SetEpCMSCache(ctx, epMedia); err != nil { // set ep media MC
  185. return
  186. }
  187. err = s.updateSnCMS(epAuth.SeasonID)
  188. return
  189. }
  190. // updateSnCMS picks the season info from DB and update the CMS cache
  191. func (s *Service) updateSnCMS(sid int) (err error) {
  192. var snMedia *model.SeasonCMS
  193. if snMedia, err = s.dao.PickSeason(ctx, sid); err != nil { // pick season cms info
  194. log.Error("stockEP PickSeason Sid: %d, Err: %v", sid, err)
  195. return
  196. }
  197. if snMedia == nil { // season info not found
  198. err = ecode.NothingFound
  199. log.Error("stockEP PickSeason Sid: %d, Err: %v", sid, err)
  200. return
  201. }
  202. s.batchFilter(ctx, []*model.SeasonCMS{snMedia})
  203. if err = s.dao.SetSnCMSCache(ctx, snMedia); err != nil { // ep update, we also consider to update its season info for the "latest" info
  204. log.Error("SetSnCMSCache error(%v)", snMedia, err)
  205. }
  206. return
  207. }
  208. // consume Databus message; because daily modification is not many, so use simple loop
  209. func (s *Service) consumeContent() {
  210. defer s.waiterConsumer.Done()
  211. for {
  212. msg, ok := <-s.contentSub.Messages()
  213. if !ok {
  214. log.Info("databus: tv-job ep/season consumer exit!")
  215. return
  216. }
  217. msg.Commit()
  218. s.treatMsg(msg.Value)
  219. time.Sleep(1 * time.Millisecond)
  220. }
  221. }
  222. func (s *Service) treatMsg(msg json.RawMessage) {
  223. m := &model.DatabusRes{}
  224. log.Info("[ConsumeContent] New Message: %s", msg)
  225. if err := json.Unmarshal(msg, m); err != nil {
  226. log.Error("json.Unmarshal(%s) error(%v)", msg, err)
  227. return
  228. }
  229. if m.Action == "delete" {
  230. log.Info("[ConsumeContent] Content Deletion, We ignore:<%v>,<%v>", m, msg)
  231. return
  232. }
  233. if err := s.stockContent(msg, m.Table); err != nil {
  234. log.Error("stockContent.(%s,%s), error(%v)", msg, m.Table, err)
  235. return
  236. }
  237. }