service.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/job/main/videoup-report/conf"
  7. arcdao "go-common/app/job/main/videoup-report/dao/archive"
  8. "go-common/app/job/main/videoup-report/dao/data"
  9. "go-common/app/job/main/videoup-report/dao/email"
  10. hbasedao "go-common/app/job/main/videoup-report/dao/hbase"
  11. "go-common/app/job/main/videoup-report/dao/manager"
  12. "go-common/app/job/main/videoup-report/dao/mission"
  13. redisdao "go-common/app/job/main/videoup-report/dao/redis"
  14. "go-common/app/job/main/videoup-report/dao/tag"
  15. arcmdl "go-common/app/job/main/videoup-report/model/archive"
  16. taskmdl "go-common/app/job/main/videoup-report/model/task"
  17. account "go-common/app/service/main/account/api"
  18. arcrpc "go-common/app/service/main/archive/api/gorpc"
  19. upsrpc "go-common/app/service/main/up/api/v1"
  20. "go-common/library/log"
  21. "go-common/library/queue/databus"
  22. "strings"
  23. )
  24. const (
  25. _archiveTable = "archive"
  26. _videoTable = "archive_video"
  27. _upsTable = "ups"
  28. _jumpChanSize = int(4000)
  29. _logChanSize = int(200000)
  30. )
  31. // Service is service.
  32. type Service struct {
  33. c *conf.Config
  34. arc *arcdao.Dao
  35. redis *redisdao.Dao
  36. hbase *hbasedao.Dao
  37. dataDao *data.Dao
  38. email *email.Dao
  39. mng *manager.Dao
  40. arcUpChs []chan *arcmdl.UpInfo
  41. videoUpInfoChs []chan *arcmdl.VideoUpInfo
  42. // waiter
  43. waiter sync.WaitGroup
  44. // databus
  45. archiveSub *databus.Databus
  46. arcResultSub *databus.Databus
  47. videoupSub *databus.Databus
  48. ManagerDBSub *databus.Databus
  49. // cache
  50. sfTpsCache map[int16]*arcmdl.Type
  51. adtTpsCache map[int16]struct{}
  52. taskCache *arcmdl.TaskCache
  53. videoAuditCache *arcmdl.VideoAuditCache
  54. arcMoveTypeCache *arcmdl.ArcMoveTypeCache
  55. arcRoundFlowCache *arcmdl.ArcRoundFlowCache
  56. xcodeTimeCache *arcmdl.XcodeTimeCache
  57. assignCache map[int64]*taskmdl.AssignConfig
  58. upperCache map[int8]map[int64]struct{}
  59. weightCache map[int8]map[int64]*taskmdl.ConfigItem
  60. missTagsCache map[string]int
  61. lastjumpMap map[int64]struct{} //上轮插队的这一轮也更新,否则会出现权重只增不减
  62. jumplist *taskmdl.JumpList //插队序列
  63. jumpchan chan *taskmdl.WeightLog
  64. tasklogchan chan *taskmdl.WeightLog
  65. //rpc
  66. arcRPCGroup2 *arcrpc.Service2
  67. //grpc
  68. accRPC account.AccountClient
  69. upsRPC upsrpc.UpClient
  70. // closed
  71. closed bool
  72. tagDao *tag.Dao
  73. missionDao *mission.Dao
  74. }
  75. // New is videoup-report-job service implementation.
  76. func New(c *conf.Config) (s *Service) {
  77. s = &Service{
  78. c: c,
  79. //dao
  80. arc: arcdao.New(c),
  81. redis: redisdao.New(c),
  82. hbase: hbasedao.New(c),
  83. //databus
  84. archiveSub: databus.New(c.ArchiveSub),
  85. arcResultSub: databus.New(c.ArchiveResultSub),
  86. videoupSub: databus.New(c.VideoupSub),
  87. ManagerDBSub: databus.New(c.ManagerDBSub),
  88. dataDao: data.New(c),
  89. email: email.New(c),
  90. mng: manager.New(c),
  91. // cache
  92. taskCache: &arcmdl.TaskCache{
  93. Task: make(map[int64]*arcmdl.Task),
  94. },
  95. videoAuditCache: &arcmdl.VideoAuditCache{
  96. Data: make(map[int16]map[string]int),
  97. },
  98. arcMoveTypeCache: &arcmdl.ArcMoveTypeCache{
  99. Data: make(map[int8]map[int16]map[string]int),
  100. },
  101. arcRoundFlowCache: &arcmdl.ArcRoundFlowCache{
  102. Data: make(map[int8]map[int64]map[string]int),
  103. },
  104. xcodeTimeCache: &arcmdl.XcodeTimeCache{
  105. Data: make(map[int8][]int),
  106. },
  107. arcRPCGroup2: arcrpc.New2(c.ArchiveRPCGroup2),
  108. lastjumpMap: make(map[int64]struct{}),
  109. jumplist: taskmdl.NewJumpList(),
  110. jumpchan: make(chan *taskmdl.WeightLog, _jumpChanSize),
  111. tasklogchan: make(chan *taskmdl.WeightLog, _logChanSize),
  112. tagDao: tag.New(c),
  113. missionDao: mission.New(c),
  114. }
  115. var err error
  116. if s.accRPC, err = account.NewClient(conf.Conf.GRPC.AccRPC); err != nil {
  117. panic(err)
  118. }
  119. if s.upsRPC, err = upsrpc.NewClient(conf.Conf.GRPC.UpsRPC); err != nil {
  120. panic(err)
  121. }
  122. for i := 0; i < s.c.ChanSize; i++ {
  123. log.Info("videoup-report-job chanSize starting(%d)", i)
  124. s.arcUpChs = append(s.arcUpChs, make(chan *arcmdl.UpInfo, 10240))
  125. s.waiter.Add(1)
  126. go s.arcUpdateproc(i)
  127. s.videoUpInfoChs = append(s.videoUpInfoChs, make(chan *arcmdl.VideoUpInfo, 10240))
  128. s.waiter.Add(1)
  129. go s.upVideoproc(i)
  130. }
  131. s.loadConf()
  132. s.loadType()
  133. // load cache.
  134. s.loadTask()
  135. s.loadTaskTookSort()
  136. s.hdlTraffic()
  137. s.loadMission()
  138. go s.cacheproc()
  139. go s.monitorNotifyProc()
  140. s.waiter.Add(1)
  141. go s.hotarchiveproc()
  142. s.waiter.Add(1)
  143. go s.arcCanalConsume()
  144. s.waiter.Add(1)
  145. go s.arcResultConsume()
  146. s.waiter.Add(1)
  147. go s.taskWeightConsumer()
  148. s.waiter.Add(1)
  149. go s.taskweightproc()
  150. s.waiter.Add(1)
  151. go s.movetaskproc()
  152. go s.deltaskproc()
  153. s.waiter.Add(1)
  154. go s.videoupConsumer()
  155. s.waiter.Add(1)
  156. go s.emailProc()
  157. s.waiter.Add(1)
  158. go s.emailFastProc()
  159. s.waiter.Add(1)
  160. go s.retryProc()
  161. s.waiter.Add(1)
  162. go s.managerDBConsume()
  163. return s
  164. }
  165. func (s *Service) loadType() {
  166. tpm, err := s.arc.TypeMapping(context.TODO())
  167. if err != nil {
  168. log.Error("s.dede.TypeMapping error(%v)", err)
  169. return
  170. }
  171. s.sfTpsCache = tpm
  172. // audit types
  173. adt, err := s.arc.AuditTypesConf(context.TODO())
  174. if err != nil {
  175. log.Error("s.dede.AuditTypesConf error(%v)", err)
  176. return
  177. }
  178. s.adtTpsCache = adt
  179. wvc, err := s.arc.WeightValueConf(context.TODO())
  180. if err != nil {
  181. log.Error("s.arc.WeightValueConf error(%v)", err)
  182. return
  183. }
  184. taskmdl.WLVConf = wvc
  185. }
  186. func (s *Service) isAuditType(tpID int16) bool {
  187. _, isAt := s.adtTpsCache[tpID]
  188. return isAt
  189. }
  190. func (s *Service) topType(tpID int16) (id int16) {
  191. if tp, ok := s.sfTpsCache[tpID]; ok && tp != nil {
  192. id = tp.PID
  193. }
  194. return
  195. }
  196. func (s *Service) typeName(tpID int16) (name string) {
  197. if tp, ok := s.sfTpsCache[tpID]; ok && tp != nil {
  198. name = tp.Name
  199. }
  200. return
  201. }
  202. func (s *Service) topTypeName(tpID int16) (name string) {
  203. pid := s.topType(tpID)
  204. name = s.typeName(pid)
  205. return
  206. }
  207. func (s *Service) cacheproc() {
  208. for {
  209. time.Sleep(1 * time.Minute)
  210. // config
  211. s.loadConf()
  212. // task
  213. s.loadTask()
  214. s.loadTaskTookSort()
  215. // handle task took
  216. s.hdlTaskTook()
  217. s.hdlTaskTookByHourHalf()
  218. // handle video audit
  219. s.hdlVideoAuditCount()
  220. s.hdlMoveTypeCount()
  221. s.hdlRoundFlowCount()
  222. //handle calculate video xcode time stats, and save to DB
  223. s.hdlXcodeStats()
  224. s.hdlTraffic()
  225. s.loadMission()
  226. }
  227. }
  228. func (s *Service) monitorNotifyProc() {
  229. for {
  230. s.monitorNotify()
  231. time.Sleep(30 * time.Minute)
  232. }
  233. }
  234. // s.missTagsCache: missionName or first tag
  235. func (s *Service) loadMission() {
  236. mm, err := s.missionDao.Missions(context.TODO())
  237. if err != nil {
  238. log.Error("s.missionDao.Mission error(%v)", err)
  239. return
  240. }
  241. s.missTagsCache = make(map[string]int)
  242. for _, m := range mm {
  243. if len(m.Tags) > 0 {
  244. splitedTags := strings.Split(m.Tags, ",")
  245. s.missTagsCache[splitedTags[0]] = m.ID
  246. } else {
  247. s.missTagsCache[m.Name] = m.ID
  248. }
  249. }
  250. }
  251. // hotarchiveproc get hot archive which need to recheck
  252. func (s *Service) hotarchiveproc() {
  253. defer s.waiter.Done()
  254. for {
  255. if s.closed {
  256. return
  257. }
  258. s.addHotRecheck()
  259. time.Sleep(10 * time.Minute)
  260. }
  261. }
  262. // Close consumer close.
  263. func (s *Service) Close() {
  264. s.closed = true
  265. s.archiveSub.Close()
  266. s.arcResultSub.Close()
  267. s.videoupSub.Close()
  268. time.Sleep(2 * time.Second)
  269. for i := 0; i < s.c.ChanSize; i++ {
  270. log.Info("videoup-report-job chanSize closing(%d)", i)
  271. close(s.arcUpChs[i])
  272. close(s.videoUpInfoChs[i])
  273. }
  274. s.arc.Close()
  275. s.mng.Close()
  276. s.redis.Close()
  277. s.email.Close()
  278. s.hbase.Close()
  279. s.waiter.Wait()
  280. }
  281. // Ping check server ok.
  282. func (s *Service) Ping(c context.Context) (err error) {
  283. if err = s.arc.Ping(c); err != nil {
  284. return
  285. }
  286. if err = s.mng.Ping(c); err != nil {
  287. return
  288. }
  289. return
  290. }
  291. func (s *Service) loadConf() {
  292. var (
  293. err error
  294. assignConf map[int64]*taskmdl.AssignConfig
  295. weightConf map[int8]map[int64]*taskmdl.ConfigItem
  296. upperCache map[int8]map[int64]struct{}
  297. )
  298. if assignConf, err = s.assignConf(context.TODO()); err != nil {
  299. log.Error("s.assignConf error(%v)", err)
  300. return
  301. }
  302. s.assignCache = assignConf
  303. upperCache, err = s.upSpecial(context.TODO())
  304. if err != nil {
  305. log.Error("s.upSpecial error(%v)", err)
  306. } else {
  307. s.upperCache = upperCache
  308. }
  309. if weightConf, err = s.weightConf(context.TODO()); err != nil {
  310. log.Error(" s.weightConf error(%v)", err)
  311. return
  312. }
  313. s.weightCache = weightConf
  314. }