report.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package service
  2. import (
  3. "context"
  4. "runtime"
  5. "time"
  6. "go-common/app/job/main/push/dao"
  7. pb "go-common/app/service/main/push/api/grpc/v1"
  8. pushmdl "go-common/app/service/main/push/model"
  9. "go-common/library/log"
  10. )
const (
	// _dbBatch is the width of the id window that RefreshTokenCache passes to
	// s.dao.ReportsByRange on each step of its scan over the report table.
	_dbBatch = 100000
	// _cacheBatch is the number of reports RefreshTokenCache accumulates
	// before flushing them with addTokensCache.
	_cacheBatch = 50
)
  15. func (s *Service) delInvalidReportsproc() {
  16. for {
  17. arg := &pb.DelInvalidReportsRequest{Type: pushmdl.DelMiFeedback}
  18. if _, err := s.pushRPC.DelInvalidReports(context.Background(), arg); err != nil {
  19. log.Error("s.pushRPC.DelInvalidReports(%d) error(%v)", arg.Type, err)
  20. dao.PromError("report:删除mi无效上报")
  21. }
  22. // arg = &pushmdl.ArgDelInvalidReport{Type: pushmdl.DelMiUninstalled}
  23. // if err := s.pushRPC.DelInvalidReports(context.Background(), arg); err != nil {
  24. // log.Error("s.pushRPC.DelInvalidReports(%d) error(%v)", arg.Type, err)
  25. // dao.PromError("report:删除mi卸载token")
  26. // }
  27. time.Sleep(time.Duration(s.c.Job.DelInvalidReportInterval))
  28. }
  29. }
  30. func (s *Service) reportproc() {
  31. defer s.waiter.Done()
  32. var err error
  33. for {
  34. msg, ok := <-s.reportCh
  35. if !ok {
  36. log.Warn("s.reportproc() closed")
  37. return
  38. }
  39. for _, v := range msg {
  40. if v == nil {
  41. continue
  42. }
  43. arg := &pb.AddReportRequest{
  44. Report: &pb.ModelReport{
  45. APPID: int32(v.APPID),
  46. PlatformID: int32(v.PlatformID),
  47. Mid: v.Mid,
  48. Buvid: v.Buvid,
  49. DeviceToken: v.DeviceToken,
  50. Build: int32(v.Build),
  51. TimeZone: int32(v.TimeZone),
  52. NotifySwitch: int32(v.NotifySwitch),
  53. DeviceBrand: v.DeviceBrand,
  54. DeviceModel: v.DeviceModel,
  55. OSVersion: v.OSVersion,
  56. Extra: v.Extra,
  57. },
  58. }
  59. for i := 0; i < _retry; i++ {
  60. if _, err = s.pushRPC.AddReport(context.Background(), arg); err == nil {
  61. break
  62. }
  63. time.Sleep(20 * time.Millisecond)
  64. }
  65. if err != nil {
  66. log.Error("s.pushRPC.AddReport(%+v) error(%v)", v, err)
  67. dao.PromError("report:新增上报数据")
  68. }
  69. time.Sleep(time.Millisecond)
  70. }
  71. }
  72. }
  73. func (s *Service) refreshTokensproc() {
  74. for {
  75. now := time.Now()
  76. if int(now.Weekday()) != s.c.Job.SyncReportCacheWeek || int(now.Hour()) != s.c.Job.SyncReportCacheHour {
  77. time.Sleep(time.Minute)
  78. continue
  79. }
  80. s.RefreshTokenCache()
  81. time.Sleep(time.Hour)
  82. }
  83. }
  84. // RefreshTokenCache .
  85. func (s *Service) RefreshTokenCache() {
  86. var (
  87. err error
  88. maxid int64
  89. ctx = context.Background()
  90. )
  91. for i := 0; i < _retry; i++ {
  92. if maxid, err = s.dao.ReportLastID(ctx); err == nil {
  93. break
  94. }
  95. time.Sleep(time.Second)
  96. }
  97. if err != nil {
  98. log.Error("s.refreshTokensproc() error(%v)", err)
  99. return
  100. }
  101. log.Info("refresh token start, maxid(%d)", maxid)
  102. var (
  103. updatedUsers int64
  104. updatedTokens int64
  105. sli []*pb.ModelReport
  106. pool = make(map[int64][]*pb.ModelReport)
  107. )
  108. for i := int64(0); i <= maxid; i += _dbBatch {
  109. var rs []*pushmdl.Report
  110. for j := 0; j < _retry; j++ {
  111. if rs, err = s.dao.ReportsByRange(ctx, i, i+_dbBatch); err == nil {
  112. break
  113. }
  114. time.Sleep(20 * time.Millisecond)
  115. }
  116. if err != nil {
  117. log.Error("s.dao.ReportsByRange(%d,%d) error(%v)", i, i+_dbBatch, err)
  118. continue
  119. }
  120. for _, r := range rs {
  121. if r.NotifySwitch == 0 {
  122. continue
  123. }
  124. nr := &pb.ModelReport{
  125. APPID: int32(r.APPID),
  126. PlatformID: int32(r.PlatformID),
  127. Mid: r.Mid,
  128. Buvid: r.Buvid,
  129. DeviceToken: r.DeviceToken,
  130. Build: int32(r.Build),
  131. TimeZone: int32(r.TimeZone),
  132. NotifySwitch: int32(r.NotifySwitch),
  133. DeviceBrand: r.DeviceBrand,
  134. DeviceModel: r.DeviceModel,
  135. OSVersion: r.OSVersion,
  136. Extra: r.Extra,
  137. }
  138. sli = append(sli, nr)
  139. if len(sli) >= _cacheBatch {
  140. s.addTokensCache(sli)
  141. sli = []*pb.ModelReport{}
  142. }
  143. if r.Mid == 0 {
  144. continue
  145. }
  146. pool[r.Mid] = append(pool[r.Mid], nr)
  147. updatedTokens++
  148. }
  149. log.Info("refresh token sovled min(%d) max(%d)", i, i+_dbBatch)
  150. time.Sleep(time.Millisecond)
  151. }
  152. if len(sli) > 0 {
  153. s.addTokensCache(sli)
  154. }
  155. log.Info("refresh token data, users(%d) tokens(%d)", len(pool), updatedTokens)
  156. for mid, rs := range pool {
  157. arg := &pb.AddUserReportCacheRequest{Mid: mid, Reports: rs}
  158. for i := 0; i < _retry; i++ {
  159. if _, err = s.pushRPC.AddUserReportCache(ctx, arg); err == nil {
  160. break
  161. }
  162. time.Sleep(10 * time.Millisecond)
  163. }
  164. if err != nil {
  165. log.Error("s.pushRPC.AddUserReportCache(%d) error(%v)", mid, err)
  166. continue
  167. }
  168. updatedUsers++
  169. delete(pool, mid)
  170. }
  171. pool = nil
  172. runtime.GC()
  173. log.Info("refresh token end, updated users(%d) tokens(%d)", updatedUsers, updatedTokens)
  174. }
  175. func (s *Service) addTokensCache(rs []*pb.ModelReport) (err error) {
  176. arg := new(pb.AddTokensCacheRequest)
  177. arg.Reports = append(arg.Reports, rs...)
  178. for i := 0; i < _retry; i++ {
  179. if _, err = s.pushRPC.AddTokensCache(context.Background(), arg); err == nil {
  180. break
  181. }
  182. time.Sleep(10 * time.Millisecond)
  183. }
  184. if err != nil {
  185. log.Error("s.pushRPC.AddTokensCache tokens(%d) error(%v)", len(rs), err)
  186. return
  187. }
  188. log.Info("s.pushRPC.AddTokensCache tokens(%d)", len(rs))
  189. return
  190. }
  191. func (s *Service) tokensByMids(task *pushmdl.Task, mids []int64) (res map[int][]string, valid int64, err error) {
  192. rs, _, err := s.dao.ReportsCacheByMids(context.Background(), mids)
  193. if err != nil {
  194. log.Error("s.dao.ReportsCacheByMids() error(%v)", err)
  195. return
  196. }
  197. var (
  198. exist = make(map[int64]bool, len(rs))
  199. // platformCount = len(task.Platform)
  200. buildCount = len(task.Build)
  201. )
  202. for mid := range rs {
  203. exist[mid] = true
  204. }
  205. for _, mid := range mids {
  206. if !exist[mid] {
  207. log.Warn("tokens by mid, task(%s) mid(%d)", task.ID, mid)
  208. }
  209. }
  210. res = make(map[int][]string)
  211. for _, rr := range rs {
  212. for _, r := range rr {
  213. if r.APPID != task.APPID {
  214. continue
  215. }
  216. if r.NotifySwitch == pushmdl.SwitchOff {
  217. continue
  218. }
  219. realTime := pushmdl.RealTime(r.TimeZone)
  220. if realTime.Unix() > int64(task.ExpireTime) {
  221. continue
  222. }
  223. // if platformCount > 0 && !validatePlatform(r.PlatformID, task.Platform) {
  224. // continue
  225. // }
  226. if buildCount > 0 && !pushmdl.ValidateBuild(r.PlatformID, r.Build, task.Build) {
  227. continue
  228. }
  229. res[r.PlatformID] = append(res[r.PlatformID], r.DeviceToken)
  230. }
  231. valid++
  232. }
  233. return
  234. }