wechat.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package wechat
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "go-common/app/admin/ep/saga/conf"
  8. "go-common/app/admin/ep/saga/dao"
  9. "go-common/app/admin/ep/saga/model"
  10. "go-common/library/log"
  11. "github.com/pkg/errors"
  12. )
  13. // Wechat 企业微信应用
  14. type Wechat struct {
  15. dao *dao.Dao
  16. saga *model.AppConfig
  17. contact *model.AppConfig
  18. }
  19. // New create an new wechat work
  20. func New(d *dao.Dao) (w *Wechat) {
  21. w = &Wechat{
  22. dao: d,
  23. saga: conf.Conf.Property.Wechat,
  24. contact: conf.Conf.Property.Contact,
  25. }
  26. return w
  27. }
  28. // NewTxtNotify create wechat format text notification 从配置初始化企业微信TxtNotification
  29. func (w *Wechat) NewTxtNotify(content string) (txtMsg *model.TxtNotification) {
  30. return &model.TxtNotification{
  31. Notification: model.Notification{
  32. MsgType: "text",
  33. AgentID: w.saga.AppID,
  34. },
  35. Body: model.Text{
  36. Content: content,
  37. },
  38. Safe: 0,
  39. }
  40. }
  41. // AccessToken get access_token from cache first, if not found, get it via wechat api.
  42. func (w *Wechat) AccessToken(c context.Context, app *model.AppConfig) (token string, err error) {
  43. var (
  44. key string
  45. expire int32
  46. )
  47. key = fmt.Sprintf("appid_%d", app.AppID)
  48. if token, err = w.dao.AccessTokenRedis(c, key); err != nil {
  49. log.Warn("AccessToken: failed to get access_token from cache, appId (%s), error (%s)", app.AppID, err.Error())
  50. //企业微信api获取公司token
  51. if token, expire, err = w.dao.WechatAccessToken(c, app.AppSecret); err != nil {
  52. err = errors.Wrapf(err, "AccessToken: both mc and api can't provide access_token, appId(%s)", app.AppID)
  53. return
  54. }
  55. // 通过API获取到了,缓存一波
  56. err = w.dao.SetAccessTokenRedis(c, key, token, expire)
  57. return
  58. }
  59. if token == "" {
  60. if token, expire, err = w.dao.WechatAccessToken(c, app.AppSecret); err != nil {
  61. return
  62. }
  63. // 通过API获取到了,缓存一波
  64. err = w.dao.SetAccessTokenRedis(c, key, token, expire)
  65. }
  66. return
  67. }
  68. // PushMsg push text message via wechat notification api with access_token.推送企业微信
  69. func (w *Wechat) PushMsg(c context.Context, userNames []string, content string) (err error) {
  70. var (
  71. token string
  72. userIds string
  73. invalidUser string
  74. userNamesByte []byte
  75. txtMsg = w.NewTxtNotify(content)
  76. contentDB = content
  77. )
  78. //获取企业token
  79. if token, err = w.AccessToken(c, w.saga); err != nil {
  80. return
  81. }
  82. if token == "" {
  83. err = errors.Errorf("PushMsg: get access token failed, it's empty. appid (%s), secret (%s)", w.saga.AppID, w.saga.AppSecret)
  84. return
  85. }
  86. //员工编号以竖线分隔
  87. if userIds, err = w.UserIds(userNames); err != nil {
  88. return
  89. }
  90. txtMsg.ToUser = userIds
  91. if invalidUser, err = w.dao.WechatPushMsg(c, token, txtMsg); err != nil {
  92. if err = w.addRequireVisible(c, invalidUser); err != nil {
  93. log.Error("PushMsg add userID (%s) in cache, error(%s)", invalidUser, err.Error())
  94. }
  95. return
  96. }
  97. if userNamesByte, err = json.Marshal(userNames); err != nil {
  98. return
  99. }
  100. if len(contentDB) > model.MaxWechatLen {
  101. contentDB = contentDB[:model.MaxWechatLen]
  102. }
  103. messageLog := &model.WechatMessageLog{
  104. Touser: string(userNamesByte),
  105. Content: contentDB,
  106. Status: 1,
  107. }
  108. return w.dao.CreateMessageLog(messageLog)
  109. }
  110. // UserIds query user ids for user name list 查询员工编号
  111. func (w *Wechat) UserIds(userNames []string) (ids string, err error) {
  112. if ids, err = w.dao.UserIds(userNames); err != nil {
  113. return
  114. }
  115. return
  116. }
  117. // addRequireVisible update wechat require visible users in memcache
  118. func (w *Wechat) addRequireVisible(c context.Context, userIDs string) (err error) {
  119. var (
  120. contactInfo *model.ContactInfo
  121. userID string
  122. alreadyIn bool
  123. )
  124. users := strings.Split(userIDs, "|")
  125. for _, userID = range users {
  126. //查看是否缓存,缓存则继续
  127. if alreadyIn, err = w.alreadyInCache(c, userID); err != nil || alreadyIn {
  128. continue
  129. }
  130. //未缓存从数据库查询
  131. if contactInfo, err = w.dao.QueryUserByID(userID); err != nil {
  132. log.Error("no such userID (%s) in db, error(%s)", userID, err.Error())
  133. return
  134. }
  135. //数据库查询结果缓存
  136. if err = w.dao.SetRequireVisibleUsersRedis(c, contactInfo); err != nil {
  137. log.Error("failed set to cache userID (%s) username (%s), err (%s)", userID, contactInfo.UserName, err.Error())
  138. return
  139. }
  140. }
  141. return
  142. }
  143. // alreadyInCache check user is or not in the memcache
  144. func (w *Wechat) alreadyInCache(c context.Context, userID string) (alreadyIn bool, err error) {
  145. var (
  146. userMap = make(map[string]model.RequireVisibleUser)
  147. )
  148. //查询所有的值
  149. if err = w.dao.RequireVisibleUsersRedis(c, &userMap); err != nil {
  150. log.Error("get userID (%s) from cache error(%s)", userID, err.Error())
  151. return
  152. }
  153. //匹配需要查询的用户id
  154. for k, v := range userMap {
  155. if userID == k {
  156. log.Info("(%s) is already exist in cache, value(%v)", k, v)
  157. alreadyIn = true
  158. return
  159. }
  160. }
  161. return
  162. }