service.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package unicom
  2. import (
  3. "sync"
  4. "time"
  5. "go-common/app/job/main/app-wall/conf"
  6. seqDao "go-common/app/job/main/app-wall/dao/seq"
  7. unicomDao "go-common/app/job/main/app-wall/dao/unicom"
  8. "go-common/app/job/main/app-wall/model/unicom"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. "go-common/library/stat/prom"
  12. )
  13. type Service struct {
  14. c *conf.Config
  15. dao *unicomDao.Dao
  16. seqdao *seqDao.Dao
  17. clickSub *databus.Databus
  18. closed bool
  19. // waiter
  20. waiter sync.WaitGroup
  21. cliChan []chan *unicom.ClickMsg
  22. dbcliChan []chan *unicom.UserBind
  23. // infoc
  24. logCh []chan interface{}
  25. packCh chan interface{}
  26. packLogCh chan interface{}
  27. integralLogCh []chan interface{}
  28. // prom
  29. pHit *prom.Prom
  30. pMiss *prom.Prom
  31. // tick
  32. tick time.Duration
  33. lastmonth map[int]bool
  34. }
  35. func New(c *conf.Config) (s *Service) {
  36. s = &Service{
  37. c: c,
  38. dao: unicomDao.New(c),
  39. clickSub: databus.New(c.ReportDatabus),
  40. seqdao: seqDao.New(c),
  41. // infoc
  42. packCh: make(chan interface{}, 1024),
  43. packLogCh: make(chan interface{}, 1024),
  44. // close
  45. closed: false,
  46. // prom
  47. pHit: prom.CacheHit,
  48. pMiss: prom.CacheMiss,
  49. lastmonth: map[int]bool{},
  50. // tick
  51. tick: time.Duration(c.Tick),
  52. }
  53. for i := int64(0); i < s.c.ChanNum; i++ {
  54. s.cliChan = append(s.cliChan, make(chan *unicom.ClickMsg, 300000))
  55. }
  56. for i := int64(0); i < s.c.ChanDBNum; i++ {
  57. s.dbcliChan = append(s.dbcliChan, make(chan *unicom.UserBind, 1024))
  58. s.integralLogCh = append(s.integralLogCh, make(chan interface{}, 1024))
  59. s.logCh = append(s.logCh, make(chan interface{}, 1024))
  60. }
  61. for i := int64(0); i < s.c.ChanNum; i++ {
  62. s.waiter.Add(1)
  63. go s.cliChanProc(i)
  64. }
  65. for i := int64(0); i < s.c.ChanDBNum; i++ {
  66. s.waiter.Add(1)
  67. go s.unicomInfocproc(i)
  68. go s.addUserIntegralLogproc(i)
  69. }
  70. for i := int64(0); i < s.c.ChanDBNum; i++ {
  71. s.waiter.Add(1)
  72. go s.dbcliChanProc(i)
  73. }
  74. s.waiter.Add(1)
  75. go s.clickConsumer()
  76. s.waiter.Add(1)
  77. now := time.Now()
  78. if s.c.Monthly {
  79. // s.updatemonth(now)
  80. s.upBindAll()
  81. }
  82. s.waiter.Add(1)
  83. s.loadUnicomIPOrder(now)
  84. s.loadUnicomFlow()
  85. go s.loadproc()
  86. s.waiter.Add(1)
  87. go s.unicomPackInfocproc()
  88. go s.addUserPackLogproc()
  89. return
  90. }
  91. // Close Service
  92. func (s *Service) Close() {
  93. s.closed = true
  94. time.Sleep(time.Second * 2)
  95. s.clickSub.Close()
  96. for i := 0; i < len(s.cliChan); i++ {
  97. close(s.cliChan[i])
  98. }
  99. for i := 0; i < len(s.dbcliChan); i++ {
  100. close(s.dbcliChan[i])
  101. close(s.integralLogCh[i])
  102. close(s.logCh[i])
  103. }
  104. s.waiter.Wait()
  105. log.Info("app-wall-job unicom flow closed.")
  106. }
  107. // cacheproc load cache
  108. func (s *Service) loadproc() {
  109. for {
  110. time.Sleep(s.tick)
  111. now := time.Now()
  112. s.loadUnicomFlow()
  113. s.updatemonth(now)
  114. s.loadUnicomIPOrder(now)
  115. }
  116. }