archive.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/archive/model/archive"
  5. "go-common/app/job/main/archive/model/result"
  6. "go-common/app/job/main/archive/model/retry"
  7. arcmdl "go-common/app/service/main/archive/model/archive"
  8. "go-common/library/log"
  9. )
  10. func (s *Service) isPGC(aid int64) bool {
  11. if addit, _ := s.archiveDao.Addit(context.TODO(), aid); addit != nil && (addit.UpFrom == archive.UpFromPGC || addit.UpFrom == archive.UpFromPGCSecret) {
  12. return true
  13. }
  14. return false
  15. }
  16. func (s *Service) consumerVideoup(i int) {
  17. defer s.waiter.Done()
  18. for {
  19. var (
  20. aid int64
  21. ok bool
  22. )
  23. if aid, ok = <-s.videoupAids[i]; !ok {
  24. log.Error("s.videoupAids chan closed")
  25. return
  26. }
  27. arc, _ := s.arcServices[0].Archive3(context.TODO(), &arcmdl.ArgAid2{Aid: aid})
  28. if arc != nil && (arc.AttrVal(arcmdl.AttrBitIsPGC) == arcmdl.AttrYes || arc.AttrVal(arcmdl.AttrBitIsBangumi) == arcmdl.AttrYes) {
  29. if s.c.PGCAsync == 1 {
  30. rt := &retry.Info{Action: retry.FailResultAdd}
  31. rt.Data.Aid = aid
  32. s.PushFail(context.TODO(), rt)
  33. log.Warn("async PGC archive(%d)", aid)
  34. continue
  35. }
  36. s.pgcAids <- aid
  37. log.Info("aid(%d) title(%s) is PGC", aid, arc.Title)
  38. continue
  39. }
  40. if s.c.UGCAsync == 1 {
  41. rt := &retry.Info{Action: retry.FailResultAdd}
  42. rt.Data.Aid = aid
  43. s.PushFail(context.TODO(), rt)
  44. log.Warn("async UGC archive(%d)", aid)
  45. continue
  46. }
  47. s.arcUpdate(aid)
  48. }
  49. }
  50. func (s *Service) pgcConsumer() {
  51. defer s.waiter.Done()
  52. for {
  53. var (
  54. aid int64
  55. ok bool
  56. )
  57. if aid, ok = <-s.pgcAids; !ok {
  58. log.Error("s.pgcAids closed")
  59. return
  60. }
  61. s.arcUpdate(aid)
  62. }
  63. }
  64. func (s *Service) arcUpdate(aid int64) {
  65. var (
  66. oldResult *result.Archive
  67. newResult *result.Archive
  68. c = context.TODO()
  69. upCids []int64
  70. delCids []int64
  71. err error
  72. changed bool
  73. )
  74. log.Info("sync resultDB archive(%d) start", aid)
  75. defer func() {
  76. if err != nil {
  77. if oldResult != nil && (oldResult.AttrVal(result.AttrBitIsBangumi) == result.AttrYes || oldResult.AttrVal(result.AttrBitIsPGC) == result.AttrYes) {
  78. s.pgcAids <- aid
  79. } else {
  80. s.videoupAids[aid%int64(s.c.ChanSize)] <- aid
  81. }
  82. log.Error("s.arcUpdate(%d) error(%v)", aid, err)
  83. }
  84. }()
  85. if oldResult, err = s.resultDao.Archive(c, aid); err != nil {
  86. log.Error("s.resultDao.Archive(%d) error(%v)", aid, err)
  87. }
  88. if changed, upCids, delCids, err = s.tranResult(c, aid); err != nil || !changed {
  89. log.Error("aid(%d) nothing changed err(%+v)", aid, err)
  90. err = nil
  91. return
  92. }
  93. s.upVideoCache(aid, upCids)
  94. s.delVideoCache(aid, delCids)
  95. if newResult, err = s.resultDao.Archive(c, aid); err != nil {
  96. log.Error("s.resultDao.Archive(%d) error(%v)", aid, err)
  97. return
  98. }
  99. err = s.updateResultCache(newResult, oldResult)
  100. if oldResult != nil {
  101. s.updateResultField(newResult, oldResult)
  102. s.updateSubjectMid(newResult, oldResult)
  103. s.sendMail(newResult, oldResult)
  104. }
  105. action := "update"
  106. if oldResult == nil {
  107. action = "insert"
  108. }
  109. s.sendNotify(&result.ArchiveUpInfo{Table: "archive", Action: action, Nw: newResult, Old: oldResult})
  110. if oldResult != nil {
  111. log.Info("sync resultDB archive(%d) sync old(%+v) new(%+v) updated", aid, oldResult, newResult)
  112. return
  113. }
  114. log.Info("sync resultDB archive(%d) new(%+v) inserted", aid, newResult)
  115. }
  116. func (s *Service) hadPassed(c context.Context, aid int64) (had bool) {
  117. id, err := s.archiveDao.GetFirstPassByAID(c, aid)
  118. if err != nil {
  119. log.Error("hadPassed s.arc.GetFirstPassByAID error(%v) aid(%d)", err, aid)
  120. return
  121. }
  122. had = id > 0
  123. return
  124. }