dao.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/service/main/push/conf"
  6. "go-common/app/service/main/push/dao/apns2"
  7. "go-common/app/service/main/push/dao/fcm"
  8. "go-common/app/service/main/push/dao/huawei"
  9. "go-common/app/service/main/push/dao/jpush"
  10. "go-common/app/service/main/push/dao/mi"
  11. "go-common/app/service/main/push/dao/oppo"
  12. "go-common/app/service/main/push/model"
  13. "go-common/library/cache/memcache"
  14. xredis "go-common/library/cache/redis"
  15. xsql "go-common/library/database/sql"
  16. "go-common/library/log"
  17. "go-common/library/queue/databus"
  18. "go-common/library/stat/prom"
  19. )
  20. const (
  21. _retry = 3
  22. )
// _mc lists the memcache accessors for token reports: a single-key read, a
// single-key write, a batched write and a delete. The "mc:" comment above each
// method appears to be a directive for the cache generator named in the
// go:generate line (key builder, operation type, expiry expression), so those
// comments should be left exactly as written.
//
//go:generate $GOPATH/src/go-common/app/tool/cache/mc
type _mc interface {
	//mc: -key=tokenKey -type=get
	TokenCache(c context.Context, key string) (*model.Report, error)
	//mc: -key=tokenKey -expire=d.mcReportExpire
	AddTokenCache(c context.Context, key string, value *model.Report) error
	//mc: -key=tokenKey -expire=d.mcReportExpire
	AddTokensCache(c context.Context, values map[string]*model.Report) error
	//mc: -key=tokenKey
	DelTokenCache(c context.Context, key string) error
}
  34. // Dao .
  35. type Dao struct {
  36. c *conf.Config
  37. db *xsql.DB
  38. mc *memcache.Pool
  39. redis *xredis.Pool
  40. reportPub *databus.Databus
  41. callbackPub *databus.Databus
  42. clientsIPhone map[int64][]*apns2.Client
  43. clientsIPad map[int64][]*apns2.Client
  44. clientsMi map[int64][]*mi.Client
  45. clientMiByMids map[int64]*mi.Client
  46. clientsHuawei map[int64][]*huawei.Client
  47. clientsOppo map[int64][]*oppo.Client
  48. clientsJpush map[int64][]*jpush.Client
  49. clientsFCM map[int64][]*fcm.Client
  50. clientsLen map[string]int
  51. clientsIndex map[string]*uint32
  52. huaweiAuth map[int64]*huawei.Access
  53. oppoAuth map[int64]*oppo.Auth
  54. addTaskStmt *xsql.Stmt
  55. updateTaskStatusStmt *xsql.Stmt
  56. updateTaskProgressStmt *xsql.Stmt
  57. taskStmt *xsql.Stmt
  58. businessesStmt *xsql.Stmt
  59. settingStmt *xsql.Stmt
  60. setSettingStmt *xsql.Stmt
  61. authsStmt *xsql.Stmt
  62. addReportStmt *xsql.Stmt
  63. updateReportStmt *xsql.Stmt
  64. reportStmt *xsql.Stmt
  65. reportByIDStmt *xsql.Stmt
  66. delReportStmt *xsql.Stmt
  67. reportsByMidStmt *xsql.Stmt
  68. lastReportIDStmt *xsql.Stmt
  69. addCallbackStmt *xsql.Stmt
  70. redisTokenExpire int32
  71. redisLaterExpire int32
  72. redisMidsExpire int32
  73. mcReportExpire int32
  74. mcSettingExpire int32
  75. mcUUIDExpire int32
  76. }
  77. var (
  78. errorsCount = prom.BusinessErrCount
  79. infosCount = prom.BusinessInfoCount
  80. missedCount = prom.CacheMiss
  81. cachedCount = prom.CacheHit
  82. )
  83. // New creates a push-service DAO instance.
  84. func New(c *conf.Config) *Dao {
  85. d := &Dao{
  86. c: c,
  87. db: xsql.NewMySQL(c.MySQL),
  88. mc: memcache.NewPool(c.Memcache.Config),
  89. redis: xredis.NewPool(c.Redis.Config),
  90. reportPub: databus.New(c.ReportPub),
  91. callbackPub: databus.New(c.CallbackPub),
  92. clientsIPhone: make(map[int64][]*apns2.Client),
  93. clientsIPad: make(map[int64][]*apns2.Client),
  94. clientsMi: make(map[int64][]*mi.Client),
  95. clientMiByMids: make(map[int64]*mi.Client),
  96. clientsHuawei: make(map[int64][]*huawei.Client),
  97. clientsOppo: make(map[int64][]*oppo.Client),
  98. clientsJpush: make(map[int64][]*jpush.Client),
  99. clientsFCM: make(map[int64][]*fcm.Client),
  100. clientsLen: make(map[string]int),
  101. clientsIndex: make(map[string]*uint32),
  102. huaweiAuth: make(map[int64]*huawei.Access),
  103. oppoAuth: make(map[int64]*oppo.Auth),
  104. redisTokenExpire: int32(time.Duration(c.Redis.TokenExpire) / time.Second),
  105. redisLaterExpire: int32(time.Duration(c.Redis.LaterExpire) / time.Second),
  106. redisMidsExpire: int32(time.Duration(c.Redis.MidsExpire) / time.Second),
  107. mcReportExpire: int32(time.Duration(c.Memcache.ReportExpire) / time.Second),
  108. mcSettingExpire: int32(time.Duration(c.Memcache.SettingExpire) / time.Second),
  109. mcUUIDExpire: int32(time.Duration(c.Memcache.UUIDExpire) / time.Second),
  110. }
  111. d.addTaskStmt = d.db.Prepared(_addTaskSQL)
  112. d.updateTaskStatusStmt = d.db.Prepared(_upadteTaskStatusSQL)
  113. d.updateTaskProgressStmt = d.db.Prepared(_upadteTaskProgressSQL)
  114. d.taskStmt = d.db.Prepared(_taskByIDSQL)
  115. d.businessesStmt = d.db.Prepared(_businessesSQL)
  116. d.settingStmt = d.db.Prepared(_settingSQL)
  117. d.setSettingStmt = d.db.Prepared(_setSettingSQL)
  118. d.authsStmt = d.db.Prepared(_authsSQL)
  119. d.addReportStmt = d.db.Prepared(_addReportSQL)
  120. d.updateReportStmt = d.db.Prepared(_updateReportSQL)
  121. d.addCallbackStmt = d.db.Prepared(_addCallbackSQL)
  122. d.reportStmt = d.db.Prepared(_reportSQL)
  123. d.reportByIDStmt = d.db.Prepared(_reportByIDSQL)
  124. d.delReportStmt = d.db.Prepared(_delReportSQL)
  125. d.reportsByMidStmt = d.db.Prepared(_reportsByMidSQL)
  126. d.lastReportIDStmt = d.db.Prepared(_lastReportIDSQL)
  127. go d.refreshAuthproc()
  128. time.Sleep(time.Second)
  129. d.loadClients()
  130. return d
  131. }
  132. func (d *Dao) refreshAuthproc() {
  133. for {
  134. auths, err := d.auths(context.Background())
  135. if err != nil {
  136. log.Error("d.auths() error(%v)", err)
  137. time.Sleep(time.Second)
  138. continue
  139. }
  140. for _, a := range auths {
  141. d.refreshAuth(a)
  142. }
  143. time.Sleep(1 * time.Minute)
  144. }
  145. }
  146. func (d *Dao) refreshAuth(a *model.Auth) {
  147. i := fmtRoundIndex(a.APPID, a.PlatformID)
  148. switch a.PlatformID {
  149. case model.PlatformOppo:
  150. if d.clientsLen[i] == 0 || d.oppoAuth[a.APPID] == nil || d.oppoAuth[a.APPID].IsExpired() {
  151. auth, err := oppo.NewAuth(a.Key, a.Value)
  152. if err != nil {
  153. log.Error("new oppo auth failed, key(%s) secret(%s) error(%v)", a.Key, a.Value, err)
  154. return
  155. }
  156. log.Info("oppo refresh auth app(%d) auth(%+v)", a.APPID, auth)
  157. if d.oppoAuth[a.APPID] == nil {
  158. d.oppoAuth[a.APPID] = new(oppo.Auth)
  159. }
  160. *d.oppoAuth[a.APPID] = *auth
  161. if d.clientsLen[i] == 0 {
  162. cs := d.newOppoClients(a.APPID, a.BundleID)
  163. if len(cs) > 0 {
  164. d.clientsOppo[a.APPID] = cs
  165. d.clientsLen[i] = len(d.clientsOppo)
  166. log.Info("oppo renew push clients app(%d)", a.APPID)
  167. }
  168. }
  169. }
  170. case model.PlatformHuawei:
  171. if d.clientsLen[i] == 0 || d.huaweiAuth[a.APPID] == nil || d.huaweiAuth[a.APPID].IsExpired() {
  172. ac, err := huawei.NewAccess(a.Key, a.Value)
  173. if err != nil {
  174. log.Error("new huawei access failed, id(%s) secret(%s) error(%v)", a.Key, a.Value, err)
  175. return
  176. }
  177. log.Info("huawei refresh auth app(%d) auth(%+v)", a.APPID, ac)
  178. if d.huaweiAuth[a.APPID] == nil {
  179. d.huaweiAuth[a.APPID] = new(huawei.Access)
  180. }
  181. *d.huaweiAuth[a.APPID] = *ac
  182. if d.clientsLen[i] == 0 {
  183. cs := d.newHuaweiClients(a.APPID, a.BundleID)
  184. if len(cs) > 0 {
  185. d.clientsHuawei[a.APPID] = cs
  186. d.clientsLen[i] = len(d.clientsHuawei)
  187. log.Info("huawei renew push clients app(%d)", a.APPID)
  188. }
  189. }
  190. }
  191. }
  192. }
