dao.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package archive
  2. import (
  3. "context"
  4. "runtime"
  5. "time"
  6. hisrpc "go-common/app/interface/main/history/rpc/client"
  7. "go-common/app/interface/main/tv/conf"
  8. arcwar "go-common/app/service/main/archive/api"
  9. "go-common/library/cache/memcache"
  10. "go-common/library/database/sql"
  11. "go-common/library/log"
  12. bm "go-common/library/net/http/blademaster"
  13. )
  14. // Dao is archive dao.
  15. type Dao struct {
  16. // cfg
  17. db *sql.DB
  18. conf *conf.Config
  19. relateURL string
  20. // memory
  21. arcTypes map[int32]*arcwar.Tp // map for arc types
  22. arcTypesRel map[int32][]*arcwar.Tp // map for relation between arc type
  23. // http client
  24. client *bm.Client
  25. // rpc
  26. arcClient arcwar.ArchiveClient
  27. hisRPC *hisrpc.Service
  28. // memcache
  29. arcMc *memcache.Pool
  30. expireRlt int32
  31. expireArc int32
  32. expireView int32
  33. // chan
  34. mCh chan func()
  35. }
  36. // New new a archive dao.
  37. func New(c *conf.Config) (d *Dao) {
  38. d = &Dao{
  39. // http client
  40. client: bm.NewClient(c.HTTPClient),
  41. // rpc
  42. hisRPC: hisrpc.New(c.HisRPC),
  43. // cfg
  44. relateURL: c.Host.Data + _realteURL,
  45. conf: c,
  46. db: sql.NewMySQL(c.Mysql),
  47. // memorry
  48. arcTypes: make(map[int32]*arcwar.Tp),
  49. arcTypesRel: make(map[int32][]*arcwar.Tp),
  50. // memcache
  51. arcMc: memcache.NewPool(c.Memcache.Config),
  52. expireRlt: int32(time.Duration(c.Memcache.RelateExpire) / time.Second),
  53. expireView: int32(time.Duration(c.Memcache.ViewExpire) / time.Second),
  54. expireArc: int32(time.Duration(c.Memcache.ArcExpire) / time.Second),
  55. // mc proc
  56. mCh: make(chan func(), 10240),
  57. }
  58. var err error
  59. if d.arcClient, err = arcwar.NewClient(c.ArcClient); err != nil {
  60. panic(err)
  61. }
  62. // video db
  63. for i := 0; i < runtime.NumCPU()*2; i++ {
  64. go d.cacheproc()
  65. }
  66. d.loadTypes(context.Background())
  67. return
  68. }
  69. // addCache add archive to mc or redis
  70. func (d *Dao) addCache(f func()) {
  71. select {
  72. case d.mCh <- f:
  73. default:
  74. log.Warn("cacheproc chan full")
  75. }
  76. }
  77. // cacheproc write memcache and stat redis use goroutine
  78. func (d *Dao) cacheproc() {
  79. for {
  80. f := <-d.mCh
  81. f()
  82. }
  83. }