mysql_task.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package dao
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "strconv"
  7. "time"
  8. pamdl "go-common/app/admin/main/push/model"
  9. pushmdl "go-common/app/service/main/push/model"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/log"
  12. )
  13. const (
  14. _addTaskSQL = "INSERT INTO push_tasks (job,type,app_id,business_id,platform,platform_id,title,summary,link_type,link_value,build,sound,vibration,pass_through,mid_file,push_time,expire_time,status,`group`,image_url,extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
  15. _delTasksSQL = `DELETE FROM push_tasks where mtime <= ? limit ?`
  16. _upadteTaskStatusSQL = "UPDATE push_tasks SET status=? WHERE id=?"
  17. _taskByStatusSQL = "SELECT id,job,type,app_id,business_id,platform,title,summary,link_type,link_value,build,sound,vibration,pass_through,mid_file,progress,push_time,expire_time,status,`group`,image_url,extra FROM push_tasks WHERE status=? AND dtime=0 LIMIT 1 FOR UPDATE"
  18. _upadteTaskSQL = "UPDATE push_tasks SET mid_file=?,status=? WHERE id=?"
  19. // dataplatform
  20. _txDpCondByStatusSQL = `SELECT id,job,task,conditions,sql_stmt,status,status_url,file FROM push_dataplatform_conditions WHERE status=? LIMIT 1 FOR UPDATE`
  21. _updateDpCondSQL = `UPDATE push_dataplatform_conditions SET job=?,task=?,conditions=?,sql_stmt=?,status=?,status_url=?,file=? WHERE id=?`
  22. _UpdateDpCondStatusSQL = `UPDATE push_dataplatform_conditions SET status=? WHERE id=?`
  23. )
  24. // DelTasks deletes tasks.
  25. func (d *Dao) DelTasks(c context.Context, t time.Time, limit int) (rows int64, err error) {
  26. res, err := d.delTasksStmt.Exec(c, t, limit)
  27. if err != nil {
  28. log.Error("d.DelTasks(%v) error(%v)", t, err)
  29. PromError("mysql:DelTasks")
  30. return
  31. }
  32. rows, err = res.RowsAffected()
  33. return
  34. }
  35. // TxTaskByStatus gets task by status by tx.
  36. func (d *Dao) TxTaskByStatus(tx *xsql.Tx, status int8) (t *pushmdl.Task, err error) {
  37. var (
  38. id int64
  39. platform string
  40. build string
  41. progress string
  42. extra string
  43. now = time.Now()
  44. )
  45. t = &pushmdl.Task{Progress: &pushmdl.Progress{}, Extra: &pushmdl.TaskExtra{}}
  46. if err = tx.QueryRow(_taskByStatusSQL, status).Scan(&id, &t.Job, &t.Type, &t.APPID, &t.BusinessID, &platform, &t.Title, &t.Summary, &t.LinkType, &t.LinkValue, &build,
  47. &t.Sound, &t.Vibration, &t.PassThrough, &t.MidFile, &progress, &t.PushTime, &t.ExpireTime, &t.Status, &t.Group, &t.ImageURL, &extra); err != nil {
  48. t = nil
  49. if err == sql.ErrNoRows {
  50. err = nil
  51. return
  52. }
  53. log.Error("d.TxTaskByStatus() QueryRow(%d,%v) error(%v)", status, now, err)
  54. PromError("mysql:按状态查询任务")
  55. return
  56. }
  57. t.ID = strconv.FormatInt(id, 10)
  58. t.Platform = pushmdl.SplitInts(platform)
  59. t.Build = pushmdl.ParseBuild(build)
  60. if progress != "" {
  61. if err = json.Unmarshal([]byte(progress), t.Progress); err != nil {
  62. log.Error("json.Unmarshal(%s) error(%v)", progress, err)
  63. return
  64. }
  65. }
  66. if extra != "" {
  67. if err = json.Unmarshal([]byte(extra), t.Extra); err != nil {
  68. log.Error("json.Unmarshal(%s) error(%v)", extra, err)
  69. }
  70. }
  71. return
  72. }
  73. // TxUpdateTaskStatus updates task status by tx.
  74. func (d *Dao) TxUpdateTaskStatus(tx *xsql.Tx, taskID string, status int8) (err error) {
  75. id, _ := strconv.ParseInt(taskID, 10, 64)
  76. if _, err = tx.Exec(_upadteTaskStatusSQL, status, id); err != nil {
  77. log.Error("d.TxUpdateTaskStatus() Exec(%s,%d) error(%v)", taskID, status, err)
  78. PromError("mysql:更新推送任务状态")
  79. }
  80. return
  81. }
  82. // UpdateTaskStatus update task status.
  83. func (d *Dao) UpdateTaskStatus(c context.Context, taskID int64, status int8) (err error) {
  84. if _, err = d.updateTaskStatusStmt.Exec(c, status, taskID); err != nil {
  85. log.Error("d.updateTaskStatusStmt.Exec(%d,%d) error(%v)", taskID, status, err)
  86. PromError("mysql:更新推送任务状态")
  87. }
  88. return
  89. }
  90. // UpdateTask update task.
  91. func (d *Dao) UpdateTask(c context.Context, taskID string, file string, status int8) (err error) {
  92. id, _ := strconv.ParseInt(taskID, 10, 64)
  93. if _, err = d.updateTaskStmt.Exec(c, file, status, id); err != nil {
  94. log.Error("d.updateTaskFileStmt.Exec(%d,%s,%d) error(%v)", id, file, status, err)
  95. PromError("mysql:更新推送任务file")
  96. }
  97. return
  98. }
  99. // AddTask adds task
  100. func (d *Dao) AddTask(ctx context.Context, t *pushmdl.Task) (err error) {
  101. var (
  102. platform = pushmdl.JoinInts(t.Platform)
  103. build, _ = json.Marshal(t.Build)
  104. extra, _ = json.Marshal(t.Extra)
  105. )
  106. if _, err = d.db.Exec(ctx, _addTaskSQL, t.Job, t.Type, t.APPID, t.BusinessID, platform, t.PlatformID, t.Title, t.Summary, t.LinkType, t.LinkValue,
  107. build, t.Sound, t.Vibration, t.PassThrough, t.MidFile, t.PushTime, t.ExpireTime, t.Status, t.Group, t.ImageURL, extra); err != nil {
  108. log.Error("d.AddTask(%+v) error(%v)", t, err)
  109. }
  110. return
  111. }
  112. // TxCondByStatus gets condition by status.
  113. func (d *Dao) TxCondByStatus(tx *xsql.Tx, status int) (cond *pamdl.DPCondition, err error) {
  114. cond = new(pamdl.DPCondition)
  115. if err = tx.QueryRow(_txDpCondByStatusSQL, status).Scan(&cond.ID, &cond.Job, &cond.Task, &cond.Condition, &cond.SQL, &cond.Status, &cond.StatusURL, &cond.File); err != nil {
  116. if err == sql.ErrNoRows {
  117. cond = nil
  118. err = nil
  119. }
  120. return
  121. }
  122. return
  123. }
  124. // UpdateDpCond update data platform query condition
  125. func (d *Dao) UpdateDpCond(ctx context.Context, cond *pamdl.DPCondition) (err error) {
  126. if _, err = d.updateDpCondStmt.Exec(ctx, cond.Job, cond.Task, cond.Condition, cond.SQL, cond.Status, cond.StatusURL, cond.File, cond.ID); err != nil {
  127. log.Error("d.UpdateDpCond(%+v) error(%v)", cond, err)
  128. }
  129. return
  130. }
  131. // UpdateDpCondStatus update data platform query condition
  132. func (d *Dao) UpdateDpCondStatus(ctx context.Context, id int64, status int) (err error) {
  133. if _, err = d.db.Exec(ctx, _UpdateDpCondStatusSQL, status, id); err != nil {
  134. log.Error("d.UpdateCondStatus(%d,%d) error(%v)", id, status, err)
  135. }
  136. return
  137. }
  138. // TxUpdateCondStatus update data platform query condition status
  139. func (d *Dao) TxUpdateCondStatus(tx *xsql.Tx, id int64, status int) (err error) {
  140. if _, err = tx.Exec(_UpdateDpCondStatusSQL, status, id); err != nil {
  141. log.Error("d.TxUpdateCondStatus(%d,%d) error(%v)", id, status, err)
  142. }
  143. return
  144. }