// PromError increments the business error collector (errorsCount) for name.
func PromError(name string) {
	errorsCount.Incr(name)
}
// PromInfo increments the business info collector (infosCount) for name.
func PromInfo(name string) {
	infosCount.Incr(name)
}
// PromChanLen records length under name through infosCount.State; per its
// name it is meant for reporting the current length of a channel.
func PromChanLen(name string, length int64) {
	infosCount.State(name, length)
}
// BeginTx starts a database transaction by delegating to d.db.Begin with c and
// returns the resulting transaction and error unchanged.
func (d *Dao) BeginTx(c context.Context) (*xsql.Tx, error) {
	return d.db.Begin(c)
}
  209. // Close dao.
  210. func (d *Dao) Close() (err error) {
  211. if err = d.db.Close(); err != nil {
  212. log.Error("d.db.Close() error(%v)", err)
  213. PromError("db:close")
  214. }
  215. if err = d.redis.Close(); err != nil {
  216. log.Error("d.redis.Close() error(%v)", err)
  217. PromError("redis:close")
  218. }
  219. if err = d.mc.Close(); err != nil {
  220. log.Error("d.mc.Close() error(%v)", err)
  221. PromError("mc:close")
  222. }
  223. return
  224. }
  225. // Ping check connection status.
  226. func (d *Dao) Ping(c context.Context) (err error) {
  227. if err = d.pingRedis(c); err != nil {
  228. PromError("redis:Ping")
  229. log.Error("d.pingRedis error(%v)", err)
  230. return
  231. }
  232. if err = d.pingMC(c); err != nil {
  233. PromError("mc:Ping")
  234. log.Error("d.pingMC error(%v)", err)
  235. return
  236. }
  237. if err = d.db.Ping(c); err != nil {
  238. PromError("mysql:Ping")
  239. log.Error("d.db.Ping error(%v)", err)
  240. }
  241. return
  242. }