service.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package service
  import (
  	"context"
  	"encoding/json"
  	"strings"
  	"sync"
  	"sync/atomic"
  	"time"

  	"go-common/app/job/live-userexp/conf"
  	"go-common/app/job/live-userexp/dao"
  	"go-common/app/job/live-userexp/model"
  	"go-common/library/log"
  	"go-common/library/queue/databus"
  )
  14. // Service http service
  15. type Service struct {
  16. c *conf.Config
  17. keys map[string]string
  18. dao *dao.Dao
  19. missch chan func()
  20. expSub *databus.Databus
  21. waiter *sync.WaitGroup
  22. expUpMo int64
  23. }
  24. // New for new service obj
  25. func New(c *conf.Config) *Service {
  26. s := &Service{
  27. c: c,
  28. keys: map[string]string{},
  29. dao: dao.New(c),
  30. missch: make(chan func(), 1024),
  31. expSub: databus.New(c.ExpSub),
  32. waiter: new(sync.WaitGroup),
  33. }
  34. s.waiter.Add(1)
  35. go s.expCanalConsumeproc()
  36. go s.checkExpCanalConsumeproc()
  37. return s
  38. }
  39. // Ping check server ok
  40. func (s *Service) Ping(c context.Context) (err error) {
  41. return s.dao.Ping(c)
  42. }
  43. // Close dao
  44. func (s *Service) Close() {
  45. defer s.waiter.Wait()
  46. s.expSub.Close()
  47. s.dao.Close()
  48. }
  49. // expCanalConsumeproc consumer archive
  50. func (s *Service) expCanalConsumeproc() {
  51. var (
  52. msgs = s.expSub.Messages()
  53. err error
  54. )
  55. defer s.waiter.Done()
  56. for {
  57. msg, ok := <-msgs
  58. if !ok {
  59. log.Info("expCanal databus Consumer exit")
  60. return
  61. }
  62. s.expUpMo++
  63. msg.Commit()
  64. m := &model.Message{}
  65. if err = json.Unmarshal(msg.Value, m); err != nil {
  66. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  67. continue
  68. }
  69. if !strings.HasPrefix(m.Table, "user_exp_") || m.Action != "update" {
  70. continue
  71. }
  72. s.levelCacheUpdate(m.New, m.Old)
  73. }
  74. }
  75. // checkConsumeproc check consumer stat
  76. func (s *Service) checkExpCanalConsumeproc() {
  77. if s.c.Env != "pro" {
  78. return
  79. }
  80. var expMo int64
  81. for {
  82. time.Sleep(1 * time.Minute)
  83. if s.expUpMo-expMo == 0 {
  84. msg := "live-userexp-job expCanal did not consume within a minute"
  85. //s.dao.SendSMS(msg)
  86. log.Warn(msg)
  87. }
  88. expMo = s.expUpMo
  89. }
  90. }
  91. // Wait goroutinue to close
  92. func (s *Service) Wait() {
  93. s.waiter.Wait()
  94. }