passport.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/passport-game-cloud/model"
  7. "go-common/library/log"
  8. "github.com/go-sql-driver/mysql"
  9. "github.com/pkg/errors"
  10. )
  // Retry counts and back-off intervals used by the helpers in this file,
  // grouped by the resource they apply to.
  const (
  	// _mySQLErrCodeDuplicateEntry is the MySQLError.Number value that the
  	// insert/update helpers treat as a duplicate-entry failure.
  	_mySQLErrCodeDuplicateEntry = 1062

  	// _userTableUpdateDuration is the pause between attempts of the
  	// unbounded account/member retry loops.
  	_userTableUpdateDuration = time.Second

  	// Token table: bounded retries with a fixed pause between attempts.
  	_tokenTableUpdateRetryCount = 3
  	_tokenTableUpdateDuration   = time.Second
  	// _tokenTablePrefix is not referenced by the code visible in this file.
  	_tokenTablePrefix = "aso_app_perm"

  	// Token cache: bounded retries with a fixed pause between attempts.
  	_tokenCacheRetryCount    = 3
  	_tokenCacheRetryDuration = time.Second

  	// Game notification: bounded retries with a fixed pause between attempts.
  	_notifyGameRetryCount    = 3
  	_notifyGameRetryDuration = time.Second
  )
  22. func (s *Service) processUserInfo(bmsg *model.BMsg) {
  23. n := new(model.Info)
  24. if err := json.Unmarshal(bmsg.New, n); err != nil {
  25. log.Error("failed to parse binlog new, json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  26. return
  27. }
  28. s.asoAccountInterval.Prom(context.TODO(), bmsg.MTS)
  29. switch bmsg.Action {
  30. case "insert":
  31. s.addMemberInit(context.TODO(), n)
  32. s.delInfoCache(context.TODO(), n.Mid)
  33. case "update":
  34. old := new(model.Info)
  35. if err := json.Unmarshal(bmsg.Old, old); err != nil {
  36. log.Error("failed to parse binlog old, json.Unmarshal(%s) error(%v)", string(bmsg.Old), err)
  37. return
  38. }
  39. if n.Equals(old) {
  40. return
  41. }
  42. s.delInfoCache(context.TODO(), n.Mid)
  43. case "delete":
  44. s.delInfoCache(context.TODO(), n.Mid)
  45. }
  46. }
  47. // sub log encryption UPDATE INSERT
  48. func (s *Service) processAsoAccSub(msg *model.PMsg) {
  49. n := msg.Data
  50. flag := msg.Flag
  51. s.transInterval.Prom(context.TODO(), msg.MTS)
  52. switch msg.Action {
  53. case "insert":
  54. s.addAsoAccount(context.TODO(), n)
  55. case "update":
  56. s.updateAsoAccount(context.TODO(), n, flag)
  57. case "delete":
  58. s.delAsoAccount(context.TODO(), n.Mid)
  59. }
  60. s.delInfoCache(context.TODO(), n.Mid)
  61. s.notifyGame(context.TODO(), n.Mid, "", _updateUserInfo)
  62. }
  63. func (s *Service) processToken(bmsg *model.BMsg) {
  64. n := new(model.Perm)
  65. if err := json.Unmarshal(bmsg.New, n); err != nil {
  66. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  67. return
  68. }
  69. isGame := false
  70. for _, id := range s.gameAppIDs {
  71. if n.AppID == id {
  72. isGame = true
  73. break
  74. }
  75. }
  76. if !isGame {
  77. return
  78. }
  79. s.tokenInterval.Prom(context.TODO(), bmsg.MTS)
  80. switch bmsg.Action {
  81. case "insert":
  82. s.addToken(context.TODO(), n)
  83. s.setTokenCache(context.TODO(), n)
  84. case "update":
  85. old := new(model.Perm)
  86. if err := json.Unmarshal(bmsg.Old, old); err != nil {
  87. log.Error("failed to parse binlog old, json.Unmarshal(%s) error(%v)", string(bmsg.Old), err)
  88. return
  89. }
  90. if n.Equals(old) {
  91. return
  92. }
  93. s.updateToken(context.TODO(), n)
  94. s.setTokenCache(context.TODO(), n)
  95. case "delete":
  96. s.delToken(context.TODO(), n.AccessToken)
  97. s.delTokenCache(context.TODO(), n.AccessToken)
  98. }
  99. }
  100. func (s *Service) addAsoAccount(c context.Context, a *model.AsoAccount) (err error) {
  101. for {
  102. _, err = s.d.AddAsoAccount(c, a)
  103. if err == nil {
  104. break
  105. }
  106. switch nErr := errors.Cause(err).(type) {
  107. case *mysql.MySQLError:
  108. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  109. log.Error("failed to add aso because of duplicate entry, value is (%v), error(%v)", a, err)
  110. return
  111. }
  112. }
  113. time.Sleep(_userTableUpdateDuration)
  114. }
  115. return
  116. }
  117. func (s *Service) updateAsoAccount(c context.Context, a *model.AsoAccount, flag int) (err error) {
  118. for {
  119. _, err = s.UpdateAsoAccount(c, a, flag)
  120. if err == nil {
  121. break
  122. }
  123. switch nErr := errors.Cause(err).(type) {
  124. case *mysql.MySQLError:
  125. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  126. log.Error("failed to update aso because of duplicate entry, value is (%v), error(%v)", a, err)
  127. return
  128. }
  129. }
  130. time.Sleep(_userTableUpdateDuration)
  131. }
  132. return
  133. }
  134. // UpdateAsoAccount update aso account.
  135. func (s *Service) UpdateAsoAccount(c context.Context, t *model.AsoAccount, flag int) (affected int64, err error) {
  136. if affected, err = s.d.UpdateAsoAccount(c, t); err != nil {
  137. return
  138. }
  139. if flag != 1 {
  140. return
  141. }
  142. mid := t.Mid
  143. var tokens []string
  144. if tokens, err = s.d.Tokens(c, mid); err != nil {
  145. return
  146. }
  147. for _, token := range tokens {
  148. s.delToken(c, token)
  149. s.delTokenCache(c, token)
  150. s.notifyGame(c, mid, token, _changePwd)
  151. }
  152. return
  153. }
  154. func (s *Service) delAsoAccount(c context.Context, mid int64) (err error) {
  155. for {
  156. if _, err = s.d.DelAsoAccount(c, mid); err == nil {
  157. break
  158. }
  159. time.Sleep(_userTableUpdateDuration)
  160. }
  161. return
  162. }
  163. func (s *Service) addMemberInit(c context.Context, a *model.Info) (err error) {
  164. for {
  165. memberInfo := &model.Info{
  166. Mid: a.Mid,
  167. Uname: a.Uname,
  168. Face: "",
  169. }
  170. err = s.addMemberInfo(c, memberInfo)
  171. if err == nil {
  172. break
  173. }
  174. switch nErr := errors.Cause(err).(type) {
  175. case *mysql.MySQLError:
  176. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  177. log.Error("failed to add member because of duplicate entry, error(%v)", err)
  178. return
  179. }
  180. }
  181. time.Sleep(_userTableUpdateDuration)
  182. }
  183. return
  184. }
  185. func (s *Service) setTokenCache(c context.Context, t *model.Perm) (err error) {
  186. for i := 0; i < _tokenCacheRetryCount; i++ {
  187. if err = s.d.SetTokenCache(c, t); err == nil {
  188. break
  189. }
  190. time.Sleep(_tokenCacheRetryDuration)
  191. }
  192. return
  193. }
  194. func (s *Service) delTokenCache(c context.Context, accessToken string) (err error) {
  195. for i := 0; i < _tokenCacheRetryCount; i++ {
  196. if err = s.d.DelTokenCache(c, accessToken); err == nil {
  197. break
  198. }
  199. time.Sleep(_tokenCacheRetryDuration)
  200. }
  201. return
  202. }
  203. func (s *Service) addToken(c context.Context, t *model.Perm) (err error) {
  204. for i := 0; i < _tokenTableUpdateRetryCount; i++ {
  205. _, err = s.d.AddToken(c, t)
  206. if err == nil {
  207. return
  208. }
  209. switch nErr := errors.Cause(err).(type) {
  210. case *mysql.MySQLError:
  211. if nErr.Number == _mySQLErrCodeDuplicateEntry {
  212. log.Error("failed to add token because of duplicate entry, error(%v)", err)
  213. return
  214. }
  215. }
  216. time.Sleep(_tokenTableUpdateDuration)
  217. }
  218. return
  219. }
  220. func (s *Service) updateToken(c context.Context, t *model.Perm) (err error) {
  221. for i := 0; i < _tokenTableUpdateRetryCount; i++ {
  222. if _, err = s.d.UpdateToken(c, t); err == nil {
  223. return
  224. }
  225. time.Sleep(_tokenTableUpdateDuration)
  226. }
  227. return
  228. }
  229. func (s *Service) delToken(c context.Context, accessToken string) (err error) {
  230. for i := 0; i < _tokenTableUpdateRetryCount; i++ {
  231. if _, err = s.d.DelToken(c, accessToken); err == nil {
  232. return
  233. }
  234. time.Sleep(_tokenTableUpdateDuration)
  235. }
  236. return
  237. }
  238. func (s *Service) notifyGame(c context.Context, mid int64, token, action string) (err error) {
  239. for i := 0; i < _notifyGameRetryCount; i++ {
  240. if err = s.d.NotifyGame(c, mid, token, action); err == nil {
  241. return
  242. }
  243. time.Sleep(_notifyGameRetryDuration)
  244. }
  245. return
  246. }