dao.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/common/live/library/lrucache"
  6. jsonitor "github.com/json-iterator/go"
  7. v1pb "go-common/app/service/live/dao-anchor/api/grpc/v1"
  8. "go-common/app/service/live/dao-anchor/conf"
  9. "go-common/library/cache/redis"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. "go-common/library/queue/databus"
  14. "go-common/library/sync/errgroup"
  15. )
  16. const (
  17. INFO_ROOM = 1 << iota
  18. INFO_ROOM_EXT
  19. INFO_TAG
  20. INFO_ANCHOR
  21. INFO_SHORT_ID
  22. INFO_AREA_INFO
  23. )
  24. const (
  25. INFO_ALL = (INFO_ROOM | INFO_ROOM_EXT | INFO_TAG | INFO_ANCHOR | INFO_SHORT_ID | INFO_AREA_INFO)
  26. )
  27. const (
  28. FETCH_PAGE_SIZE = 100
  29. )
  30. //消费类型常量 定义
  31. const (
  32. //弹幕
  33. //DANMU_NUM 当前弹幕累计数量
  34. DANMU_NUM = "danmu_num"
  35. //DANMU_MINUTE_NUM_15 最近15分钟弹幕数量
  36. DANMU_MINUTE_NUM_15 = "danmu_minute_num_15"
  37. //DANMU_MINUTE_NUM_30 ...
  38. DANMU_MINUTE_NUM_30 = "danmu_minute_num_30"
  39. //DANMU_MINUTE_NUM_45 ...
  40. DANMU_MINUTE_NUM_45 = "danmu_minute_num_45"
  41. //DANMU_MINUTE_NUM_60 ...
  42. DANMU_MINUTE_NUM_60 = "danmu_minute_num_60"
  43. //人气
  44. //POPULARITY 当前实时人气
  45. POPULARITY = "popularity"
  46. //POPULARITY_MAX_TO_ARG_7 7日峰值人气的均值
  47. POPULARITY_MAX_TO_ARG_7 = "popularity_max_to_avg_7"
  48. //POPULARITY_MAX_TO_ARG_30 30日人气峰值的均值
  49. POPULARITY_MAX_TO_ARG_30 = "popularity_max_to_avg_30"
  50. //送礼
  51. //GIFT_NUM 实时送礼数
  52. GIFT_NUM = "gift_num_current_total"
  53. //GIFT_GOLD_AMOUNT 实时消费金瓜子数
  54. GIFT_GOLD_NUM = "gift_gold_num"
  55. //GIFT_GOLD_AMOUNT 实时消费金瓜子金额
  56. GIFT_GOLD_AMOUNT = "gift_gold_amount"
  57. //GIFT_GOLD_AMOUNT_MINUTE_15 最近15分钟金瓜子金额
  58. GIFT_GOLD_AMOUNT_MINUTE_15 = "gift_gold_num_minute_15"
  59. //GIFT_GOLD_AMOUNT_MINUTE_30 最近30分钟金瓜子金额
  60. GIFT_GOLD_AMOUNT_MINUTE_30 = "gift_gold_num_minute_30"
  61. //GIFT_GOLD_AMOUNT_MINUTE_45 ...
  62. GIFT_GOLD_AMOUNT_MINUTE_45 = "gift_gold_num_minute_45"
  63. //GIFT_GOLD_AMOUNT_MINUTE_60 ...
  64. GIFT_GOLD_AMOUNT_MINUTE_60 = "gift_gold_num_minute_60"
  65. //有效开播天数
  66. //VALID_LIVE_DAYS_TYPE_1_DAY_7 7日内有效开播天数;有效开播:一次开播大于5分钟
  67. VALID_LIVE_DAYS_TYPE_1_DAY_7 = "valid_days_type_1_day_7"
  68. //VALID_LIVE_DAYS_TYPE_1_DAY_14 14日内有效开播天数;有效开播:一次开播大于5分钟
  69. VALID_LIVE_DAYS_TYPE_1_DAY_14 = "valid_days_type_1_day_14"
  70. //VALID_LIVE_DAYS_TYPE_2_DAY_7 7日内有效开播天数;有效开播:大于等于120分钟
  71. VALID_LIVE_DAYS_TYPE_2_DAY_7 = "valid_days_type_2_day_7"
  72. //VALID_LIVE_DAYS_TYPE_2_DAY_30 14日内有效开播天数;有效开播:大于等于120分钟
  73. VALID_LIVE_DAYS_TYPE_2_DAY_30 = "valid_days_type_2_day_30"
  74. //房间状态
  75. //ROOM_TAG_CURRENT 房间实时标签
  76. ROOM_TAG_CURRENT = "room_tag_current"
  77. //榜单
  78. //RANK_LIST_CURRENT 排行榜相关数据
  79. RANK_LIST_CURRENT = "rank_list_current"
  80. //DAU
  81. DAU = "dau"
  82. )
  83. const (
  84. _RoomIdMappingCacheCapacity = 1024
  85. )
  86. // Dao dao
  87. type Dao struct {
  88. c *conf.Config
  89. redis *redis.Pool
  90. db *xsql.DB
  91. dbLiveApp *xsql.DB
  92. shortIDMapping *lrucache.SyncCache
  93. areaInfoMapping *lrucache.SyncCache
  94. }
  95. // New init mysql db
  96. func New(c *conf.Config) (dao *Dao) {
  97. dao = &Dao{
  98. c: c,
  99. redis: redis.NewPool(c.Redis),
  100. db: xsql.NewMySQL(c.MySQL),
  101. dbLiveApp: xsql.NewMySQL(c.LiveAppMySQL),
  102. shortIDMapping: lrucache.NewSyncCache(c.LRUCache.Bucket, c.LRUCache.Capacity, c.LRUCache.Timeout),
  103. areaInfoMapping: lrucache.NewSyncCache(c.LRUCache.Bucket, c.LRUCache.Capacity, c.LRUCache.Timeout),
  104. }
  105. return
  106. }
  107. // Close close the resource.
  108. func (d *Dao) Close() {
  109. d.redis.Close()
  110. d.db.Close()
  111. return
  112. }
  113. // Ping dao ping
  114. func (d *Dao) Ping(c context.Context) error {
  115. // TODO: if you need use mc,redis, please add
  116. return d.db.Ping(c)
  117. }
  118. // FetchRoomByIDs implementation
  119. // FetchRoomByIDs 查询房间信息
  120. func (d *Dao) FetchRoomByIDs(ctx context.Context, req *v1pb.RoomByIDsReq) (resp *v1pb.RoomByIDsResp, err error) {
  121. if len(req.RoomIds) > 0 {
  122. req.RoomIds, err = d.dbNormalizeRoomIDs(ctx, req.RoomIds)
  123. if err != nil {
  124. log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] normalize ids error(%v), req(%v)", err, req)
  125. return nil, err
  126. }
  127. }
  128. // TODO: 处理部分fields的情况,需要考虑特殊status的依赖问题
  129. if len(req.RoomIds) > 0 {
  130. resp = &v1pb.RoomByIDsResp{
  131. RoomDataSet: make(map[int64]*v1pb.RoomData),
  132. }
  133. idsDB := make([]int64, 0, len(req.RoomIds))
  134. // 从redis获取房间所有信息
  135. for _, id := range req.RoomIds {
  136. data, err := d.redisGetRoomInfo(ctx, id, _allRoomInfoFields)
  137. if err != nil {
  138. idsDB = append(idsDB, id)
  139. } else {
  140. d.dbDealWithStatus(ctx, data)
  141. resp.RoomDataSet[id] = data
  142. }
  143. }
  144. // 需要回源DB取数据
  145. if len(idsDB) > 0 {
  146. // 分段处理
  147. for start := 0; start < len(idsDB); start += FETCH_PAGE_SIZE {
  148. end := start + FETCH_PAGE_SIZE
  149. if end > len(idsDB) {
  150. end = len(idsDB)
  151. }
  152. reqRoom := &v1pb.RoomByIDsReq{
  153. RoomIds: idsDB[start:end],
  154. Fields: _allRoomInfoFields,
  155. }
  156. respRoom, err := d.dbFetchRoomByIDs(ctx, reqRoom)
  157. if err != nil {
  158. log.Error("[RoomOnlineList] dbFetchRoomByIDs error(%v), reqRoom(%v)", err, reqRoom)
  159. return nil, err
  160. }
  161. // 回写房间信息到redis
  162. for _, id := range idsDB[start:end] {
  163. resp.RoomDataSet[id] = respRoom.RoomDataSet[id]
  164. d.redisSetRoomInfo(ctx, id, _allRoomInfoFields, respRoom.RoomDataSet[id], false)
  165. }
  166. }
  167. }
  168. } else if len(req.Uids) > 0 {
  169. // TODO 根据主播ID查询房间号的场景较少,暂不优化,后续先转房间号
  170. resp, err = d.dbFetchRoomByIDs(ctx, req)
  171. }
  172. return
  173. }
  174. // RoomOnlineList implementation
  175. // RoomOnlineList 在线房间列表
  176. func (d *Dao) RoomOnlineList(ctx context.Context, req *v1pb.RoomOnlineListReq) (resp *v1pb.RoomOnlineListResp, err error) {
  177. log.Info("[dao|RoomOnlineList] req(%v)", err, req)
  178. ids, err := d.redisGetOnlineList(ctx, _onlineListAllArea)
  179. if err != nil || len(ids) <= 0 {
  180. ids, err = d.dbOnlineListByArea(ctx, _onlineListAllArea)
  181. if err != nil {
  182. log.Error("[RoomOnlineListByAttrs] dbOnlineListByArea error(%v), req(%v)", err, req)
  183. return nil, err
  184. }
  185. d.redisSetOnlineList(ctx, _onlineListAllArea, ids)
  186. }
  187. resp = &v1pb.RoomOnlineListResp{
  188. RoomDataList: make(map[int64]*v1pb.RoomData),
  189. }
  190. // 分页逻辑
  191. start := int(req.Page * req.PageSize)
  192. size := len(ids)
  193. if start >= size {
  194. return
  195. }
  196. end := start + int(req.PageSize)
  197. if end > size {
  198. end = size
  199. }
  200. ids = ids[start:end]
  201. idsDB := make([]int64, 0, len(ids))
  202. // 从redis获取房间信息
  203. for _, id := range ids {
  204. data, err := d.redisGetRoomInfo(ctx, id, req.Fields)
  205. if err != nil {
  206. idsDB = append(idsDB, id)
  207. } else {
  208. d.dbDealWithStatus(ctx, data)
  209. resp.RoomDataList[id] = data
  210. }
  211. }
  212. // 需要回源DB取数据
  213. if len(idsDB) > 0 {
  214. reqRoom := &v1pb.RoomByIDsReq{
  215. RoomIds: idsDB,
  216. Fields: _allRoomInfoFields,
  217. }
  218. respRoom, err := d.dbFetchRoomByIDs(ctx, reqRoom)
  219. if err != nil {
  220. log.Error("[RoomOnlineList] dbFetchRoomByIDs error(%v), reqRoom(%v)", err, reqRoom)
  221. return nil, err
  222. }
  223. // 回写房间信息到redis
  224. for _, id := range idsDB {
  225. resp.RoomDataList[id] = respRoom.RoomDataSet[id]
  226. d.redisSetRoomInfo(ctx, id, _allRoomInfoFields, respRoom.RoomDataSet[id], false)
  227. }
  228. }
  229. return
  230. }
  231. // RoomOnlineListByArea implementation
  232. // RoomOnlineListByArea 分区在线房间列表
  233. func (d *Dao) RoomOnlineListByArea(ctx context.Context, req *v1pb.RoomOnlineListByAreaReq) (resp *v1pb.RoomOnlineListByAreaResp, err error) {
  234. idSet := make(map[int64]bool)
  235. idsDB := make([]int64, 0)
  236. if len(req.AreaIds) <= 0 {
  237. req.AreaIds = []int64{0}
  238. }
  239. for _, areaID := range req.AreaIds {
  240. ids, err := d.redisGetOnlineList(ctx, areaID)
  241. if err != nil {
  242. idsDB = append(idsDB, areaID)
  243. } else {
  244. for _, id := range ids {
  245. idSet[id] = true
  246. }
  247. }
  248. }
  249. // 需要回源DB取数据
  250. if len(idsDB) > 0 {
  251. for _, areaID := range idsDB {
  252. roomIds, err := d.dbOnlineListByArea(ctx, areaID)
  253. if err != nil {
  254. log.Error("[RoomOnlineListByArea] dbOnlineListByArea error(%v), areaID(%v)", err, areaID)
  255. return nil, err
  256. }
  257. d.redisSetOnlineList(ctx, areaID, roomIds)
  258. for _, id := range roomIds {
  259. idSet[id] = true
  260. }
  261. }
  262. }
  263. resp = &v1pb.RoomOnlineListByAreaResp{
  264. RoomIds: make([]int64, 0, len(idSet)),
  265. }
  266. for id := range idSet {
  267. resp.RoomIds = append(resp.RoomIds, id)
  268. }
  269. return
  270. }
  271. var (
  272. _fields = []string{"uid", "area_id", "parent_area_id", "popularity_count", "anchor_profile_type"}
  273. )
  274. // RoomOnlineListByAttrs implementation
  275. // RoomOnlineListByAttrs 在线房间维度信息(不传attrs,不查询attr)
  276. func (d *Dao) RoomOnlineListByAttrs(ctx context.Context, req *v1pb.RoomOnlineListByAttrsReq) (resp *v1pb.RoomOnlineListByAttrsResp, err error) {
  277. ids, err := d.redisGetOnlineList(ctx, _onlineListAllArea)
  278. if err != nil || len(ids) <= 0 {
  279. ids, err = d.dbOnlineListByArea(ctx, _onlineListAllArea)
  280. if err != nil {
  281. log.Error("[RoomOnlineListByAttrs] dbOnlineListByArea error(%v), req(%v)", err, req)
  282. return nil, err
  283. }
  284. d.redisSetOnlineList(ctx, _onlineListAllArea, ids)
  285. }
  286. resp = &v1pb.RoomOnlineListByAttrsResp{
  287. Attrs: make(map[int64]*v1pb.AttrResp),
  288. }
  289. idsDB := make([]int64, 0, len(ids))
  290. for _, id := range ids {
  291. // 从redis获取房间基础信息
  292. data, err := d.redisGetRoomInfo(ctx, id, _fields)
  293. if err != nil {
  294. idsDB = append(idsDB, id)
  295. } else {
  296. resp.Attrs[id] = &v1pb.AttrResp{
  297. Uid: data.Uid,
  298. RoomId: id,
  299. AreaId: data.AreaId,
  300. ParentAreaId: data.ParentAreaId,
  301. PopularityCount: data.PopularityCount,
  302. AnchorProfileType: data.AnchorProfileType,
  303. }
  304. }
  305. }
  306. // 需要回源DB取数据
  307. if len(idsDB) > 0 {
  308. eg := errgroup.Group{}
  309. // 分段处理
  310. for start := 0; start < len(idsDB); start += FETCH_PAGE_SIZE {
  311. end := start + FETCH_PAGE_SIZE
  312. if end > len(idsDB) {
  313. end = len(idsDB)
  314. }
  315. eg.Go(func(idsDB []int64, start, end int) func() error {
  316. return func() (err error) {
  317. reqRoom := &v1pb.RoomByIDsReq{
  318. RoomIds: idsDB[start:end],
  319. Fields: _allRoomInfoFields,
  320. }
  321. respRoom, err := d.dbFetchRoomByIDs(ctx, reqRoom)
  322. if err != nil {
  323. log.Error("[RoomOnlineList] dbFetchRoomByIDs error(%v), reqRoom(%v)", err, reqRoom)
  324. return err
  325. }
  326. // 回写房间信息到redis
  327. for _, id := range idsDB[start:end] {
  328. resp.Attrs[id] = &v1pb.AttrResp{
  329. Uid: respRoom.RoomDataSet[id].Uid,
  330. RoomId: id,
  331. AreaId: respRoom.RoomDataSet[id].AreaId,
  332. ParentAreaId: respRoom.RoomDataSet[id].ParentAreaId,
  333. PopularityCount: respRoom.RoomDataSet[id].PopularityCount,
  334. AnchorProfileType: respRoom.RoomDataSet[id].AnchorProfileType,
  335. TagList: respRoom.RoomDataSet[id].TagList,
  336. AttrList: make([]*v1pb.AttrData, 0, len(req.Attrs)),
  337. }
  338. d.redisSetRoomInfo(ctx, id, _allRoomInfoFields, respRoom.RoomDataSet[id], false)
  339. d.redisSetTagList(ctx, id, respRoom.RoomDataSet[id].TagList)
  340. }
  341. return
  342. }
  343. }(idsDB, start, end))
  344. }
  345. eg.Wait()
  346. }
  347. // 重置回源数组
  348. idsDB = make([]int64, 0, len(ids))
  349. for _, id := range ids {
  350. if resp.Attrs[id].TagList == nil {
  351. // 从redis获取房间Tag信息
  352. data, err := d.redisGetTagList(ctx, id)
  353. if err != nil {
  354. idsDB = append(idsDB, id)
  355. } else {
  356. resp.Attrs[id].TagList = data
  357. }
  358. }
  359. }
  360. // 需要回源DB取数据
  361. if len(idsDB) > 0 {
  362. eg := errgroup.Group{}
  363. // 分段处理
  364. for start := 0; start < len(idsDB); start += FETCH_PAGE_SIZE {
  365. end := start + FETCH_PAGE_SIZE
  366. if end > len(idsDB) {
  367. end = len(idsDB)
  368. }
  369. eg.Go(func(idsDB []int64, start, end int) func() error {
  370. return func() (err error) {
  371. respRoom := make(map[int64]*v1pb.RoomData)
  372. err = d.dbFetchTagInfo(ctx, idsDB[start:end], respRoom)
  373. if err != nil {
  374. log.Error("[RoomOnlineList] dbFetchTagInfo error(%v), idsDB[start:end](%v)", err, idsDB[start:end])
  375. return err
  376. }
  377. // 回写房间Tag信息到redis
  378. for _, id := range idsDB[start:end] {
  379. if tag, ok := respRoom[id]; ok {
  380. resp.Attrs[id].TagList = tag.TagList
  381. } else {
  382. resp.Attrs[id].TagList = make([]*v1pb.TagData, 0, len(req.Attrs))
  383. }
  384. d.redisSetTagList(ctx, id, resp.Attrs[id].TagList)
  385. }
  386. return
  387. }
  388. }(idsDB, start, end))
  389. }
  390. eg.Wait()
  391. }
  392. // TODO 从redis获取attr列表
  393. // TODO 批量从db获取attr列表
  394. if len(req.Attrs) > 0 {
  395. eg := errgroup.Group{}
  396. for _, attr := range req.Attrs {
  397. // 实时人气值特殊处理
  398. if attr.AttrId == ATTRID_POPULARITY && attr.AttrSubId == ATTRSUBID_POPULARITY_REALTIME {
  399. for _, attrResp := range resp.Attrs {
  400. resp.Attrs[attrResp.RoomId].AttrList = append(resp.Attrs[attrResp.RoomId].AttrList, &v1pb.AttrData{
  401. RoomId: attrResp.RoomId,
  402. AttrId: attr.AttrId,
  403. AttrSubId: attr.AttrSubId,
  404. AttrValue: attrResp.PopularityCount,
  405. })
  406. }
  407. continue
  408. }
  409. eg.Go(func(attr *v1pb.AttrReq) func() error {
  410. return func() (err error) {
  411. reqAttr := &v1pb.FetchAttrByIDsReq{
  412. AttrId: attr.AttrId,
  413. AttrSubId: attr.AttrSubId,
  414. RoomIds: ids,
  415. }
  416. respAttr, err := d.FetchAttrByIDs(ctx, reqAttr)
  417. if err != nil {
  418. log.Error("[RoomOnlineListByAttrs] FetchAttrByIDs from db error(%v), reqAttr(%v)", err, reqAttr)
  419. return err
  420. }
  421. for _, attr := range respAttr.Attrs {
  422. resp.Attrs[attr.RoomId].AttrList = append(resp.Attrs[attr.RoomId].AttrList, attr)
  423. }
  424. return
  425. }
  426. }(attr))
  427. }
  428. eg.Wait()
  429. }
  430. return
  431. }
  432. // RoomCreate implementation
  433. // RoomCreate 房间创建
  434. func (d *Dao) RoomCreate(ctx context.Context, req *v1pb.RoomCreateReq) (resp *v1pb.RoomCreateResp, err error) {
  435. return d.roomCreate(ctx, req)
  436. }
  437. // RoomUpdate implementation
  438. // RoomUpdate 房间更新
  439. func (d *Dao) RoomUpdate(ctx context.Context, req *v1pb.RoomUpdateReq) (resp *v1pb.UpdateResp, err error) {
  440. resp, err = d.roomUpdate(ctx, req)
  441. if err == nil {
  442. fields := make([]string, 0, len(req.Fields))
  443. data := &v1pb.RoomData{
  444. RoomId: req.RoomId,
  445. AnchorLevel: new(v1pb.AnchorLevel),
  446. }
  447. for _, f := range req.Fields {
  448. switch f {
  449. case "title":
  450. data.Title = req.Title
  451. case "cover":
  452. data.Cover = req.Cover
  453. case "tags":
  454. data.Tags = req.Tags
  455. case "background":
  456. data.Background = req.Background
  457. case "description":
  458. data.Description = req.Description
  459. case "live_start_time":
  460. data.LiveStartTime = req.LiveStartTime
  461. // 更新在播列表
  462. var areaID int64
  463. reqRoom := &v1pb.RoomByIDsReq{
  464. RoomIds: []int64{req.RoomId},
  465. Fields: _allRoomInfoFields,
  466. }
  467. respRoom, err := d.FetchRoomByIDs(ctx, reqRoom)
  468. if err != nil {
  469. log.Error("[RoomOnlineList] dbFetchRoomByIDs error(%v), reqRoom(%v)", err, reqRoom)
  470. } else {
  471. if respRoom.RoomDataSet[req.RoomId] != nil {
  472. areaID = respRoom.RoomDataSet[req.RoomId].AreaId
  473. }
  474. }
  475. if req.LiveStartTime > 0 {
  476. d.redisAddOnlineList(ctx, _onlineListAllArea, req.RoomId)
  477. d.redisAddOnlineList(ctx, areaID, req.RoomId)
  478. } else {
  479. d.redisDelOnlineList(ctx, _onlineListAllArea, req.RoomId)
  480. d.redisDelOnlineList(ctx, areaID, req.RoomId)
  481. }
  482. // TODO 更新开播状态
  483. case "live_screen_type":
  484. data.LiveScreenType = req.LiveScreenType
  485. case "live_type":
  486. data.LiveType = req.LiveType
  487. case "lock_status":
  488. data.LockStatus = req.LockStatus
  489. case "lock_time":
  490. data.LockTime = req.LockTime
  491. case "hidden_time":
  492. data.HiddenTime = req.HiddenTime
  493. // TODO 更新隐藏状态
  494. case "area_id":
  495. data.AreaId = req.AreaId
  496. if req.AreaId > 0 {
  497. areaInfo, err := d.dbFetchAreaInfo(ctx, req.AreaId)
  498. if err != nil {
  499. log.Error("[dao.dao-anchor.mysql|roomUpdate] fetch area info error(%v), req(%v)", err, req)
  500. err = ecode.InvalidParam
  501. return nil, err
  502. }
  503. data.ParentAreaId = areaInfo.ParentAreaID
  504. fields = append(fields, "parent_area_id")
  505. }
  506. default:
  507. continue
  508. }
  509. fields = append(fields, f)
  510. }
  511. d.redisSetRoomInfo(ctx, data.RoomId, fields, data, true)
  512. }
  513. return
  514. }
  515. // RoomBatchUpdate implementation
  516. // RoomBatchUpdate 房间更新
  517. func (d *Dao) RoomBatchUpdate(ctx context.Context, req *v1pb.RoomBatchUpdateReq) (resp *v1pb.UpdateResp, err error) {
  518. resp = &v1pb.UpdateResp{}
  519. for _, r := range req.Reqs {
  520. res, err := d.RoomUpdate(ctx, r)
  521. if err != nil {
  522. log.Error("[dao.dao-anchor.mysql|RoomBatchUpdate] update room record error(%v), req(%v)", err, r)
  523. return nil, err
  524. }
  525. resp.AffectedRows += res.AffectedRows
  526. }
  527. return
  528. }
  529. // RoomExtendUpdate implementation
  530. // RoomExtendUpdate 房间更新
  531. func (d *Dao) RoomExtendUpdate(ctx context.Context, req *v1pb.RoomExtendUpdateReq) (resp *v1pb.UpdateResp, err error) {
  532. resp, err = d.roomExtendUpdate(ctx, req)
  533. if err == nil {
  534. fields := make([]string, 0, len(req.Fields))
  535. data := &v1pb.RoomData{
  536. RoomId: req.RoomId,
  537. AnchorLevel: new(v1pb.AnchorLevel),
  538. }
  539. for _, f := range req.Fields {
  540. switch f {
  541. case "keyframe":
  542. data.Keyframe = req.Keyframe
  543. case "popularity_count":
  544. data.PopularityCount = req.PopularityCount
  545. default:
  546. continue
  547. }
  548. fields = append(fields, f)
  549. }
  550. d.redisSetRoomInfo(ctx, data.RoomId, fields, data, true)
  551. }
  552. return
  553. }
  554. // RoomExtendBatchUpdate implementation
  555. // RoomExtendBatchUpdate 房间更新
  556. func (d *Dao) RoomExtendBatchUpdate(ctx context.Context, req *v1pb.RoomExtendBatchUpdateReq) (resp *v1pb.UpdateResp, err error) {
  557. resp = &v1pb.UpdateResp{}
  558. for _, r := range req.Reqs {
  559. res, err := d.RoomExtendUpdate(ctx, r)
  560. if err != nil {
  561. log.Error("[dao.dao-anchor.mysql|RoomExtendBatchUpdate] update room extend record error(%v), req(%v)", err, r)
  562. return nil, err
  563. }
  564. resp.AffectedRows += res.AffectedRows
  565. }
  566. return
  567. }
  568. // RoomExtendIncre implementation
  569. // RoomExtendIncre 房间增量更新
  570. func (d *Dao) RoomExtendIncre(ctx context.Context, req *v1pb.RoomExtendIncreReq) (resp *v1pb.UpdateResp, err error) {
  571. resp, err = d.roomExtendIncre(ctx, req)
  572. if err == nil {
  573. fields := make([]string, 0, len(req.Fields))
  574. data := &v1pb.RoomData{
  575. RoomId: req.RoomId,
  576. AnchorLevel: new(v1pb.AnchorLevel),
  577. }
  578. for _, f := range req.Fields {
  579. switch f {
  580. case "popularity_count":
  581. data.PopularityCount = req.PopularityCount
  582. default:
  583. continue
  584. }
  585. fields = append(fields, f)
  586. }
  587. if len(fields) > 0 {
  588. d.redisIncreRoomInfo(ctx, data.RoomId, fields, data)
  589. }
  590. }
  591. return
  592. }
  593. // RoomExtendBatchIncre implementation
  594. // RoomExtendBatchIncre 房间增量更新
  595. func (d *Dao) RoomExtendBatchIncre(ctx context.Context, req *v1pb.RoomExtendBatchIncreReq) (resp *v1pb.UpdateResp, err error) {
  596. resp = &v1pb.UpdateResp{}
  597. for _, r := range req.Reqs {
  598. res, err := d.RoomExtendIncre(ctx, r)
  599. if err != nil {
  600. log.Error("[dao.dao-anchor.mysql|RoomExtendBatchIncre] update room extend increment record error(%v), req(%v)", err, r)
  601. return nil, err
  602. }
  603. resp.AffectedRows += res.AffectedRows
  604. }
  605. return
  606. }
  607. // RoomTagCreate implementation
  608. // RoomTagCreate 房间Tag创建
  609. func (d *Dao) RoomTagCreate(ctx context.Context, req *v1pb.RoomTagCreateReq) (resp *v1pb.UpdateResp, err error) {
  610. resp, err = d.roomTagCreate(ctx, req)
  611. if err == nil {
  612. tag := &v1pb.TagData{
  613. TagId: req.TagId,
  614. TagSubId: req.TagSubId,
  615. TagValue: req.TagValue,
  616. TagExt: req.TagExt,
  617. TagExpireAt: req.TagExpireAt,
  618. }
  619. d.redisAddTag(ctx, req.RoomId, tag)
  620. }
  621. return
  622. }
  623. // RoomAttrCreate implementation
  624. // RoomAttrCreate 房间Attr创建
  625. func (d *Dao) RoomAttrCreate(ctx context.Context, req *v1pb.RoomAttrCreateReq) (resp *v1pb.UpdateResp, err error) {
  626. return d.roomAttrCreate(ctx, req)
  627. }
  628. // RoomAttrSetEx implementation
  629. // RoomAttrSetEx 房间Attr更新
  630. func (d *Dao) RoomAttrSetEx(ctx context.Context, req *v1pb.RoomAttrSetExReq) (resp *v1pb.UpdateResp, err error) {
  631. return d.roomAttrSetEx(ctx, req)
  632. }
  633. // AnchorUpdate implementation
  634. // AnchorUpdate 主播更新
  635. func (d *Dao) AnchorUpdate(ctx context.Context, req *v1pb.AnchorUpdateReq) (resp *v1pb.UpdateResp, err error) {
  636. resp, err = d.anchorUpdate(ctx, req)
  637. if err == nil {
  638. roomID := d.dbFetchRoomIDByUID(ctx, req.Uid)
  639. if roomID == 0 {
  640. return
  641. }
  642. fields := make([]string, 0, len(req.Fields))
  643. data := &v1pb.RoomData{
  644. RoomId: roomID,
  645. AnchorLevel: new(v1pb.AnchorLevel),
  646. }
  647. for _, f := range req.Fields {
  648. switch f {
  649. case "profile_type":
  650. f = "anchor_profile_type"
  651. data.AnchorProfileType = req.ProfileType
  652. case "san_score":
  653. f = "anchor_san"
  654. data.AnchorSan = req.SanScore
  655. case "round_status":
  656. f = "anchor_round_switch"
  657. data.AnchorRoundSwitch = req.RoundStatus
  658. case "record_status":
  659. f = "anchor_record_switch"
  660. data.AnchorRecordSwitch = req.RecordStatus
  661. case "exp":
  662. f = "anchor_exp"
  663. data.AnchorLevel.Score = req.Exp
  664. default:
  665. log.Error("[dao.dao-anchor.mysql|anchorUpdate] unsupported field(%v), req(%s)", f, req)
  666. err = ecode.InvalidParam
  667. return
  668. }
  669. fields = append(fields, f)
  670. }
  671. d.redisSetRoomInfo(ctx, data.RoomId, fields, data, true)
  672. }
  673. return
  674. }
  675. // AnchorBatchUpdate implementation
  676. // AnchorBatchUpdate 主播更新
  677. func (d *Dao) AnchorBatchUpdate(ctx context.Context, req *v1pb.AnchorBatchUpdateReq) (resp *v1pb.UpdateResp, err error) {
  678. resp = &v1pb.UpdateResp{}
  679. for _, r := range req.Reqs {
  680. res, err := d.AnchorUpdate(ctx, r)
  681. if err != nil {
  682. log.Error("[dao.dao-anchor.mysql|AnchorBatchUpdate] update anchor record error(%v), req(%v)", err, r)
  683. return nil, err
  684. }
  685. resp.AffectedRows += res.AffectedRows
  686. }
  687. return
  688. }
  689. // AnchorIncre implementation
  690. // AnchorIncre 主播增量更新
  691. func (d *Dao) AnchorIncre(ctx context.Context, req *v1pb.AnchorIncreReq) (resp *v1pb.UpdateResp, err error) {
  692. resp, err = d.anchorIncre(ctx, req)
  693. if err == nil {
  694. roomID := d.dbFetchRoomIDByUID(ctx, req.Uid)
  695. if roomID == 0 {
  696. return
  697. }
  698. fields := make([]string, 0, len(req.Fields))
  699. data := &v1pb.RoomData{
  700. RoomId: roomID,
  701. AnchorLevel: new(v1pb.AnchorLevel),
  702. }
  703. for _, f := range req.Fields {
  704. switch f {
  705. case "san_score":
  706. f = "anchor_san"
  707. data.AnchorSan = req.SanScore
  708. case "exp":
  709. f = "anchor_exp"
  710. data.AnchorLevel.Score = req.Exp
  711. default:
  712. continue
  713. }
  714. fields = append(fields, f)
  715. }
  716. if len(fields) > 0 {
  717. d.redisIncreRoomInfo(ctx, data.RoomId, fields, data)
  718. }
  719. }
  720. return
  721. }
  722. // AnchorBatchIncre implementation
  723. // AnchorBatchIncre 主播增量更新
  724. func (d *Dao) AnchorBatchIncre(ctx context.Context, req *v1pb.AnchorBatchIncreReq) (resp *v1pb.UpdateResp, err error) {
  725. resp = &v1pb.UpdateResp{}
  726. for _, r := range req.Reqs {
  727. res, err := d.AnchorIncre(ctx, r)
  728. if err != nil {
  729. log.Error("[dao.dao-anchor.mysql|AnchorBatchIncre] update anchor increment record error(%v), req(%v)", err, r)
  730. return nil, err
  731. }
  732. resp.AffectedRows += res.AffectedRows
  733. }
  734. return
  735. }
  736. // FetchAreas implementation
  737. // FetchAreas 根据父分区号查询子分区
  738. func (d *Dao) FetchAreas(ctx context.Context, req *v1pb.FetchAreasReq) (resp *v1pb.FetchAreasResp, err error) {
  739. return d.fetchAreas(ctx, req)
  740. }
  741. // FetchAttrByIDs implementation
  742. // FetchAttrByIDs 批量根据房间号查询指标
  743. func (d *Dao) FetchAttrByIDs(ctx context.Context, req *v1pb.FetchAttrByIDsReq) (resp *v1pb.FetchAttrByIDsResp, err error) {
  744. return d.fetchAttrByIDs(ctx, req)
  745. }
  746. // DeleteAttr implementation
  747. // DeleteAttr 删除一个指标
  748. func (d *Dao) DeleteAttr(ctx context.Context, req *v1pb.DeleteAttrReq) (resp *v1pb.UpdateResp, err error) {
  749. return d.deleteAttr(ctx, req)
  750. }
  751. type msgVal struct {
  752. MsgID string `json:"msg_id"`
  753. }
  754. func getConsumedKey(topic string, msgID string) string {
  755. return fmt.Sprintf("consumed:%s:%s", topic, msgID)
  756. }
  757. // 清除消费过的记录,主要用于测试
  758. func (d *Dao) clearConsumed(ctx context.Context, msg *databus.Message) {
  759. val := &msgVal{}
  760. err := jsonitor.Unmarshal(msg.Value, val)
  761. if err != nil {
  762. return
  763. }
  764. conn := d.redis.Get(ctx)
  765. defer conn.Close()
  766. conn.Do("DEL", getConsumedKey(msg.Topic, val.MsgID))
  767. }
  768. // CanConsume 是否可以消费
  769. func (d *Dao) CanConsume(ctx context.Context, msg *databus.Message) bool {
  770. val := &msgVal{}
  771. err := jsonitor.Unmarshal(msg.Value, val)
  772. if err != nil {
  773. log.Error("unmarshal msg value error %+v, value: %s", err, string(msg.Value))
  774. return true
  775. }
  776. if val.MsgID == "" {
  777. log.Warn("msg_id is empty ; value: %s", string(msg.Value))
  778. return true
  779. }
  780. conn := d.redis.Get(ctx)
  781. defer conn.Close()
  782. var key = getConsumedKey(msg.Topic, val.MsgID)
  783. reply, err := conn.Do("SET", key, "1", "NX", "EX", 86400) // 24 hours
  784. if err == nil {
  785. if reply == nil {
  786. log.Info("Already consumed key:%s", key)
  787. return false
  788. } else {
  789. return true
  790. }
  791. }
  792. if err == redis.ErrNil {
  793. //already consumed
  794. log.Info("Already consumed key:%s", key)
  795. return false
  796. }
  797. // other redis error happenned, let it pass
  798. log.Error("redis error when resolve CanConsume %+v", err)
  799. return true
  800. }