topic.go 12 KB


  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/service/bbq/topic/api"
  7. "go-common/app/service/bbq/topic/internal/model"
  8. "go-common/library/cache/redis"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. "go-common/library/sync/errgroup.v2"
  12. "go-common/library/xstr"
  13. "strings"
  14. )
  15. const (
  16. _selectTopic = "select `id`, `name`, `desc`, `state` from topic where id in (%s)"
  17. _insertUpdateTopic = "insert into topic (`name`,`score`,`state`,`video_num`) values %s on duplicate key update `video_num`=`video_num`+1"
  18. _selectTopicID = "select id, name from topic where name in (%s)"
  19. _selectDiscoveryTopic = "select id from topic where state=0 %s order by score desc, id desc limit %d, %d"
  20. _selectUnavailabelTopic = "select id from topic where state=1 limit %d,%d"
  21. _updateTopicField = "update topic set `%s` = ? where `id` = ?"
  22. )
  23. const (
  24. _topicKey = "topic:%d"
  25. )
  26. // RawTopicInfo 从mysql获取topic info
  27. func (d *Dao) RawTopicInfo(ctx context.Context, topicIDs []int64) (res map[int64]*api.TopicInfo, err error) {
  28. res = make(map[int64]*api.TopicInfo)
  29. if len(topicIDs) == 0 {
  30. return
  31. }
  32. querySQL := fmt.Sprintf(_selectTopic, xstr.JoinInts(topicIDs))
  33. rows, err := d.db.Query(ctx, querySQL)
  34. if err != nil {
  35. log.Errorw(ctx, "log", "get topic error", "err", err, "sql", querySQL)
  36. return
  37. }
  38. defer rows.Close()
  39. for rows.Next() {
  40. topicInfo := new(api.TopicInfo)
  41. if err = rows.Scan(&topicInfo.TopicId, &topicInfo.Name, &topicInfo.Desc, &topicInfo.State); err != nil {
  42. log.Errorw(ctx, "log", "get topic from mysql fail", "sql", querySQL)
  43. return
  44. }
  45. topicInfo.CoverUrl = "http://i0.hdslb.com/bfs/bbq/video-image/userface/155886860_1547729941"
  46. res[topicInfo.TopicId] = topicInfo
  47. }
  48. log.V(1).Infow(ctx, "log", "get topic", "req", topicIDs, "rsp_size", len(res))
  49. return
  50. }
  51. // CacheTopicInfo 从缓存获取topic info
  52. func (d *Dao) CacheTopicInfo(ctx context.Context, topicIDs []int64) (res map[int64]*api.TopicInfo, err error) {
  53. res = make(map[int64]*api.TopicInfo)
  54. keys := make([]string, 0, len(topicIDs))
  55. keyMidMap := make(map[int64]bool, len(topicIDs))
  56. for _, topicID := range topicIDs {
  57. key := fmt.Sprintf(_topicKey, topicID)
  58. if _, exist := keyMidMap[topicID]; !exist {
  59. // duplicate mid
  60. keyMidMap[topicID] = true
  61. keys = append(keys, key)
  62. }
  63. }
  64. conn := d.redis.Get(ctx)
  65. defer conn.Close()
  66. for _, key := range keys {
  67. conn.Send("GET", key)
  68. }
  69. conn.Flush()
  70. var data []byte
  71. for i := 0; i < len(keys); i++ {
  72. if data, err = redis.Bytes(conn.Receive()); err != nil {
  73. if err == redis.ErrNil {
  74. err = nil
  75. } else {
  76. log.Errorv(ctx, log.KV("event", "redis_get"), log.KV("key", keys[i]))
  77. }
  78. continue
  79. }
  80. topicInfo := new(api.TopicInfo)
  81. json.Unmarshal(data, topicInfo)
  82. res[topicInfo.TopicId] = topicInfo
  83. }
  84. log.Infov(ctx, log.KV("event", "redis_get"), log.KV("row_num", len(res)))
  85. return
  86. }
  87. // AddCacheTopicInfo 添加topic info缓存
  88. func (d *Dao) AddCacheTopicInfo(ctx context.Context, topicInfos map[int64]*api.TopicInfo) (err error) {
  89. keyValueMap := make(map[string][]byte, len(topicInfos))
  90. for topicID, topicInfo := range topicInfos {
  91. key := fmt.Sprintf(_topicKey, topicID)
  92. if _, exist := keyValueMap[key]; !exist {
  93. data, _ := json.Marshal(topicInfo)
  94. keyValueMap[key] = data
  95. }
  96. }
  97. conn := d.redis.Get(ctx)
  98. defer conn.Close()
  99. for key, value := range keyValueMap {
  100. conn.Send("SET", key, value, "EX", d.topicExpire)
  101. }
  102. conn.Flush()
  103. for i := 0; i < len(keyValueMap); i++ {
  104. conn.Receive()
  105. }
  106. log.Infov(ctx, log.KV("event", "redis_set"), log.KV("row_num", len(topicInfos)))
  107. return
  108. }
  109. // DelCacheTopicInfo 删除topic info缓存
  110. func (d *Dao) DelCacheTopicInfo(ctx context.Context, topicID int64) {
  111. var key = fmt.Sprintf(_topicKey, topicID)
  112. conn := d.redis.Get(ctx)
  113. defer conn.Close()
  114. conn.Do("DEL", key)
  115. }
  116. // InsertTopics 插入话题
  117. func (d *Dao) InsertTopics(ctx context.Context, topics map[string]*api.TopicInfo) (newTopics map[string]*api.TopicInfo, err error) {
  118. //func (d *Dao) InsertTopics(ctx context.Context, topics map[string]int64) (err error) {
  119. newTopics = make(map[string]*api.TopicInfo)
  120. // 0. check
  121. if len(topics) == 0 {
  122. return
  123. }
  124. if len(topics) > model.MaxBatchLen {
  125. err = ecode.TopicNumTooManyErr
  126. return
  127. }
  128. // 长度校验
  129. for _, item := range topics {
  130. if strings.Count(item.Name, "")-1 > model.MaxTopicNameLen {
  131. err = ecode.TopicNameLenErr
  132. log.Errorw(ctx, "log", "topic name len too long", "name", item.Name)
  133. return
  134. }
  135. }
  136. // 1. 插入更新
  137. group := errgroup.WithCancel(ctx)
  138. group.GOMAXPROCS(5)
  139. var groupInsertTopic = func(topicInfo *api.TopicInfo) {
  140. group.Go(func(ctx context.Context) (err error) {
  141. topicID, err := d.insertTopic(ctx, topicInfo)
  142. if err != nil {
  143. log.Warnw(ctx, "log", "get topic videos fail", "topic_name", topicInfo.Name)
  144. return
  145. }
  146. if topicID == 0 {
  147. log.Errorw(ctx, "log", "get error topic_id", "name", topicInfo.Name)
  148. err = ecode.TopicInsertErr
  149. return
  150. }
  151. topicInfo.TopicId = topicID
  152. return
  153. })
  154. }
  155. for _, topic := range topics {
  156. groupInsertTopic(topic)
  157. }
  158. err = group.Wait()
  159. if err != nil {
  160. log.Warnw(ctx, "log", "do group insert topic fail")
  161. return
  162. }
  163. // 由于insert的时候会返回ID,所以直接赋值返回
  164. newTopics = topics
  165. return
  166. }
  167. // insertTopic 插入话题
  168. func (d *Dao) insertTopic(ctx context.Context, topicInfo *api.TopicInfo) (topicID int64, err error) {
  169. //func (d *Dao) InsertTopics(ctx context.Context, topics map[string]int64) (err error) {
  170. // 0. check
  171. // 长度校验
  172. if strings.Count(topicInfo.Name, "")-1 > model.MaxTopicNameLen {
  173. err = ecode.TopicNameLenErr
  174. log.Errorw(ctx, "log", "topic name len too long", "name", topicInfo.Name)
  175. return
  176. }
  177. var str string
  178. // 1. 插入更新
  179. str += fmt.Sprintf("('%s',%f,%d,1)", topicInfo.Name, topicInfo.Score, topicInfo.State)
  180. insertSQL := fmt.Sprintf(_insertUpdateTopic, str)
  181. log.V(1).Infow(ctx, "sql", insertSQL)
  182. res, err := d.db.Exec(ctx, insertSQL)
  183. if err != nil {
  184. log.Errorw(ctx, "log", "insert topic fail", "topic_name", topicInfo.Name)
  185. return
  186. }
  187. topicID, err = res.LastInsertId()
  188. if err != nil {
  189. log.Errorw(ctx, "log", "insert topic fail", "topic_name", topicInfo.Name)
  190. return
  191. }
  192. return
  193. }
  194. // UpdateTopic 更新话题,当前有简介和状态
  195. // 这个函数把操作权其实已经交给上层了,设计上不是个好设计,但是在于避免重复代码
  196. func (d *Dao) UpdateTopic(ctx context.Context, topicID int64, field string, value interface{}) (err error) {
  197. if field != "desc" && field != "state" {
  198. return ecode.ReqParamErr
  199. }
  200. querySQL := fmt.Sprintf(_updateTopicField, field)
  201. _, err = d.db.Exec(ctx, querySQL, value, topicID)
  202. if err != nil {
  203. log.Errorw(ctx, "log", "update topic field fail", "field", field, "value", value, "topic_id", topicID)
  204. return
  205. }
  206. d.DelCacheTopicInfo(ctx, topicID)
  207. return
  208. }
  209. // TopicID 通过话题name获取话题ID
  210. // 话题ID结果存在topics中
  211. func (d *Dao) TopicID(ctx context.Context, names []string) (topics map[string]int64, err error) {
  212. topics = make(map[string]int64)
  213. if len(names) == 0 {
  214. return
  215. }
  216. if len(names) > model.MaxBatchLen {
  217. err = ecode.TopicNumTooManyErr
  218. return
  219. }
  220. querySQL := fmt.Sprintf(_selectTopicID, "\""+strings.Join(names, "\",\"")+"\"")
  221. log.V(1).Infow(ctx, "log", "select topic id", "sql", querySQL)
  222. rows, err := d.db.Query(ctx, querySQL)
  223. if err != nil {
  224. log.Errorw(ctx, "log", "get topic id error", "err", err, "sql", querySQL)
  225. return
  226. }
  227. defer rows.Close()
  228. var topicID int64
  229. var name string
  230. for rows.Next() {
  231. if err = rows.Scan(&topicID, &name); err != nil {
  232. log.Errorw(ctx, "log", "scan topic id error", "err", err, "sql", querySQL)
  233. return
  234. }
  235. topics[name] = topicID
  236. }
  237. log.V(1).Infow(ctx, "log", "get topic id", "req", names, "rsp", topics)
  238. return
  239. }
  240. // ListUnAvailableTopics .
  241. func (d *Dao) ListUnAvailableTopics(ctx context.Context, page int32, size int32) (list []int64, hasMore bool, err error) {
  242. hasMore = true
  243. // 0. check
  244. if page < 1 {
  245. err = ecode.TopicReqParamErr
  246. return
  247. }
  248. if page > model.MaxDiscoveryTopicPage {
  249. hasMore = false
  250. return
  251. }
  252. // 2. get list
  253. offset := (page - 1) * size
  254. querySQL := fmt.Sprintf(_selectUnavailabelTopic, offset, size)
  255. rows, err := d.db.Query(ctx, querySQL)
  256. if err != nil {
  257. log.Errorw(ctx, "log", "get topic video error", "err", err, "sql", querySQL)
  258. return
  259. }
  260. defer rows.Close()
  261. for rows.Next() {
  262. var topicID int64
  263. if err = rows.Scan(&topicID); err != nil {
  264. log.Errorw(ctx, "log", "get topic from mysql fail", "sql", querySQL)
  265. return
  266. }
  267. list = append(list, topicID)
  268. }
  269. // 3. 判断has_more
  270. if len(list) < int(size) {
  271. hasMore = false
  272. }
  273. return
  274. }
  275. // ListRankTopics 获取推荐的话题列表
  276. // TODO: 把置顶逻辑移上去
  277. func (d *Dao) ListRankTopics(ctx context.Context, page int32, size int32) (list []int64, hasMore bool, err error) {
  278. hasMore = true
  279. // 0. check
  280. if page < 1 {
  281. err = ecode.TopicReqParamErr
  282. return
  283. }
  284. if page > model.MaxDiscoveryTopicPage {
  285. hasMore = false
  286. return
  287. }
  288. // 1. 获取置顶数据s
  289. additionalConditionSQL := ""
  290. stickList, err := d.GetStickTopic(ctx)
  291. if err != nil {
  292. log.Warnw(ctx, "log", "get stick topic fail")
  293. } else if len(stickList) > 0 {
  294. additionalConditionSQL = fmt.Sprintf("and id not in (%s)", xstr.JoinInts(stickList))
  295. }
  296. // 2. 若page=1,则获取推荐
  297. if page == 1 {
  298. list = stickList
  299. }
  300. // 3. 根据page获取话题列表
  301. offset := (page - 1) * size
  302. querySQL := fmt.Sprintf(_selectDiscoveryTopic, additionalConditionSQL, offset, size)
  303. log.Infow(ctx, "sql", querySQL, "page", page, "size", size)
  304. rows, err := d.db.Query(ctx, querySQL)
  305. if err != nil {
  306. log.Errorw(ctx, "log", "get topic video error", "err", err, "sql", querySQL)
  307. return
  308. }
  309. defer rows.Close()
  310. for rows.Next() {
  311. var topicID int64
  312. if err = rows.Scan(&topicID); err != nil {
  313. log.Errorw(ctx, "log", "get topic from mysql fail", "sql", querySQL)
  314. return
  315. }
  316. list = append(list, topicID)
  317. }
  318. // 4. 判断has_more
  319. if len(list) < int(size) {
  320. hasMore = false
  321. }
  322. return
  323. }
  324. // GetStickTopic 获取置顶视频
  325. // TODO: 这个方式是临时之计,当qps增大时,会导致热点的产生
  326. func (d *Dao) GetStickTopic(ctx context.Context) (list []int64, err error) {
  327. return d.getRedisList(ctx, model.RedisStickTopicKey)
  328. }
  329. func (d *Dao) setStickTopic(ctx context.Context, list []int64) (err error) {
  330. return d.setRedisList(ctx, model.RedisStickTopicKey, list)
  331. }
  332. // StickTopic .
  333. func (d *Dao) StickTopic(ctx context.Context, opTopicID, op int64) (err error) {
  334. // 0. check
  335. info, err := d.TopicInfo(ctx, []int64{opTopicID})
  336. if err != nil {
  337. log.Warnw(ctx, "log", "get topic info fail", "topic_id", opTopicID)
  338. return
  339. }
  340. topicInfo, exists := info[opTopicID]
  341. if !exists {
  342. log.Errorw(ctx, "log", "stick topic fail due to error topic_id", "topic_id", opTopicID)
  343. err = ecode.TopicIDNotFound
  344. return
  345. }
  346. if topicInfo.State != api.TopicStateAvailable {
  347. log.Errorw(ctx, "log", "topic state unavailable to do sticking", "state", topicInfo.State, "topic_id", opTopicID)
  348. err = ecode.TopicStateErr
  349. return
  350. }
  351. // 1. 获取stick topic
  352. stickList, err := d.GetStickTopic(ctx)
  353. if err != nil {
  354. log.Warnw(ctx, "log", "get stick topic fail")
  355. return
  356. }
  357. // 2. 操作stick topic
  358. var newStickList []int64
  359. if op != 0 {
  360. newStickList = append(newStickList, opTopicID)
  361. }
  362. for _, stickTopicID := range stickList {
  363. if stickTopicID != opTopicID {
  364. newStickList = append(newStickList, stickTopicID)
  365. }
  366. }
  367. if len(newStickList) > model.MaxStickTopicNum {
  368. newStickList = newStickList[:model.MaxStickTopicNum]
  369. }
  370. // 3. 更新stick topic
  371. err = d.setStickTopic(ctx, newStickList)
  372. if err != nil {
  373. log.Warnw(ctx, "update stick topic fail")
  374. return
  375. }
  376. return
  377. }