extension.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/service/bbq/topic/api"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. "go-common/library/xstr"
  10. )
  11. const (
  12. _selectExtension = "select svid, content from extension where svid in (%s)"
  13. _insertExtension = "insert ignore into extension (`svid`,`type`,`content`) values (?,?,?)"
  14. )
  15. const (
  16. _videoExtensionKey = "ext:%d"
  17. )
  18. // RawVideoExtension 从mysql获取extension
  19. func (d *Dao) RawVideoExtension(ctx context.Context, svids []int64) (res map[int64]*api.VideoExtension, err error) {
  20. res = make(map[int64]*api.VideoExtension)
  21. if len(svids) == 0 {
  22. return
  23. }
  24. querySQL := fmt.Sprintf(_selectExtension, xstr.JoinInts(svids))
  25. rows, err := d.db.Query(ctx, querySQL)
  26. if err != nil {
  27. log.Errorw(ctx, "log", "get extension error", "err", err, "sql", querySQL)
  28. return
  29. }
  30. defer rows.Close()
  31. var svid int64
  32. var content string
  33. for rows.Next() {
  34. if err = rows.Scan(&svid, &content); err != nil {
  35. log.Errorw(ctx, "log", "get extension from mysql fail", "sql", querySQL)
  36. return
  37. }
  38. // 由于数据库中的数据和缓存中还不太一样,因此这里需要对db读取的数据进行额外处理
  39. var extension api.Extension
  40. json.Unmarshal([]byte(content), &extension.TitleExtra)
  41. // TODO:check
  42. log.V(10).Infow(ctx, "log", "unmarshal content", "result", extension)
  43. data, _ := json.Marshal(&extension)
  44. res[svid] = &api.VideoExtension{Svid: svid, Extension: string(data)}
  45. }
  46. log.V(1).Infow(ctx, "log", "get extension", "req", svids, "rsp_size", len(res))
  47. return
  48. }
  49. // CacheVideoExtension 从缓存获取extension
  50. func (d *Dao) CacheVideoExtension(ctx context.Context, svids []int64) (res map[int64]*api.VideoExtension, err error) {
  51. res = make(map[int64]*api.VideoExtension)
  52. conn := d.redis.Get(ctx)
  53. defer conn.Close()
  54. for _, svid := range svids {
  55. conn.Send("GET", fmt.Sprintf(_videoExtensionKey, svid))
  56. }
  57. conn.Flush()
  58. var data string
  59. for _, svid := range svids {
  60. if data, err = redis.String(conn.Receive()); err != nil {
  61. if err == redis.ErrNil {
  62. err = nil
  63. } else {
  64. log.Errorv(ctx, log.KV("event", "redis_get"), log.KV("svid", svid))
  65. }
  66. continue
  67. }
  68. extension := new(api.VideoExtension)
  69. extension.Svid = svid
  70. extension.Extension = data
  71. res[extension.Svid] = extension
  72. }
  73. log.Infov(ctx, log.KV("event", "redis_get"), log.KV("row_num", len(res)))
  74. return
  75. }
  76. // AddCacheVideoExtension 添加extension缓存
  77. func (d *Dao) AddCacheVideoExtension(ctx context.Context, extensions map[int64]*api.VideoExtension) (err error) {
  78. conn := d.redis.Get(ctx)
  79. defer conn.Close()
  80. for svid, value := range extensions {
  81. conn.Send("SET", fmt.Sprintf(_videoExtensionKey, svid), value.Extension, "EX", d.topicExpire)
  82. }
  83. conn.Flush()
  84. for i := 0; i < len(extensions); i++ {
  85. conn.Receive()
  86. }
  87. log.Infov(ctx, log.KV("event", "redis_set"), log.KV("row_num", len(extensions)))
  88. return
  89. }
  90. // DelCacheVideoExtension 删除extension缓存
  91. func (d *Dao) DelCacheVideoExtension(ctx context.Context, svid int64) {
  92. var key = fmt.Sprintf(_videoExtensionKey, svid)
  93. conn := d.redis.Get(ctx)
  94. defer conn.Close()
  95. conn.Do("DEL", key)
  96. }
  97. // InsertExtension 插入extension到db
  98. func (d *Dao) InsertExtension(ctx context.Context, svid int64, extensionType int64, extension *api.Extension) (rowsAffected int64, err error) {
  99. data, _ := json.Marshal(extension.TitleExtra)
  100. res, err := d.db.Exec(ctx, _insertExtension, svid, extensionType, string(data))
  101. if err != nil {
  102. log.Errorw(ctx, "log", "insert extension db fail", "svid", svid, "extension_type", extensionType, "extension", extensionType)
  103. return
  104. }
  105. rowsAffected, tmpErr := res.RowsAffected()
  106. if tmpErr != nil {
  107. log.Warnw(ctx, "log", "get rows affected fail")
  108. }
  109. d.DelCacheVideoExtension(ctx, svid)
  110. return
  111. }