12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874 |
- package dao
- import (
- "context"
- "fmt"
- "go-common/app/common/live/library/lrucache"
- jsonitor "github.com/json-iterator/go"
- v1pb "go-common/app/service/live/dao-anchor/api/grpc/v1"
- "go-common/app/service/live/dao-anchor/conf"
- "go-common/library/cache/redis"
- xsql "go-common/library/database/sql"
- "go-common/library/ecode"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "go-common/library/sync/errgroup"
- )
// INFO_* are bit flags, one bit per flag, so they can be OR-ed together.
// They are named after kinds of room-related information; presumably they
// select which parts to load — usage is not visible in this file.
const (
	INFO_ROOM = 1 << iota
	INFO_ROOM_EXT
	INFO_TAG
	INFO_ANCHOR
	INFO_SHORT_ID
	INFO_AREA_INFO
)

// INFO_ALL is the union of every INFO_* flag.
const (
	INFO_ALL = (INFO_ROOM | INFO_ROOM_EXT | INFO_TAG | INFO_ANCHOR | INFO_SHORT_ID | INFO_AREA_INFO)
)

// FETCH_PAGE_SIZE is the maximum number of room IDs sent to the database in
// one batch when falling back from the cache.
const (
	FETCH_PAGE_SIZE = 100
)
// Consumption type constants.
const (
	// Danmaku (bullet comments).
	// DANMU_NUM is the current cumulative danmaku count.
	DANMU_NUM = "danmu_num"
	// DANMU_MINUTE_NUM_15 is the danmaku count over the last 15 minutes.
	DANMU_MINUTE_NUM_15 = "danmu_minute_num_15"
	// DANMU_MINUTE_NUM_30 is the danmaku count over the last 30 minutes.
	DANMU_MINUTE_NUM_30 = "danmu_minute_num_30"
	// DANMU_MINUTE_NUM_45 is the danmaku count over the last 45 minutes.
	DANMU_MINUTE_NUM_45 = "danmu_minute_num_45"
	// DANMU_MINUTE_NUM_60 is the danmaku count over the last 60 minutes.
	DANMU_MINUTE_NUM_60 = "danmu_minute_num_60"
	// Popularity.
	// POPULARITY is the current real-time popularity.
	POPULARITY = "popularity"
	// POPULARITY_MAX_TO_ARG_7 is the average of the peak popularity over 7 days.
	POPULARITY_MAX_TO_ARG_7 = "popularity_max_to_avg_7"
	// POPULARITY_MAX_TO_ARG_30 is the average of the peak popularity over 30 days.
	POPULARITY_MAX_TO_ARG_30 = "popularity_max_to_avg_30"
	// Gifts.
	// GIFT_NUM is the real-time number of gifts sent.
	GIFT_NUM = "gift_num_current_total"
	// GIFT_GOLD_NUM is the real-time number of gold seeds spent.
	GIFT_GOLD_NUM = "gift_gold_num"
	// GIFT_GOLD_AMOUNT is the real-time amount of gold seeds spent.
	GIFT_GOLD_AMOUNT = "gift_gold_amount"
	// GIFT_GOLD_AMOUNT_MINUTE_15 is the gold seed amount over the last 15 minutes.
	// NOTE(review): the constant is named "amount" but its value says "num" — confirm which is intended.
	GIFT_GOLD_AMOUNT_MINUTE_15 = "gift_gold_num_minute_15"
	// GIFT_GOLD_AMOUNT_MINUTE_30 is the gold seed amount over the last 30 minutes.
	GIFT_GOLD_AMOUNT_MINUTE_30 = "gift_gold_num_minute_30"
	// GIFT_GOLD_AMOUNT_MINUTE_45 is the gold seed amount over the last 45 minutes.
	GIFT_GOLD_AMOUNT_MINUTE_45 = "gift_gold_num_minute_45"
	// GIFT_GOLD_AMOUNT_MINUTE_60 is the gold seed amount over the last 60 minutes.
	GIFT_GOLD_AMOUNT_MINUTE_60 = "gift_gold_num_minute_60"
	// Valid live days.
	// VALID_LIVE_DAYS_TYPE_1_DAY_7 is the number of valid live days within 7 days; a valid live is one session longer than 5 minutes.
	VALID_LIVE_DAYS_TYPE_1_DAY_7 = "valid_days_type_1_day_7"
	// VALID_LIVE_DAYS_TYPE_1_DAY_14 is the number of valid live days within 14 days; a valid live is one session longer than 5 minutes.
	VALID_LIVE_DAYS_TYPE_1_DAY_14 = "valid_days_type_1_day_14"
	// VALID_LIVE_DAYS_TYPE_2_DAY_7 is the number of valid live days within 7 days; a valid live lasts at least 120 minutes.
	VALID_LIVE_DAYS_TYPE_2_DAY_7 = "valid_days_type_2_day_7"
	// VALID_LIVE_DAYS_TYPE_2_DAY_30 is the number of valid live days within 30 days; a valid live lasts at least 120 minutes.
	// NOTE(review): the previous comment said 14 days while the name and value say 30 — confirm.
	VALID_LIVE_DAYS_TYPE_2_DAY_30 = "valid_days_type_2_day_30"
	// Room status.
	// ROOM_TAG_CURRENT is the room's real-time tags.
	ROOM_TAG_CURRENT = "room_tag_current"
	// Ranking lists.
	// RANK_LIST_CURRENT is ranking-list related data.
	RANK_LIST_CURRENT = "rank_list_current"
	// DAU
	DAU = "dau"
)
// _RoomIdMappingCacheCapacity is a capacity constant for a room ID mapping
// cache; it is not referenced in the visible part of this file.
const (
	_RoomIdMappingCacheCapacity = 1024
)
// Dao bundles the storage handles (redis, MySQL, in-process LRU caches) used
// by the data-access methods in this package.
type Dao struct {
	c               *conf.Config        // configuration the Dao was built from
	redis           *redis.Pool         // redis connection pool
	db              *xsql.DB            // MySQL handle built from c.MySQL
	dbLiveApp       *xsql.DB            // MySQL handle built from c.LiveAppMySQL
	shortIDMapping  *lrucache.SyncCache // LRU cache; presumably short ID -> room ID (usage not visible here)
	areaInfoMapping *lrucache.SyncCache // LRU cache; presumably area ID -> area info (usage not visible here)
}
- // New init mysql db
- func New(c *conf.Config) (dao *Dao) {
- dao = &Dao{
- c: c,
- redis: redis.NewPool(c.Redis),
- db: xsql.NewMySQL(c.MySQL),
- dbLiveApp: xsql.NewMySQL(c.LiveAppMySQL),
- shortIDMapping: lrucache.NewSyncCache(c.LRUCache.Bucket, c.LRUCache.Capacity, c.LRUCache.Timeout),
- areaInfoMapping: lrucache.NewSyncCache(c.LRUCache.Bucket, c.LRUCache.Capacity, c.LRUCache.Timeout),
- }
- return
- }
- // Close close the resource.
- func (d *Dao) Close() {
- d.redis.Close()
- d.db.Close()
- return
- }
- // Ping dao ping
- func (d *Dao) Ping(c context.Context) error {
- // TODO: if you need use mc,redis, please add
- return d.db.Ping(c)
- }
- // FetchRoomByIDs implementation
- // FetchRoomByIDs 查询房间信息
- func (d *Dao) FetchRoomByIDs(ctx context.Context, req *v1pb.RoomByIDsReq) (resp *v1pb.RoomByIDsResp, err error) {
- if len(req.RoomIds) > 0 {
- req.RoomIds, err = d.dbNormalizeRoomIDs(ctx, req.RoomIds)
- if err != nil {
- log.Error("[dao.dao-anchor.mysql|dbFetchRoomByIDs] normalize ids error(%v), req(%v)", err, req)
- return nil, err
- }
- }
- // TODO: 处理部分fields的情况,需要考虑特殊status的依赖问题
- if len(req.RoomIds) > 0 {
- resp = &v1pb.RoomByIDsResp{
- RoomDataSet: make(map[int64]*v1pb.RoomData),
- }
- idsDB := make([]int64, 0, len(req.RoomIds))
- // 从redis获取房间所有信息
- for _, id := range req.RoomIds {
- data, err := d.redisGetRoomInfo(ctx, id, _allRoomInfoFields)
- if err != nil {
- idsDB = append(idsDB, id)
- } else {
- d.dbDealWithStatus(ctx, data)
- resp.RoomDataSet[id] = data
- }
- }
- // 需要回源DB取数据
- if len(idsDB) > 0 {
- // 分段处理
- for start := 0; start < len(idsDB); start += FETCH_PAGE_SIZE {
- end := start + FETCH_PAGE_SIZE
- if end > len(idsDB) {
- end = len(idsDB)
- }
- reqRoom := &v1pb.RoomByIDsReq{
- RoomIds: idsDB[start:end],
- Fields: _allRoomInfoFields,
- }
- respRoom, err := d.dbFetchRoomByIDs(ctx, reqRoom)
- if err != nil {
- log.Error("[RoomOnlineList] dbFetchRoomByIDs error(%v), reqRoom(%v)", err, reqRoom)
- return nil, err
- }
- // 回写房间信息到redis
- for _, id := range idsDB[start:end] {
- resp.RoomDataSet[id] = respRoom.RoomDataSet[id]
- d.redisSetRoomInfo(ctx, id, _allRoomInfoFields, respRoom.RoomDataSet[id], false)
- }
- }
- }
- } else if len(req.Uids) > 0 {
- // TODO 根据主播ID查询房间号的场景较少,暂不优化,后续先转房间号
- resp, err = d.dbFetchRoomByIDs(ctx, req)
- }
- return
- }
- // RoomOnlineList implementation
- // RoomOnlineList 在线房间列表
- func (d *Dao) RoomOnlineList(ctx context.Context, req *v1pb.RoomOnlineListReq) (resp *v1pb.RoomOnlineListResp, err error) {
- log.Info("[dao|RoomOnlineList] req(%v)", err, req)
- ids, err := d.redisGetOnlineList(ctx, _onlineListAllArea)
- if err != nil || len(ids) <= 0 {
- ids, err = d.dbOnlineListByArea(ctx, _onlineListAllArea)
- if err != nil {
- log.Error("[RoomOnlineListByAttrs] dbOnlineListByArea error(%v), req(%v)", err, req)
- return nil, err
- }
- d.redisSetOnlineList(ctx, _onlineListAllArea, ids)
- }
- resp = &v1pb.RoomOnlineListResp{
- RoomDataList: make(map[int64]*v1pb.RoomData),
- }
- // 分页逻辑
- start := int(req.Page * req.PageSize)
- size := len(ids)
- if start >= size {
- return
- }
- end := start + int(req.PageSize)
- if end > size {
- end = size
- }
- ids = ids[start:end]
- idsDB := make([]int64, 0, len(ids))
- // 从redis获取房间信息
- for _, id := range ids {
- data, err := d.redisGetRoomInfo(ctx, id, req.Fields)
- if err != nil {
- idsDB = append(idsDB, id)
- } else {
- d.dbDealWithStatus(ctx, data)
- resp.RoomDataList[id] = data
- }
- }
- // 需要回源DB取数据
- if len(idsDB) > 0 {
- reqRoom := &v1pb.RoomByIDsReq{
- RoomIds: idsDB,
- Fields: _allRoomInfoFields,
- }
- respRoom, err := d.dbFetchRoomByIDs(ctx, reqRoom)
- if err != nil {
- log.Error("[RoomOnlineList] dbFetchRoomByIDs error(%v), reqRoom(%v)", err, reqRoom)
- return nil, err
- }
- // 回写房间信息到redis
- for _, id := range idsDB {
- resp.RoomDataList[id] = respRoom.RoomDataSet[id]
- d.redisSetRoomInfo(ctx, id, _allRoomInfoFields, respRoom.RoomDataSet[id], false)
- }
- }
- return
- }
- // RoomOnlineListByArea implementation
- // RoomOnlineListByArea 分区在线房间列表
- func (d *Dao) RoomOnlineListByArea(ctx context.Context, req *v1pb.RoomOnlineListByAreaReq) (resp *v1pb.RoomOnlineListByAreaResp, err error) {
- idSet := make(map[int64]bool)
- idsDB := make([]int64, 0)
- if len(req.AreaIds) <= 0 {
- req.AreaIds = []int64{0}
- }
- for _, areaID := range req.AreaIds {
- ids, err := d.redisGetOnlineList(ctx, areaID)
- if err != nil {
- idsDB = append(idsDB, areaID)
- } else {
- for _, id := range ids {
- idSet[id] = true
- }
- }
- }
- // 需要回源DB取数据
- if len(idsDB) > 0 {
- for _, areaID := range idsDB {
- roomIds, err := d.dbOnlineListByArea(ctx, areaID)
- if err != nil {
- log.Error("[RoomOnlineListByArea] dbOnlineListByArea error(%v), areaID(%v)", err, areaID)
- return nil, err
- }
- d.redisSetOnlineList(ctx, areaID, roomIds)
- for _, id := range roomIds {
- idSet[id] = true
- }
- }
- }
- resp = &v1pb.RoomOnlineListByAreaResp{
- RoomIds: make([]int64, 0, len(idSet)),
- }
- for id := range idSet {
- resp.RoomIds = append(resp.RoomIds, id)
- }
- return
- }
// _fields lists the room info fields that RoomOnlineListByAttrs requests from
// redis for each online room.
var (
	_fields = []string{"uid", "area_id", "parent_area_id", "popularity_count", "anchor_profile_type"}
)
- // RoomOnlineListByAttrs implementation
- // RoomOnlineListByAttrs 在线房间维度信息(不传attrs,不查询attr)
- func (d *Dao) RoomOnlineListByAttrs(ctx context.Context, req *v1pb.RoomOnlineListByAttrsReq) (resp *v1pb.RoomOnlineListByAttrsResp, err error) {
- ids, err := d.redisGetOnlineList(ctx, _onlineListAllArea)
- if err != nil || len(ids) <= 0 {
- ids, err = d.dbOnlineListByArea(ctx, _onlineListAllArea)
- if err != nil {
- log.Error("[RoomOnlineListByAttrs] dbOnlineListByArea error(%v), req(%v)", err, req)
- return nil, err
- }
- d.redisSetOnlineList(ctx, _onlineListAllArea, ids)
- }
- resp = &v1pb.RoomOnlineListByAttrsResp{
- Attrs: make(map[int64]*v1pb.AttrResp),
- }
- idsDB := make([]int64, 0, len(ids))
- for _, id := range ids {
- // 从redis获取房间基础信息
- data, err := d.redisGetRoomInfo(ctx, id, _fields)
- if err != nil {
- idsDB = append(idsDB, id)
- } else {
- resp.Attrs[id] = &v1pb.AttrResp{
- Uid: data.Uid,
- RoomId: id,
- AreaId: data.AreaId,
- ParentAreaId: data.ParentAreaId,
- PopularityCount: data.PopularityCount,
- AnchorProfileType: data.AnchorProfileType,
- }
- }
- }
- // 需要回源DB取数据
- if len(idsDB) > 0 {
- eg := errgroup.Group{}
- // 分段处理
- for start := 0; start < len(idsDB); start += FETCH_PAGE_SIZE {
- end := start + FETCH_PAGE_SIZE
- if end > len(idsDB) {
- end = len(idsDB)
- }
- eg.Go(func(idsDB []int64, start, end int) func() error {
- return func() (err error) {
- reqRoom := &v1pb.RoomByIDsReq{
- RoomIds: idsDB[start:end],
- Fields: _allRoomInfoFields,
- }
- respRoom, err := d.dbFetchRoomByIDs(ctx, reqRoom)
- if err != nil {
- log.Error("[RoomOnlineList] dbFetchRoomByIDs error(%v), reqRoom(%v)", err, reqRoom)
- return err
- }
- // 回写房间信息到redis
- for _, id := range idsDB[start:end] {
- resp.Attrs[id] = &v1pb.AttrResp{
- Uid: respRoom.RoomDataSet[id].Uid,
- RoomId: id,
- AreaId: respRoom.RoomDataSet[id].AreaId,
- ParentAreaId: respRoom.RoomDataSet[id].ParentAreaId,
- PopularityCount: respRoom.RoomDataSet[id].PopularityCount,
- AnchorProfileType: respRoom.RoomDataSet[id].AnchorProfileType,
- TagList: respRoom.RoomDataSet[id].TagList,
- AttrList: make([]*v1pb.AttrData, 0, len(req.Attrs)),
- }
- d.redisSetRoomInfo(ctx, id, _allRoomInfoFields, respRoom.RoomDataSet[id], false)
- d.redisSetTagList(ctx, id, respRoom.RoomDataSet[id].TagList)
- }
- return
- }
- }(idsDB, start, end))
- }
- eg.Wait()
- }
- // 重置回源数组
- idsDB = make([]int64, 0, len(ids))
- for _, id := range ids {
- if resp.Attrs[id].TagList == nil {
- // 从redis获取房间Tag信息
- data, err := d.redisGetTagList(ctx, id)
- if err != nil {
- idsDB = append(idsDB, id)
- } else {
- resp.Attrs[id].TagList = data
- }
- }
- }
- // 需要回源DB取数据
- if len(idsDB) > 0 {
- eg := errgroup.Group{}
- // 分段处理
- for start := 0; start < len(idsDB); start += FETCH_PAGE_SIZE {
- end := start + FETCH_PAGE_SIZE
- if end > len(idsDB) {
- end = len(idsDB)
- }
- eg.Go(func(idsDB []int64, start, end int) func() error {
- return func() (err error) {
- respRoom := make(map[int64]*v1pb.RoomData)
- err = d.dbFetchTagInfo(ctx, idsDB[start:end], respRoom)
- if err != nil {
- log.Error("[RoomOnlineList] dbFetchTagInfo error(%v), idsDB[start:end](%v)", err, idsDB[start:end])
- return err
- }
- // 回写房间Tag信息到redis
- for _, id := range idsDB[start:end] {
- if tag, ok := respRoom[id]; ok {
- resp.Attrs[id].TagList = tag.TagList
- } else {
- resp.Attrs[id].TagList = make([]*v1pb.TagData, 0, len(req.Attrs))
- }
- d.redisSetTagList(ctx, id, resp.Attrs[id].TagList)
- }
- return
- }
- }(idsDB, start, end))
- }
- eg.Wait()
- }
- // TODO 从redis获取attr列表
- // TODO 批量从db获取attr列表
- if len(req.Attrs) > 0 {
- eg := errgroup.Group{}
- for _, attr := range req.Attrs {
- // 实时人气值特殊处理
- if attr.AttrId == ATTRID_POPULARITY && attr.AttrSubId == ATTRSUBID_POPULARITY_REALTIME {
- for _, attrResp := range resp.Attrs {
- resp.Attrs[attrResp.RoomId].AttrList = append(resp.Attrs[attrResp.RoomId].AttrList, &v1pb.AttrData{
- RoomId: attrResp.RoomId,
- AttrId: attr.AttrId,
- AttrSubId: attr.AttrSubId,
- AttrValue: attrResp.PopularityCount,
- })
- }
- continue
- }
- eg.Go(func(attr *v1pb.AttrReq) func() error {
- return func() (err error) {
- reqAttr := &v1pb.FetchAttrByIDsReq{
- AttrId: attr.AttrId,
- AttrSubId: attr.AttrSubId,
- RoomIds: ids,
- }
- respAttr, err := d.FetchAttrByIDs(ctx, reqAttr)
- if err != nil {
- log.Error("[RoomOnlineListByAttrs] FetchAttrByIDs from db error(%v), reqAttr(%v)", err, reqAttr)
- return err
- }
- for _, attr := range respAttr.Attrs {
- resp.Attrs[attr.RoomId].AttrList = append(resp.Attrs[attr.RoomId].AttrList, attr)
- }
- return
- }
- }(attr))
- }
- eg.Wait()
- }
- return
- }
- // RoomCreate implementation
- // RoomCreate 房间创建
- func (d *Dao) RoomCreate(ctx context.Context, req *v1pb.RoomCreateReq) (resp *v1pb.RoomCreateResp, err error) {
- return d.roomCreate(ctx, req)
- }
- // RoomUpdate implementation
- // RoomUpdate 房间更新
- func (d *Dao) RoomUpdate(ctx context.Context, req *v1pb.RoomUpdateReq) (resp *v1pb.UpdateResp, err error) {
- resp, err = d.roomUpdate(ctx, req)
- if err == nil {
- fields := make([]string, 0, len(req.Fields))
- data := &v1pb.RoomData{
- RoomId: req.RoomId,
- AnchorLevel: new(v1pb.AnchorLevel),
- }
- for _, f := range req.Fields {
- switch f {
- case "title":
- data.Title = req.Title
- case "cover":
- data.Cover = req.Cover
- case "tags":
- data.Tags = req.Tags
- case "background":
- data.Background = req.Background
- case "description":
- data.Description = req.Description
- case "live_start_time":
- data.LiveStartTime = req.LiveStartTime
- // 更新在播列表
- var areaID int64
- reqRoom := &v1pb.RoomByIDsReq{
- RoomIds: []int64{req.RoomId},
- Fields: _allRoomInfoFields,
- }
- respRoom, err := d.FetchRoomByIDs(ctx, reqRoom)
- if err != nil {
- log.Error("[RoomOnlineList] dbFetchRoomByIDs error(%v), reqRoom(%v)", err, reqRoom)
- } else {
- if respRoom.RoomDataSet[req.RoomId] != nil {
- areaID = respRoom.RoomDataSet[req.RoomId].AreaId
- }
- }
- if req.LiveStartTime > 0 {
- d.redisAddOnlineList(ctx, _onlineListAllArea, req.RoomId)
- d.redisAddOnlineList(ctx, areaID, req.RoomId)
- } else {
- d.redisDelOnlineList(ctx, _onlineListAllArea, req.RoomId)
- d.redisDelOnlineList(ctx, areaID, req.RoomId)
- }
- // TODO 更新开播状态
- case "live_screen_type":
- data.LiveScreenType = req.LiveScreenType
- case "live_type":
- data.LiveType = req.LiveType
- case "lock_status":
- data.LockStatus = req.LockStatus
- case "lock_time":
- data.LockTime = req.LockTime
- case "hidden_time":
- data.HiddenTime = req.HiddenTime
- // TODO 更新隐藏状态
- case "area_id":
- data.AreaId = req.AreaId
- if req.AreaId > 0 {
- areaInfo, err := d.dbFetchAreaInfo(ctx, req.AreaId)
- if err != nil {
- log.Error("[dao.dao-anchor.mysql|roomUpdate] fetch area info error(%v), req(%v)", err, req)
- err = ecode.InvalidParam
- return nil, err
- }
- data.ParentAreaId = areaInfo.ParentAreaID
- fields = append(fields, "parent_area_id")
- }
- default:
- continue
- }
- fields = append(fields, f)
- }
- d.redisSetRoomInfo(ctx, data.RoomId, fields, data, true)
- }
- return
- }
- // RoomBatchUpdate implementation
- // RoomBatchUpdate 房间更新
- func (d *Dao) RoomBatchUpdate(ctx context.Context, req *v1pb.RoomBatchUpdateReq) (resp *v1pb.UpdateResp, err error) {
- resp = &v1pb.UpdateResp{}
- for _, r := range req.Reqs {
- res, err := d.RoomUpdate(ctx, r)
- if err != nil {
- log.Error("[dao.dao-anchor.mysql|RoomBatchUpdate] update room record error(%v), req(%v)", err, r)
- return nil, err
- }
- resp.AffectedRows += res.AffectedRows
- }
- return
- }
- // RoomExtendUpdate implementation
- // RoomExtendUpdate 房间更新
- func (d *Dao) RoomExtendUpdate(ctx context.Context, req *v1pb.RoomExtendUpdateReq) (resp *v1pb.UpdateResp, err error) {
- resp, err = d.roomExtendUpdate(ctx, req)
- if err == nil {
- fields := make([]string, 0, len(req.Fields))
- data := &v1pb.RoomData{
- RoomId: req.RoomId,
- AnchorLevel: new(v1pb.AnchorLevel),
- }
- for _, f := range req.Fields {
- switch f {
- case "keyframe":
- data.Keyframe = req.Keyframe
- case "popularity_count":
- data.PopularityCount = req.PopularityCount
- default:
- continue
- }
- fields = append(fields, f)
- }
- d.redisSetRoomInfo(ctx, data.RoomId, fields, data, true)
- }
- return
- }
- // RoomExtendBatchUpdate implementation
- // RoomExtendBatchUpdate 房间更新
- func (d *Dao) RoomExtendBatchUpdate(ctx context.Context, req *v1pb.RoomExtendBatchUpdateReq) (resp *v1pb.UpdateResp, err error) {
- resp = &v1pb.UpdateResp{}
- for _, r := range req.Reqs {
- res, err := d.RoomExtendUpdate(ctx, r)
- if err != nil {
- log.Error("[dao.dao-anchor.mysql|RoomExtendBatchUpdate] update room extend record error(%v), req(%v)", err, r)
- return nil, err
- }
- resp.AffectedRows += res.AffectedRows
- }
- return
- }
- // RoomExtendIncre implementation
- // RoomExtendIncre 房间增量更新
- func (d *Dao) RoomExtendIncre(ctx context.Context, req *v1pb.RoomExtendIncreReq) (resp *v1pb.UpdateResp, err error) {
- resp, err = d.roomExtendIncre(ctx, req)
- if err == nil {
- fields := make([]string, 0, len(req.Fields))
- data := &v1pb.RoomData{
- RoomId: req.RoomId,
- AnchorLevel: new(v1pb.AnchorLevel),
- }
- for _, f := range req.Fields {
- switch f {
- case "popularity_count":
- data.PopularityCount = req.PopularityCount
- default:
- continue
- }
- fields = append(fields, f)
- }
- if len(fields) > 0 {
- d.redisIncreRoomInfo(ctx, data.RoomId, fields, data)
- }
- }
- return
- }
- // RoomExtendBatchIncre implementation
- // RoomExtendBatchIncre 房间增量更新
- func (d *Dao) RoomExtendBatchIncre(ctx context.Context, req *v1pb.RoomExtendBatchIncreReq) (resp *v1pb.UpdateResp, err error) {
- resp = &v1pb.UpdateResp{}
- for _, r := range req.Reqs {
- res, err := d.RoomExtendIncre(ctx, r)
- if err != nil {
- log.Error("[dao.dao-anchor.mysql|RoomExtendBatchIncre] update room extend increment record error(%v), req(%v)", err, r)
- return nil, err
- }
- resp.AffectedRows += res.AffectedRows
- }
- return
- }
- // RoomTagCreate implementation
- // RoomTagCreate 房间Tag创建
- func (d *Dao) RoomTagCreate(ctx context.Context, req *v1pb.RoomTagCreateReq) (resp *v1pb.UpdateResp, err error) {
- resp, err = d.roomTagCreate(ctx, req)
- if err == nil {
- tag := &v1pb.TagData{
- TagId: req.TagId,
- TagSubId: req.TagSubId,
- TagValue: req.TagValue,
- TagExt: req.TagExt,
- TagExpireAt: req.TagExpireAt,
- }
- d.redisAddTag(ctx, req.RoomId, tag)
- }
- return
- }
- // RoomAttrCreate implementation
- // RoomAttrCreate 房间Attr创建
- func (d *Dao) RoomAttrCreate(ctx context.Context, req *v1pb.RoomAttrCreateReq) (resp *v1pb.UpdateResp, err error) {
- return d.roomAttrCreate(ctx, req)
- }
- // RoomAttrSetEx implementation
- // RoomAttrSetEx 房间Attr更新
- func (d *Dao) RoomAttrSetEx(ctx context.Context, req *v1pb.RoomAttrSetExReq) (resp *v1pb.UpdateResp, err error) {
- return d.roomAttrSetEx(ctx, req)
- }
- // AnchorUpdate implementation
- // AnchorUpdate 主播更新
- func (d *Dao) AnchorUpdate(ctx context.Context, req *v1pb.AnchorUpdateReq) (resp *v1pb.UpdateResp, err error) {
- resp, err = d.anchorUpdate(ctx, req)
- if err == nil {
- roomID := d.dbFetchRoomIDByUID(ctx, req.Uid)
- if roomID == 0 {
- return
- }
- fields := make([]string, 0, len(req.Fields))
- data := &v1pb.RoomData{
- RoomId: roomID,
- AnchorLevel: new(v1pb.AnchorLevel),
- }
- for _, f := range req.Fields {
- switch f {
- case "profile_type":
- f = "anchor_profile_type"
- data.AnchorProfileType = req.ProfileType
- case "san_score":
- f = "anchor_san"
- data.AnchorSan = req.SanScore
- case "round_status":
- f = "anchor_round_switch"
- data.AnchorRoundSwitch = req.RoundStatus
- case "record_status":
- f = "anchor_record_switch"
- data.AnchorRecordSwitch = req.RecordStatus
- case "exp":
- f = "anchor_exp"
- data.AnchorLevel.Score = req.Exp
- default:
- log.Error("[dao.dao-anchor.mysql|anchorUpdate] unsupported field(%v), req(%s)", f, req)
- err = ecode.InvalidParam
- return
- }
- fields = append(fields, f)
- }
- d.redisSetRoomInfo(ctx, data.RoomId, fields, data, true)
- }
- return
- }
- // AnchorBatchUpdate implementation
- // AnchorBatchUpdate 主播更新
- func (d *Dao) AnchorBatchUpdate(ctx context.Context, req *v1pb.AnchorBatchUpdateReq) (resp *v1pb.UpdateResp, err error) {
- resp = &v1pb.UpdateResp{}
- for _, r := range req.Reqs {
- res, err := d.AnchorUpdate(ctx, r)
- if err != nil {
- log.Error("[dao.dao-anchor.mysql|AnchorBatchUpdate] update anchor record error(%v), req(%v)", err, r)
- return nil, err
- }
- resp.AffectedRows += res.AffectedRows
- }
- return
- }
- // AnchorIncre implementation
- // AnchorIncre 主播增量更新
- func (d *Dao) AnchorIncre(ctx context.Context, req *v1pb.AnchorIncreReq) (resp *v1pb.UpdateResp, err error) {
- resp, err = d.anchorIncre(ctx, req)
- if err == nil {
- roomID := d.dbFetchRoomIDByUID(ctx, req.Uid)
- if roomID == 0 {
- return
- }
- fields := make([]string, 0, len(req.Fields))
- data := &v1pb.RoomData{
- RoomId: roomID,
- AnchorLevel: new(v1pb.AnchorLevel),
- }
- for _, f := range req.Fields {
- switch f {
- case "san_score":
- f = "anchor_san"
- data.AnchorSan = req.SanScore
- case "exp":
- f = "anchor_exp"
- data.AnchorLevel.Score = req.Exp
- default:
- continue
- }
- fields = append(fields, f)
- }
- if len(fields) > 0 {
- d.redisIncreRoomInfo(ctx, data.RoomId, fields, data)
- }
- }
- return
- }
- // AnchorBatchIncre implementation
- // AnchorBatchIncre 主播增量更新
- func (d *Dao) AnchorBatchIncre(ctx context.Context, req *v1pb.AnchorBatchIncreReq) (resp *v1pb.UpdateResp, err error) {
- resp = &v1pb.UpdateResp{}
- for _, r := range req.Reqs {
- res, err := d.AnchorIncre(ctx, r)
- if err != nil {
- log.Error("[dao.dao-anchor.mysql|AnchorBatchIncre] update anchor increment record error(%v), req(%v)", err, r)
- return nil, err
- }
- resp.AffectedRows += res.AffectedRows
- }
- return
- }
- // FetchAreas implementation
- // FetchAreas 根据父分区号查询子分区
- func (d *Dao) FetchAreas(ctx context.Context, req *v1pb.FetchAreasReq) (resp *v1pb.FetchAreasResp, err error) {
- return d.fetchAreas(ctx, req)
- }
- // FetchAttrByIDs implementation
- // FetchAttrByIDs 批量根据房间号查询指标
- func (d *Dao) FetchAttrByIDs(ctx context.Context, req *v1pb.FetchAttrByIDsReq) (resp *v1pb.FetchAttrByIDsResp, err error) {
- return d.fetchAttrByIDs(ctx, req)
- }
- // DeleteAttr implementation
- // DeleteAttr 删除一个指标
- func (d *Dao) DeleteAttr(ctx context.Context, req *v1pb.DeleteAttrReq) (resp *v1pb.UpdateResp, err error) {
- return d.deleteAttr(ctx, req)
- }
// msgVal is the part of a databus message payload that is decoded here: only
// the "msg_id" JSON field, which is used to build the consumed-marker key.
type msgVal struct {
	MsgID string `json:"msg_id"`
}
// getConsumedKey builds the redis key that marks a message as consumed, in
// the form "consumed:<topic>:<msgID>".
func getConsumedKey(topic string, msgID string) string {
	const layout = "consumed:%s:%s"
	key := fmt.Sprintf(layout, topic, msgID)
	return key
}
- // 清除消费过的记录,主要用于测试
- func (d *Dao) clearConsumed(ctx context.Context, msg *databus.Message) {
- val := &msgVal{}
- err := jsonitor.Unmarshal(msg.Value, val)
- if err != nil {
- return
- }
- conn := d.redis.Get(ctx)
- defer conn.Close()
- conn.Do("DEL", getConsumedKey(msg.Topic, val.MsgID))
- }
- // CanConsume 是否可以消费
- func (d *Dao) CanConsume(ctx context.Context, msg *databus.Message) bool {
- val := &msgVal{}
- err := jsonitor.Unmarshal(msg.Value, val)
- if err != nil {
- log.Error("unmarshal msg value error %+v, value: %s", err, string(msg.Value))
- return true
- }
- if val.MsgID == "" {
- log.Warn("msg_id is empty ; value: %s", string(msg.Value))
- return true
- }
- conn := d.redis.Get(ctx)
- defer conn.Close()
- var key = getConsumedKey(msg.Topic, val.MsgID)
- reply, err := conn.Do("SET", key, "1", "NX", "EX", 86400) // 24 hours
- if err == nil {
- if reply == nil {
- log.Info("Already consumed key:%s", key)
- return false
- } else {
- return true
- }
- }
- if err == redis.ErrNil {
- //already consumed
- log.Info("Already consumed key:%s", key)
- return false
- }
- // other redis error happenned, let it pass
- log.Error("redis error when resolve CanConsume %+v", err)
- return true
- }
|