user_likes.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/thumbup/model"
  6. xmdl "go-common/app/service/main/thumbup/model"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. )
  10. func newUserLikeMsg(msg *databus.Message) (res interface{}, err error) {
  11. userLikesMsg := new(xmdl.UserMsg)
  12. if err = json.Unmarshal(msg.Value, &userLikesMsg); err != nil {
  13. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  14. return
  15. }
  16. log.Info("get user like msg: %+v", userLikesMsg)
  17. res = userLikesMsg
  18. return
  19. }
  20. func userLikesSplit(msg *databus.Message, data interface{}) int {
  21. um, ok := data.(*xmdl.UserMsg)
  22. if !ok {
  23. log.Error("user like msg err: mid: 0 %s", msg.Value)
  24. return 0
  25. }
  26. return int(um.Mid)
  27. }
  28. func (s *Service) userLikesDo(ms []interface{}) {
  29. for _, m := range ms {
  30. um, ok := m.(*xmdl.UserMsg)
  31. if !ok {
  32. continue
  33. }
  34. var (
  35. err error
  36. businessID int64
  37. ctx = context.Background()
  38. )
  39. if businessID, err = s.checkBusiness(um.Business); err != nil {
  40. log.Warn("userlikes: checkBusiness(%s) err:%+v", um.Business, err)
  41. continue
  42. }
  43. var exist bool
  44. if exist, _ = s.dao.ExpireUserLikesCache(ctx, um.Mid, businessID, um.State); exist {
  45. log.Warn("userlikes: ExpireUserLikesCache(%+v) exist ignore", um, err)
  46. continue
  47. }
  48. for i := 0; i < _retryTimes; i++ {
  49. if err = s.addUserLikesCache(context.Background(), um); err == nil {
  50. break
  51. }
  52. }
  53. if err != nil {
  54. log.Error("userLikes fail params(%+v) err: %+v", m, err)
  55. } else {
  56. log.Info("userLikes success params(%+v)", m)
  57. }
  58. }
  59. }
  60. // addUserLikesCache .
  61. func (s *Service) addUserLikesCache(c context.Context, p *xmdl.UserMsg) (err error) {
  62. var businessID int64
  63. if businessID, err = s.checkBusiness(p.Business); err != nil {
  64. log.Error("s.checkBusiness business(%s) error(%v)", p.Business, err)
  65. return
  66. }
  67. var items []*model.ItemLikeRecord
  68. var limit = s.businessIDMap[businessID].UserLikesLimit
  69. if items, err = s.dao.UserLikes(c, p.Mid, businessID, p.State, limit); err != nil {
  70. log.Error("s.dao.UserLikes mid(%d) businessID(%d)(%d) type(%d) error(%v)", p.Mid, businessID, p.State, err)
  71. return
  72. }
  73. err = s.dao.AddUserLikesCache(c, p.Mid, businessID, items, p.State, limit)
  74. return
  75. }
  76. func (s *Service) addUserlikeRecord(c context.Context, mid, businessID int64, state int8, item *model.ItemLikeRecord) (err error) {
  77. var exist bool
  78. if exist, err = s.dao.ExpireUserLikesCache(c, mid, businessID, state); (err != nil) || !exist {
  79. return
  80. }
  81. limit := s.businessIDMap[businessID].UserLikesLimit
  82. err = s.dao.AppendCacheUserLikeList(c, mid, item, businessID, state, limit)
  83. return
  84. }