task_dispatch.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/job/main/videoup-report/model/archive"
  8. tmod "go-common/app/job/main/videoup-report/model/task"
  9. "go-common/app/job/main/videoup-report/model/utils"
  10. "go-common/library/database/sql"
  11. "go-common/library/log"
  12. )
  13. func (s *Service) hdlVideoTask(c context.Context, fn string) (err error) {
  14. var (
  15. v *archive.Video
  16. a *archive.Archive
  17. state int8
  18. dID int64
  19. )
  20. if v, a, err = s.archiveVideo(c, fn); err != nil {
  21. log.Error("s.archiveVideo(%s) error(%v)", fn, err)
  22. return
  23. }
  24. if a.State == archive.StateForbidUpDelete {
  25. log.Info("task archive(%d) deleted", a.ID)
  26. return
  27. }
  28. if v.Status != archive.VideoStatusWait {
  29. log.Info("task archive(%d) filename(%s) already status(%d)", a.ID, v.Filename, v.Status)
  30. return
  31. }
  32. if dID, state, err = s.arc.DispatchState(c, v.Aid, v.Cid); err != nil {
  33. log.Error("task s.arc.DispatchState(%d,%d) error(%v)", v.Aid, v.Cid, err)
  34. return
  35. }
  36. if dID != 0 && state <= tmod.StateForTaskWork {
  37. log.Info("task aid(%d) cid(%d) filename(%s) already in dispatch state(%d)", v.Aid, v.Cid, v.Filename, state)
  38. return
  39. }
  40. log.Info("archive(%d) filename(%s) video(%d) tranVideoTask begin", a.ID, v.Filename, v.Cid)
  41. if err = s.addVideoTask(c, a, v); err != nil {
  42. log.Error("task s.addVideoTask error(%v)", err)
  43. return
  44. }
  45. return
  46. }
  47. func (s *Service) archiveVideo(c context.Context, filename string) (v *archive.Video, a *archive.Archive, err error) {
  48. if v, err = s.arc.NewVideo(c, filename); err != nil {
  49. log.Error("s.arc.NewVideo(%s) error(%v)", filename, err)
  50. return
  51. }
  52. if v == nil {
  53. log.Error("s.arc.NewVideo(%s) video is nil", filename)
  54. err = fmt.Errorf("video(%s) is not exists", filename)
  55. return
  56. }
  57. if a, err = s.arc.ArchiveByAid(c, v.Aid); err != nil {
  58. log.Error("s.arc.ArchiveByAid(%d) filename(%s) error(%v)", v.Aid, filename, err)
  59. return
  60. }
  61. return
  62. }
  63. func (s *Service) addVideoTask(c context.Context, a *archive.Archive, v *archive.Video) (err error) {
  64. var (
  65. task *tmod.Task
  66. lastID, fans int64
  67. cfitems []*tmod.ConfigItem
  68. descb []byte
  69. accfailed bool
  70. )
  71. task = s.setTaskAssign(c, a, v)
  72. fans, accfailed = s.setTaskUPSpecial(c, task, a.Mid)
  73. s.setTaskTimed(c, task)
  74. cfitems = s.getConfWeight(c, task, a)
  75. if lastID, err = s.arc.AddDispatch(c, task); err != nil {
  76. log.Error("s.arc.AddDispatch error(%v)", err)
  77. return
  78. }
  79. // 允许日志记录错误
  80. if _, err = s.arc.AddTaskHis(c, tmod.PoolForFirst, 6, lastID, v.Cid, task.UID, v.Status, "videoup-job"); err != nil {
  81. log.Error("s.arc.AddTaskHis error(%v)", err)
  82. }
  83. log.Info("archive(%d) filename(%s) taskUid(%d)", a.ID, v.Filename, task.UID)
  84. // 保存权重配置信息,错误不影响正常流程
  85. tp := &tmod.WeightParams{
  86. TaskID: lastID,
  87. Mid: a.Mid,
  88. Special: task.UPSpecial,
  89. Ctime: utils.NewFormatTime(time.Now()),
  90. Ptime: task.Ptime,
  91. CfItems: cfitems,
  92. Fans: fans,
  93. AccFailed: accfailed,
  94. TypeID: a.TypeID,
  95. }
  96. s.setTaskUpFrom(c, a.ID, tp)
  97. s.setTaskUpGroup(c, a.Mid, tp)
  98. s.redis.SetWeight(c, map[int64]*tmod.WeightParams{lastID: tp})
  99. if len(cfitems) > 0 {
  100. if descb, err = json.Marshal(cfitems); err != nil {
  101. log.Error("json.Marshal error(%v)", err)
  102. } else {
  103. if _, err = s.arc.InDispatchExtend(c, lastID, string(descb)); err != nil {
  104. log.Error("s.task.InDispatchExtend(%d) error(%v)", lastID, err)
  105. }
  106. }
  107. err = nil
  108. }
  109. return
  110. }
  111. func (s *Service) moveDispatch() (err error) {
  112. var (
  113. tx *sql.Tx
  114. c = context.TODO()
  115. mtime = time.Now().Add(-24 * time.Hour)
  116. startTime = time.Date(mtime.Year(), mtime.Month(), mtime.Day(), mtime.Hour(), 0, 0, 0, mtime.Location())
  117. endTime = time.Date(mtime.Year(), mtime.Month(), mtime.Day(), mtime.Hour(), 59, 59, 0, mtime.Location())
  118. dispatchRows int64
  119. dispatchDoneRows int64
  120. )
  121. if tx, err = s.arc.BeginTran(c); err != nil {
  122. log.Error("s.arc.BeginTran error(%v)")
  123. return
  124. }
  125. if dispatchRows, err = s.arc.TxAddDispatchDone(c, tx, startTime, endTime); err != nil {
  126. tx.Rollback()
  127. log.Error("s.arc.TxAddDispatchDone(%s) error(%v)", mtime.Format("2006-01-02 15:04:05"), err)
  128. return
  129. }
  130. if dispatchDoneRows, err = s.arc.TxDelDispatchByTime(c, tx, startTime, endTime); err != nil {
  131. tx.Rollback()
  132. log.Error("s.arc.TxDelDispatchByTime(%s) error(%v)", mtime.Format("2006-01-02 15:04:05"), err)
  133. return
  134. }
  135. if dispatchRows != dispatchDoneRows {
  136. // no way here !
  137. tx.Rollback()
  138. log.Error("moveDispatch error dispatchRows(%d) dispatchDoneRows(%d)", dispatchRows, dispatchDoneRows)
  139. return
  140. }
  141. if err = tx.Commit(); err != nil {
  142. log.Error("tx.Commit error(%v)")
  143. return
  144. }
  145. log.Info("moveDispatch mtime(%s) to mtime(%s) rows(%d)", startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"), dispatchDoneRows)
  146. return
  147. }
  148. func (s *Service) moveTaskOperHis(limit int64) (moved int64, err error) {
  149. var (
  150. tx *sql.Tx
  151. c = context.TODO()
  152. mtime = time.Now().Add(-2 * 30 * 24 * time.Hour)
  153. before = time.Date(mtime.Year(), mtime.Month(), mtime.Day(), mtime.Hour(), 0, 0, 0, mtime.Location())
  154. movedRows int64
  155. delRows int64
  156. )
  157. if tx, err = s.arc.BeginTran(c); err != nil {
  158. log.Error("s.arc.BeginTran error(%v)")
  159. return
  160. }
  161. if movedRows, err = s.arc.TxMoveTaskOperDone(tx, before, limit); err != nil {
  162. tx.Rollback()
  163. log.Error("s.arc.TxMoveTaskOperDone(%s) error(%v)", mtime.Format("2006-01-02 15:04:05"), err)
  164. return
  165. }
  166. if delRows, err = s.arc.TxDelTaskOper(tx, before, limit); err != nil {
  167. tx.Rollback()
  168. log.Error("s.arc.TxDelTaskOper(%s) error(%v)", mtime.Format("2006-01-02 15:04:05"), err)
  169. return
  170. }
  171. if movedRows != delRows {
  172. tx.Rollback()
  173. log.Error("moveOperHistory error mvRows(%d) delRows(%d)", movedRows, delRows)
  174. return
  175. }
  176. if err = tx.Commit(); err != nil {
  177. log.Error("tx.Commit error(%v)")
  178. return
  179. }
  180. log.Info("moveTaskOperHistory before mtime(%s) rows(%d)", before.Format("2006-01-02 15:04:05"), movedRows)
  181. return movedRows, nil
  182. }
  183. func (s *Service) delTaskDispatchDone(limit int64) (delRows int64, err error) {
  184. var (
  185. c = context.TODO()
  186. mtime = time.Now().Add(-30 * 24 * time.Hour)
  187. before = time.Date(mtime.Year(), mtime.Month(), mtime.Day(), mtime.Hour(), 0, 0, 0, mtime.Location())
  188. )
  189. if delRows, err = s.arc.DelTaskDoneBefore(c, before, limit); err != nil {
  190. log.Error("s.arc.DelTaskDoneBefore(%s) error(%v)", mtime.Format("2006-01-02 15:04:05"), err)
  191. return
  192. }
  193. if delRows > 0 {
  194. log.Info("delTaskDispatchDone before mtime(%s) rows(%d)", before.Format("2006-01-02 15:04:05"), delRows)
  195. }
  196. if delRows, err = s.arc.DelTaskBefore(c, before, limit); err != nil {
  197. log.Error("s.arc.DelTaskBefore(%s) error(%v)", mtime.Format("2006-01-02 15:04:05"), err)
  198. return
  199. }
  200. if delRows > 0 {
  201. log.Info("DelTaskBefore before mtime(%s) rows(%d)", before.Format("2006-01-02 15:04:05"), delRows)
  202. }
  203. return
  204. }
  205. func (s *Service) delTaskHistoryDone(limit int64) (delRows int64, err error) {
  206. var (
  207. c = context.TODO()
  208. mtime = time.Now().Add(-3 * 30 * 24 * time.Hour)
  209. before = time.Date(mtime.Year(), mtime.Month(), mtime.Day(), mtime.Hour(), 0, 0, 0, mtime.Location())
  210. )
  211. if delRows, err = s.arc.DelTaskHistoryDone(c, before, limit); err != nil {
  212. log.Error("s.arc.DelTaskHistoryDone(%s) error(%v)", mtime.Format("2006-01-02 15:04:05"), err)
  213. return
  214. }
  215. log.Info("delTaskHistoryDone before mtime(%s) rows(%d)", before.Format("2006-01-02 15:04:05"), delRows)
  216. return
  217. }
  218. func (s *Service) delTaskExtend(limit int64) (delRows int64, err error) {
  219. var (
  220. c = context.TODO()
  221. mtime = time.Now().Add(-20 * 24 * time.Hour)
  222. before = time.Date(mtime.Year(), mtime.Month(), mtime.Day(), mtime.Hour(), 0, 0, 0, mtime.Location())
  223. )
  224. if delRows, err = s.arc.DelTaskExtend(c, before, limit); err != nil {
  225. log.Error("s.arc.DelTaskExtend(%s) error(%v)", mtime.Format("2006-01-02 15:04:05"), err)
  226. return
  227. }
  228. log.Info("delTaskExtend before mtime(%s) rows(%d)", before.Format("2006-01-02 15:04:05"), delRows)
  229. return
  230. }
  231. /*
  232. 1.移动task_dispatch到task_dispatch_done
  233. 2.移动task_oper_history到task_oper_history_done
  234. 3.删除过于久远的task_dispatch_done,task_oper_history_done,task_dispatch_extend
  235. */
  236. func (s *Service) movetaskproc() {
  237. defer s.waiter.Done()
  238. for {
  239. if s.closed {
  240. return
  241. }
  242. s.moveDispatch()
  243. time.Sleep(1 * time.Hour)
  244. }
  245. }
  246. func (s *Service) deltaskproc() {
  247. for {
  248. for {
  249. rows, _ := s.moveTaskOperHis(100)
  250. time.Sleep(1 * time.Second)
  251. if rows == 0 {
  252. break
  253. }
  254. }
  255. for {
  256. rows, _ := s.delTaskDispatchDone(100)
  257. time.Sleep(1 * time.Second)
  258. if rows == 0 {
  259. break
  260. }
  261. }
  262. for {
  263. rows, _ := s.delTaskHistoryDone(100)
  264. time.Sleep(1 * time.Second)
  265. if rows == 0 {
  266. break
  267. }
  268. }
  269. for {
  270. rows, _ := s.delTaskExtend(100)
  271. time.Sleep(1 * time.Second)
  272. if rows == 0 {
  273. break
  274. }
  275. }
  276. time.Sleep(nextDay(10))
  277. }
  278. }