databus.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/videoup-report/model/archive"
  6. "go-common/app/job/main/videoup-report/model/manager"
  7. "go-common/library/log"
  8. )
  9. var (
  10. _archive = "archive"
  11. _video = "archive_video"
  12. _insertAct = "insert"
  13. _updateAct = "update"
  14. //_delete = "delete"
  15. )
  16. // consumer binlog
  17. func (s *Service) arcCanalConsume() {
  18. defer s.waiter.Done()
  19. var (
  20. msgs = s.archiveSub.Messages()
  21. err error
  22. )
  23. for {
  24. msg, ok := <-msgs
  25. if !ok {
  26. log.Error("s.archiveSub.Message closed", err)
  27. return
  28. }
  29. msg.Commit()
  30. m := &archive.Message{}
  31. if err = json.Unmarshal(msg.Value, m); err != nil {
  32. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  33. continue
  34. }
  35. log.Info("arcCanalConsume msg(%s)", msg.Value)
  36. log.Info("arcCanalConsume topic(%s) partition(%d) offset(%d) commit start", msg.Topic, msg.Partition, msg.Offset)
  37. if msg.Offset >= s.c.BeginOffset {
  38. log.Info("arcCanalConsume offset(%d) is hit BeginOffset(%d) and start track data", msg.Offset, s.c.BeginOffset)
  39. if m.Table == _archiveTable {
  40. s.putArcChan(m.Action, m.New, m.Old)
  41. }
  42. if m.Table == _videoTable {
  43. s.putVideoChan(m.Action, m.New, m.Old)
  44. }
  45. } else {
  46. log.Info("arcCanalConsume offset(%d) not hit BeginOffset(%d) and pass", msg.Offset, s.c.BeginOffset)
  47. }
  48. //todo 异步消费
  49. if m.Table == _video && m.Action == _updateAct {
  50. s.hdlVideoUpdateBinLog(m.New, m.Old)
  51. }
  52. if m.Table == _archive {
  53. s.hdlArchiveMessage(m.Action, m.New, m.Old)
  54. }
  55. }
  56. }
  57. func (s *Service) videoupConsumer() {
  58. defer s.waiter.Done()
  59. var (
  60. msgs = s.videoupSub.Messages()
  61. err error
  62. c = context.TODO()
  63. )
  64. for {
  65. msg, ok := <-msgs
  66. if !ok {
  67. log.Error("s.videoupSub.Messages closed")
  68. return
  69. }
  70. msg.Commit()
  71. m := &archive.VideoupMsg{}
  72. if err = json.Unmarshal(msg.Value, m); err != nil {
  73. log.Error("json.Unmarshal(%v) error(%v)", string(msg.Value), err)
  74. return
  75. }
  76. log.Info("videoupMessage key(%s) value(%s) partition(%d) offset(%d) route(%s) commit start", msg.Key, msg.Value, msg.Partition, msg.Offset, m.Route)
  77. switch m.Route {
  78. case archive.RoutePostFirstRound:
  79. err = s.postFirstRound(c, m)
  80. case archive.RouteSecondRound:
  81. err = s.secondRound(c, m)
  82. case archive.RouteAddArchive:
  83. err = s.addArchive(c, m)
  84. case archive.RouteModifyArchive:
  85. err = s.modifyArchive(c, m)
  86. case archive.RouteAutoOpen:
  87. err = s.autoOpen(c, m)
  88. case archive.RouteDelayOpen:
  89. err = s.delayOpen(c, m)
  90. default:
  91. log.Warn("videoupConsumer unknown message route(%s)", m.Route)
  92. }
  93. if err == nil {
  94. log.Info("videoupMessage key(%s) value(%s) partition(%d) offset(%d) end", msg.Key, msg.Value, msg.Partition, msg.Offset)
  95. } else {
  96. log.Error("videoupMessage key(%s) value(%s) partition(%d) offset(%d) error(%v)", msg.Key, msg.Value, msg.Partition, msg.Offset, err)
  97. }
  98. }
  99. }
  100. // managerDBConsume 消费manager binlog
  101. func (s *Service) managerDBConsume() {
  102. defer s.waiter.Done()
  103. var (
  104. err error
  105. msgs = s.ManagerDBSub.Messages()
  106. )
  107. for {
  108. msg, open := <-msgs
  109. if !open {
  110. log.Info("managerDBConsume s.arcResultSub.Messages is closed")
  111. return
  112. }
  113. if msg == nil {
  114. continue
  115. }
  116. msg.Commit()
  117. log.Info("managerDBConsume consume key(%s) offset(%d) value(%s)", msg.Key, msg.Offset, string(msg.Value))
  118. m := &manager.BinMsg{}
  119. if err = json.Unmarshal(msg.Value, m); err != nil {
  120. log.Error("managerDBConsume json.Unmarshal error(%v)", err)
  121. continue
  122. }
  123. switch m.Table {
  124. case _upsTable:
  125. s.hdlManagerUpsBinlog(m)
  126. }
  127. }
  128. }