task_assign.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package service
  2. import (
  3. "context"
  4. "math/rand"
  5. "time"
  6. "go-common/app/job/main/videoup-report/model/archive"
  7. "go-common/app/job/main/videoup-report/model/task"
  8. "go-common/library/log"
  9. "go-common/library/xstr"
  10. )
  11. // 设置指派任务
  12. func (s *Service) setTaskAssign(c context.Context, a *archive.Archive, v *archive.Video) (t *task.Task) {
  13. var (
  14. now = time.Now()
  15. arruid = []int64{}
  16. mapUID = make(map[int64]int64)
  17. )
  18. t = &task.Task{
  19. Pool: task.PoolForFirst,
  20. Aid: a.ID,
  21. Cid: v.Cid,
  22. Subject: task.SubjectForNormal,
  23. AdminID: int64(0),
  24. UID: int64(0),
  25. State: task.StateForTaskDefault,
  26. }
  27. for _, tc := range s.assignCache {
  28. log.Info("task doing(%v) aid(%d) type(%d) filename(%s) tc(%d)", t, a.ID, a.TypeID, v.Filename, tc.ID)
  29. if tc.STime.After(now) || tc.ETime.Before(now) {
  30. log.Error("task time is error stime(%v) etime(%v)", tc.STime, tc.ETime)
  31. continue
  32. }
  33. var midOk, tidOk, durationOk = true, true, true
  34. if len(tc.MIDs) > 0 {
  35. if _, midOk = tc.MIDs[a.Mid]; !midOk {
  36. log.Info("task mid(%d) wrong", a.Mid)
  37. }
  38. }
  39. if len(tc.TIDs) > 0 {
  40. if _, tidOk = tc.TIDs[a.TypeID]; !tidOk {
  41. log.Info("task type(%d) wrong", a.TypeID)
  42. }
  43. }
  44. if tc.MinDuration != tc.MaxDuration && (v.Duration < tc.MinDuration || v.Duration > tc.MaxDuration) {
  45. log.Error("task minDur(%d) maxDur(%d) wrong", tc.MinDuration, tc.MaxDuration)
  46. durationOk = false
  47. }
  48. if midOk && tidOk && durationOk {
  49. for _, uid := range tc.UIDs {
  50. if _, ok := mapUID[uid]; !ok {
  51. mapUID[uid] = tc.AdminID
  52. arruid = append(arruid, uid)
  53. }
  54. }
  55. }
  56. }
  57. if len(arruid) > 0 {
  58. uids, err := s.arc.ConsumerOnline(c, xstr.JoinInts(arruid))
  59. if err != nil || len(uids) == 0 {
  60. log.Warn("task s.arc.ConsumerOnline(%v) (%v) err(%v)", arruid, uids, err)
  61. return
  62. }
  63. if len(uids) == 1 {
  64. t.UID = uids[0]
  65. } else {
  66. inx := rand.Intn(len(uids) - 1)
  67. log.Info("task uids(%v) rand inx(%d)", uids, inx)
  68. t.UID = uids[inx]
  69. }
  70. }
  71. if t.UID != 0 {
  72. t.Subject = task.SubjectForTask
  73. t.AdminID = mapUID[t.UID] // 命中多个指派者配置,选择其中一个就行
  74. t.State = task.StateForTaskDefault
  75. }
  76. return
  77. }
  78. // 指派配置
  79. func (s *Service) assignConf(c context.Context) (tcs map[int64]*task.AssignConfig, err error) {
  80. var ids []int64
  81. if tcs, err = s.arc.AssignConfigs(context.TODO()); err != nil {
  82. log.Error("s.arc.AssignConfigs(%v) error(%v)", err)
  83. return
  84. }
  85. for k, v := range tcs {
  86. if !v.ETime.IsZero() && v.ETime.Before(time.Now()) {
  87. delete(tcs, k)
  88. ids = append(ids, k)
  89. }
  90. }
  91. if len(ids) > 0 {
  92. log.Info("task config(%v) 指派配置已过期,自动失效", ids)
  93. s.arc.DelAssignConfs(c, ids)
  94. }
  95. return
  96. }