service.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "go-common/app/infra/notify/conf"
  8. "go-common/app/infra/notify/dao"
  9. "go-common/app/infra/notify/model"
  10. "go-common/app/infra/notify/notify"
  11. "go-common/library/conf/env"
  12. "go-common/library/log"
  13. )
  14. // Service struct
  15. type Service struct {
  16. c *conf.Config
  17. dao *dao.Dao
  18. plock sync.RWMutex
  19. pubConfs map[string]*model.Pub
  20. subs map[string]*notify.Sub
  21. pubs map[string]*notify.Pub
  22. }
  23. // New init
  24. func New(c *conf.Config) (s *Service) {
  25. s = &Service{
  26. c: c,
  27. dao: dao.New(c),
  28. pubConfs: make(map[string]*model.Pub),
  29. subs: make(map[string]*notify.Sub),
  30. pubs: make(map[string]*notify.Pub),
  31. }
  32. err := s.loadNotify()
  33. if err != nil {
  34. return
  35. }
  36. go s.notifyproc()
  37. go s.loadPub()
  38. go s.retryproc()
  39. return s
  40. }
  41. func (s *Service) loadPub() {
  42. for {
  43. pubs, err := s.dao.LoadPub(context.TODO())
  44. if err != nil {
  45. log.Error("load pub info err %v", err)
  46. time.Sleep(time.Minute)
  47. continue
  48. }
  49. ps := make(map[string]*model.Pub, len(pubs))
  50. for _, p := range pubs {
  51. ps[key(p.Group, p.Topic)] = p
  52. }
  53. s.pubConfs = ps
  54. time.Sleep(time.Minute)
  55. }
  56. }
  57. // TODO():auto reload and update.
  58. func (s *Service) loadNotify() (err error) {
  59. watcher, err := s.dao.LoadNotify(context.TODO(), env.Zone)
  60. if err != nil {
  61. log.Error("load notify err %v", err)
  62. return
  63. }
  64. subs := make(map[string]*notify.Sub, len(watcher))
  65. for _, w := range watcher {
  66. if sub, ok := s.subs[key(w.Group, w.Topic)]; ok && !sub.Closed() && !sub.IsUpdate(w) {
  67. subs[key(w.Group, w.Topic)] = sub
  68. } else {
  69. n, err := s.newSub(w)
  70. if err != nil {
  71. log.Error("create notify topic(%s) group(%s) err(%v)", w.Topic, w.Group, err)
  72. continue
  73. }
  74. subs[key(w.Group, w.Topic)] = n
  75. log.Info("new sub %s %s", w.Group, w.Topic)
  76. }
  77. }
  78. // close subs not subscribe any more.
  79. for k, sub := range s.subs {
  80. if _, ok := subs[k]; !ok {
  81. sub.Close()
  82. log.Info("close sub not subscribe any %s", k)
  83. }
  84. }
  85. s.subs = subs
  86. return
  87. }
  88. func (s *Service) newSub(w *model.Watcher) (*notify.Sub, error) {
  89. var err error
  90. if w.Filter {
  91. w.Filters, err = s.dao.Filters(context.TODO(), w.ID)
  92. if err != nil {
  93. log.Error("s.dao.Filters err(%v)", err)
  94. }
  95. }
  96. return notify.NewSub(w, s.dao, s.c)
  97. }
  98. func (s *Service) notifyproc() {
  99. for {
  100. time.Sleep(time.Minute)
  101. s.loadNotify()
  102. }
  103. }
  // key builds the map key for a (group, topic) pair as "<group>_<topic>".
  //
  // The separator is not escaped, so distinct pairs can produce the same key
  // (for example ("a_b", "c") and ("a", "b_c")).
  func key(group, topic string) string {
  	const layout = "%s_%s"
  	return fmt.Sprintf(layout, group, topic)
  }
  107. // Ping Service
  108. func (s *Service) Ping(c context.Context) (err error) {
  109. return s.dao.Ping(c)
  110. }
  111. // Close Service
  112. func (s *Service) Close() {
  113. s.dao.Close()
  114. }
  115. func (s *Service) retryproc() {
  116. for {
  117. fs, err := s.dao.LoadFailBk(context.TODO())
  118. if err != nil {
  119. log.Error("s.loadFailBk err (%v)", err)
  120. time.Sleep(time.Minute)
  121. continue
  122. }
  123. for _, f := range fs {
  124. if n, ok := s.subs[key(f.Group, f.Topic)]; ok && !n.Closed() {
  125. n.AddRty(f.Msg, f.ID, f.Index)
  126. }
  127. }
  128. time.Sleep(time.Minute * 10)
  129. }
  130. }