dao.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package dao
  2. import (
  3. "context"
  4. "go-common/app/service/bbq/user/api"
  5. "go-common/library/log"
  6. "go-common/library/sync/pipeline/fanout"
  7. notice "go-common/app/service/bbq/notice-service/api/v1"
  8. "go-common/app/service/bbq/user/internal/conf"
  9. acc "go-common/app/service/main/account/api"
  10. filter "go-common/app/service/main/filter/api/grpc/v1"
  11. "go-common/library/cache/redis"
  12. xsql "go-common/library/database/sql"
  13. "go-common/library/net/rpc/warden"
  14. )
  15. // Dao dao
  16. type Dao struct {
  17. c *conf.Config
  18. cache *fanout.Fanout
  19. redis *redis.Pool
  20. db *xsql.DB
  21. accountClient acc.AccountClient
  22. noticeClient notice.NoticeClient
  23. filterClient filter.FilterClient
  24. }
  25. //go:generate $GOPATH/src/go-common/app/tool/cache/gen
  26. type _cache interface {
  27. // cache: -batch=10 -max_group=10 -batch_err=break -nullcache=&api.UserBase{Mid:-1} -check_null_code=$==nil||$.Mid==-1
  28. UserBase(c context.Context, mid []int64) (map[int64]*api.UserBase, error)
  29. }
  30. // New init mysql db
  31. func New(c *conf.Config) (dao *Dao) {
  32. dao = &Dao{
  33. c: c,
  34. cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  35. redis: redis.NewPool(c.Redis),
  36. db: xsql.NewMySQL(c.MySQL),
  37. accountClient: newAccountClient(c.GRPCClient["account"]),
  38. noticeClient: newNoticeClient(c.GRPCClient["notice"]),
  39. filterClient: newFilterClient(c.GRPCClient["filter"]),
  40. }
  41. return
  42. }
  43. // newNoticeClient .
  44. func newFilterClient(cfg *conf.GRPCConf) filter.FilterClient {
  45. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  46. if err != nil {
  47. panic(err)
  48. }
  49. return filter.NewFilterClient(cc)
  50. }
  51. // newNoticeClient .
  52. func newNoticeClient(cfg *conf.GRPCConf) notice.NoticeClient {
  53. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  54. if err != nil {
  55. panic(err)
  56. }
  57. return notice.NewNoticeClient(cc)
  58. }
  59. //newAccountClient .
  60. func newAccountClient(cfg *conf.GRPCConf) acc.AccountClient {
  61. cc, err := warden.NewClient(cfg.WardenConf).Dial(context.Background(), cfg.Addr)
  62. if err != nil {
  63. panic(err)
  64. }
  65. return acc.NewAccountClient(cc)
  66. }
  67. // Close close the resource.
  68. func (d *Dao) Close() {
  69. d.redis.Close()
  70. d.db.Close()
  71. }
  72. // Ping dao ping
  73. func (d *Dao) Ping(ctx context.Context) error {
  74. // TODO: add mc,redis... if you use
  75. return d.db.Ping(ctx)
  76. }
  77. // BeginTran begin mysql transaction
  78. func (d *Dao) BeginTran(c context.Context) (*xsql.Tx, error) {
  79. return d.db.Begin(c)
  80. }
  81. // CreateNotice 创建通知
  82. func (d *Dao) CreateNotice(ctx context.Context, notice *notice.NoticeBase) (err error) {
  83. _, err = d.noticeClient.CreateNotice(ctx, notice)
  84. if err != nil {
  85. log.Errorv(ctx, log.KV("log", "create notice fail: notice="+notice.String()))
  86. return
  87. }
  88. log.V(1).Infov(ctx, log.KV("log", "create notice: notice="+notice.String()))
  89. return
  90. }
  91. // Filter .
  92. func (d *Dao) Filter(ctx context.Context, content string, area string) (level int32, err error) {
  93. req := new(filter.FilterReq)
  94. req.Message = content
  95. req.Area = area
  96. reply, err := d.filterClient.Filter(ctx, req)
  97. if err != nil {
  98. log.Errorv(ctx, log.KV("log", "filter fail : req="+req.String()))
  99. return
  100. }
  101. level = reply.Level
  102. log.V(1).Infov(ctx, log.KV("log", "get filter reply="+reply.String()))
  103. return
  104. }