send.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. broadcasrtService "go-common/app/service/live/broadcast-proxy/api/v1"
  7. "go-common/app/service/live/live-dm/model"
  8. roomService "go-common/app/service/live/room/api/liverpc/v1"
  9. "go-common/library/cache/redis"
  10. "go-common/library/log"
  11. )
  12. const (
  13. _adminMsgHistoryCache = "cache:last10_roomadminmsg:"
  14. _msgHistoryCache = "cache:last10_roommsg:"
  15. )
  16. //SaveHistory 弹幕历史存入redis
  17. func (d *Dao) SaveHistory(ctx context.Context, hm string, adm bool, rid int64) {
  18. var conn = d.redis.Get(ctx)
  19. defer conn.Close()
  20. if adm {
  21. admKey := fmt.Sprintf("%s%d", _adminMsgHistoryCache, rid)
  22. if err := conn.Send("LPUSH", admKey, hm); err != nil {
  23. log.Error("DM: SaveHistory LPUSH err: %v", err)
  24. }
  25. if err := conn.Send("LLEN", admKey); err != nil {
  26. log.Error("DM: SaveHistory LLEN err: %v", err)
  27. return
  28. }
  29. if err := conn.Flush(); err != nil {
  30. log.Error("DM: SaveHistory Flush err: %v", err)
  31. return
  32. }
  33. if _, err := conn.Receive(); err != nil {
  34. log.Error("DM: SaveHistory LPUSH err: %v", err)
  35. }
  36. count, err := redis.Int64(conn.Receive())
  37. if err != nil {
  38. log.Error("DM: SaveHistory LPUSH LLEN err: %v", err)
  39. return
  40. }
  41. if count > 15 {
  42. err := conn.Send("LTRIM", admKey, 0, 9)
  43. if err != nil {
  44. log.Error("DM: SaveHistory LTRIM err: %v", err)
  45. }
  46. }
  47. if err := conn.Send("EXPIRE", admKey, 86400); err != nil {
  48. log.Error("DM: SaveHistory EXPIRE err: %v", err)
  49. }
  50. if err := conn.Flush(); err != nil {
  51. log.Error("DM: SaveHistory Flush err: %v", err)
  52. }
  53. }
  54. userKey := fmt.Sprintf("%s%d", _msgHistoryCache, rid)
  55. if err := conn.Send("LPUSH", userKey, hm); err != nil {
  56. log.Error("DM: SaveHistory LPUSH err: %v", err)
  57. }
  58. if err := conn.Send("LLEN", userKey); err != nil {
  59. log.Error("DM: SaveHistory LLEN err: %v", err)
  60. return
  61. }
  62. if err := conn.Flush(); err != nil {
  63. log.Error("DM: SaveHistory Flush err: %v", err)
  64. return
  65. }
  66. if _, err := conn.Receive(); err != nil {
  67. log.Error("DM: SaveHistory Receive LPUSH err: %v", err)
  68. }
  69. count, err := redis.Int64(conn.Receive())
  70. if err != nil {
  71. log.Error("DM: SaveHistory Int64 err: %v", err)
  72. return
  73. }
  74. if count > 15 {
  75. if err := conn.Send("LTRIM", userKey, 0, 9); err != nil {
  76. log.Error("DM: SaveHistory LTRIM err: %v", err)
  77. }
  78. }
  79. if err := conn.Send("EXPIRE", userKey, 86400); err != nil {
  80. log.Error("DM: SaveHistory EXPIRE err: %v", err)
  81. }
  82. if err := conn.Flush(); err != nil {
  83. log.Error("DM: SaveHistory Flush err: %v", err)
  84. }
  85. }
  86. //IncrDMNum 弹幕条数
  87. func IncrDMNum(ctx context.Context, rid int64, mode int64) {
  88. req := &roomService.RoomIncrDanmuSendNumReq{
  89. RoomId: rid,
  90. Mode: mode,
  91. }
  92. resp, err := RoomServiceClient.V1Room.IncrDanmuSendNum(ctx, req)
  93. if err != nil {
  94. log.Error("DM: IncrDMNum err: %v", err)
  95. return
  96. }
  97. if resp.Code != 0 {
  98. log.Error("DM: IncrDMNum err code: %d", resp.Code)
  99. return
  100. }
  101. }
  102. //SendBroadCast 发送弹幕(http)
  103. func SendBroadCast(ctx context.Context, sm string, rid int64) error {
  104. err := LiveBroadCastClient.PushBroadcast(ctx, rid, 0, sm)
  105. if err != nil {
  106. log.Error("DM: SendBroadCast err: %v", err)
  107. return err
  108. }
  109. return nil
  110. }
  111. //SendBroadCastGrpc 调用GRPC发送弹幕
  112. func SendBroadCastGrpc(ctx context.Context, sm string, rid int64) error {
  113. req := &broadcasrtService.RoomMessageRequest{
  114. RoomId: int32(rid),
  115. Message: sm,
  116. }
  117. _, err := BcastClient.DanmakuClient.RoomMessage(ctx, req)
  118. if err != nil {
  119. log.Error("DM: SendBroadCastGrpc err: %v", err)
  120. return err
  121. }
  122. return nil
  123. }
  124. //SendBNDatabus 拜年祭制定房间投递到databus
  125. func SendBNDatabus(ctx context.Context, uid int64, info *model.BNDatabus) {
  126. uids := strconv.FormatInt(uid, 10)
  127. if err := bndatabus.Send(ctx, uids, info); err != nil {
  128. log.Error("[service.live-dm.v1.bndatabus] send error(%v), record(%v)", err, info)
  129. }
  130. }