// service.go 2.4 KB

  1. package unicom
  2. import (
  3. "sync"
  4. "time"
  5. "go-common/app/interface/main/app-wall/conf"
  6. accdao "go-common/app/interface/main/app-wall/dao/account"
  7. liveDao "go-common/app/interface/main/app-wall/dao/live"
  8. seqDao "go-common/app/interface/main/app-wall/dao/seq"
  9. shopDao "go-common/app/interface/main/app-wall/dao/shopping"
  10. unicomDao "go-common/app/interface/main/app-wall/dao/unicom"
  11. "go-common/app/interface/main/app-wall/model/unicom"
  12. "go-common/library/queue/databus"
  13. "go-common/library/stat/prom"
  14. )
  // _initIPlimitKey is a format template with two %v placeholders.
  // NOTE(review): presumably used to build the keys stored in
  // Service.operationIPlimit; the call site is not in this part of the file,
  // so confirm what the two arguments are.
  const _initIPlimitKey = "iplimit_%v_%v"
  18. type Service struct {
  19. c *conf.Config
  20. dao *unicomDao.Dao
  21. live *liveDao.Dao
  22. seqdao *seqDao.Dao
  23. accd *accdao.Dao
  24. shop *shopDao.Dao
  25. tick time.Duration
  26. unicomIpCache []*unicom.UnicomIP
  27. unicomIpSQLCache map[string]*unicom.UnicomIP
  28. operationIPlimit map[string]struct{}
  29. unicomPackCache []*unicom.UserPack
  30. // infoc
  31. logCh chan interface{}
  32. packCh chan interface{}
  33. packLogCh chan interface{}
  34. userBindCh chan interface{}
  35. // waiter
  36. waiter sync.WaitGroup
  37. // databus
  38. userbindPub *databus.Databus
  39. // prom
  40. pHit *prom.Prom
  41. pMiss *prom.Prom
  42. }
  43. func New(c *conf.Config) (s *Service) {
  44. s = &Service{
  45. c: c,
  46. dao: unicomDao.New(c),
  47. live: liveDao.New(c),
  48. seqdao: seqDao.New(c),
  49. accd: accdao.New(c),
  50. shop: shopDao.New(c),
  51. tick: time.Duration(c.Tick),
  52. unicomIpCache: []*unicom.UnicomIP{},
  53. unicomIpSQLCache: map[string]*unicom.UnicomIP{},
  54. operationIPlimit: map[string]struct{}{},
  55. unicomPackCache: []*unicom.UserPack{},
  56. // databus
  57. userbindPub: databus.New(c.UnicomDatabus),
  58. // infoc
  59. logCh: make(chan interface{}, 1024),
  60. packCh: make(chan interface{}, 1024),
  61. packLogCh: make(chan interface{}, 1024),
  62. userBindCh: make(chan interface{}, 1024),
  63. // prom
  64. pHit: prom.CacheHit,
  65. pMiss: prom.CacheMiss,
  66. }
  67. // now := time.Now()
  68. s.loadIPlimit(c)
  69. s.loadUnicomIP()
  70. // s.loadUnicomIPOrder(now)
  71. s.loadUnicomPacks()
  72. // s.loadUnicomFlow()
  73. go s.loadproc()
  74. go s.unicomInfocproc()
  75. go s.unicomPackInfocproc()
  76. go s.addUserPackLogproc()
  77. s.waiter.Add(1)
  78. go s.userbindConsumer()
  79. return
  80. }
  81. // cacheproc load cache
  82. func (s *Service) loadproc() {
  83. for {
  84. time.Sleep(s.tick)
  85. // now := time.Now()
  86. s.loadUnicomIP()
  87. // s.loadUnicomIPOrder(now)
  88. s.loadUnicomPacks()
  89. // s.loadUnicomFlow()
  90. }
  91. }