// merge.go
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "hash/crc32"
  7. "sort"
  8. "strings"
  9. "time"
  10. "go-common/app/service/main/history/model"
  11. "go-common/library/log"
  12. "go-common/library/stat/prom"
  13. "go-common/library/sync/pipeline"
  14. )
  15. func (s *Service) serviceConsumeproc() {
  16. var (
  17. err error
  18. msgs = s.serviceHisSub.Messages()
  19. )
  20. for {
  21. msg, ok := <-msgs
  22. if !ok {
  23. log.Error("s.serviceConsumeproc closed")
  24. return
  25. }
  26. if s.c.Job.IgnoreMsg {
  27. err = msg.Commit()
  28. log.Info("serviceConsumeproc key:%s partition:%d offset:%d err: %+v, ts:%v ignore", msg.Key, msg.Partition, msg.Offset, err, msg.Timestamp)
  29. continue
  30. }
  31. ms := make([]*model.Merge, 0, 32)
  32. if err = json.Unmarshal(msg.Value, &ms); err != nil {
  33. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  34. continue
  35. }
  36. for _, x := range ms {
  37. key := fmt.Sprintf("%d-%d-%d", x.Mid, x.Bid, x.Kid)
  38. s.merge.SyncAdd(context.Background(), key, x)
  39. }
  40. err := msg.Commit()
  41. log.Info("serviceConsumeproc key:%s partition:%d offset:%d err: %+v, len(%v)", msg.Key, msg.Partition, msg.Offset, err, len(ms))
  42. }
  43. }
  44. func (s *Service) serviceFlush(merges []*model.Merge) {
  45. // 相同的mid聚合在一起
  46. sort.Slice(merges, func(i, j int) bool { return merges[i].Mid < merges[j].Mid })
  47. var ms []*model.Merge
  48. for _, m := range merges {
  49. if (len(ms) < s.c.Job.ServiceBatch) || (ms[len(ms)-1].Mid == m.Mid) {
  50. ms = append(ms, m)
  51. continue
  52. }
  53. s.FlushCache(context.Background(), ms)
  54. ms = []*model.Merge{m}
  55. }
  56. if len(ms) > 0 {
  57. s.FlushCache(context.Background(), ms)
  58. }
  59. }
  60. // FlushCache 数据从缓存写入到DB中
  61. func (s *Service) FlushCache(c context.Context, merges []*model.Merge) (err error) {
  62. var histories []*model.History
  63. if histories, err = s.dao.HistoriesCache(c, merges); err != nil {
  64. log.Error("historyDao.Cache(%+v) error(%v)", merges, err)
  65. return
  66. }
  67. prom.BusinessInfoCount.Add("histories-db", int64(len(histories)))
  68. if err = s.limit.WaitN(context.Background(), len(histories)); err != nil {
  69. log.Error("s.limit.WaitN(%v) err: %+v", len(histories), err)
  70. }
  71. for {
  72. if err = s.dao.AddHistories(c, histories); err != nil {
  73. prom.BusinessInfoCount.Add("retry", int64(len(histories)))
  74. time.Sleep(time.Duration(s.c.Job.RetryTime))
  75. continue
  76. }
  77. break
  78. }
  79. s.cache.Do(c, func(c context.Context) {
  80. for _, merge := range merges {
  81. limit := s.c.Job.CacheLen
  82. s.dao.TrimCache(context.Background(), merge.Business, merge.Mid, limit)
  83. }
  84. })
  85. return
  86. }
  87. func (s *Service) initMerge() {
  88. s.merge = pipeline.NewPipeline(s.c.Merge)
  89. s.merge.Split = func(a string) int {
  90. midStr := strings.Split(a, "-")[0]
  91. return int(crc32.ChecksumIEEE([]byte(midStr)))
  92. }
  93. s.merge.Do = func(c context.Context, ch int, values map[string][]interface{}) {
  94. var merges []*model.Merge
  95. for _, vs := range values {
  96. var t int64
  97. var m *model.Merge
  98. for _, v := range vs {
  99. prom.BusinessInfoCount.Incr("dbus-msg")
  100. if v.(*model.Merge).Time >= t {
  101. m = v.(*model.Merge)
  102. }
  103. }
  104. if m.Mid%1000 == 0 {
  105. log.Info("debug: merge mid:%v, ch:%v, value:%+v", m.Mid, ch, m)
  106. }
  107. merges = append(merges, m)
  108. }
  109. prom.BusinessInfoCount.Add(fmt.Sprintf("ch-%v", ch), int64(len(merges)))
  110. s.serviceFlush(merges)
  111. }
  112. s.merge.Start()
  113. }