dao.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package dao
  2. import (
  3. "context"
  4. "go-common/app/service/live/rtc/internal/model"
  5. "go-common/library/cache/redis"
  6. "time"
  7. "go-common/app/service/live/rtc/internal/conf"
  8. xsql "go-common/library/database/sql"
  9. )
  10. // Dao dao
  11. type Dao struct {
  12. c *conf.Config
  13. //mc *memcache.Pool
  14. redis *redis.Pool
  15. db *xsql.DB
  16. }
  17. // New init mysql db
  18. func New(c *conf.Config) (dao *Dao) {
  19. dao = &Dao{
  20. c: c,
  21. redis: redis.NewPool(c.Redis),
  22. db: xsql.NewMySQL(c.MySQL),
  23. }
  24. return
  25. }
  26. // Close close the resource.
  27. func (d *Dao) Close() {
  28. //d.mc.Close()
  29. d.redis.Close()
  30. d.db.Close()
  31. }
  32. // Ping dao ping
  33. func (d *Dao) Ping(ctx context.Context) error {
  34. // TODO: add mc,redis... if you use
  35. return d.db.Ping(ctx)
  36. }
  37. func (d *Dao) GetMediaSource(ctx context.Context, channelID uint64) ([]*model.RtcMediaSource, error) {
  38. sql := "SELECT `id`,`channel_id`,`user_id`,`type`,`codec`,`media_specific` FROM `rtc_media_source` WHERE `channel_id` = ? AND `status` = 0"
  39. stmt := d.db.Prepared(sql)
  40. defer stmt.Close()
  41. rows, err := stmt.Query(ctx, channelID)
  42. if err != nil {
  43. return nil, err
  44. }
  45. defer rows.Close()
  46. source := make([]*model.RtcMediaSource, 0)
  47. for rows.Next() {
  48. s := &model.RtcMediaSource{}
  49. if err = rows.Scan(&s.SourceID, &s.ChannelID, &s.UserID, &s.Type, &s.Codec, &s.MediaSpecific); err != nil {
  50. return nil, err
  51. }
  52. source = append(source, s)
  53. }
  54. return source, nil
  55. }
  56. func (d *Dao) CreateCall(ctx context.Context, call *model.RtcCall) (uint32, error) {
  57. sql := "INSERT INTO `rtc_call`(`user_id`,`channel_id`,`version`,`token`,`join_time`,`leave_time`,`status`) VALUES(?,?,?,?,?,?,?)"
  58. stmt := d.db.Prepared(sql)
  59. defer stmt.Close()
  60. r, err := stmt.Exec(ctx, call.UserID, call.ChannelID, call.Version, call.Token, call.JoinTime, call.LeaveTime, call.Status)
  61. if err != nil {
  62. return 0, err
  63. }
  64. id, err := r.LastInsertId()
  65. if err != nil {
  66. return 0, err
  67. }
  68. call.CallID = uint32(id)
  69. return call.CallID, nil
  70. }
  71. func (d *Dao) UpdateCallStatus(ctx context.Context, channelID uint64, callID uint32, userID uint64, leave time.Time, status uint8) error {
  72. sql := "UPDATE `rtc_call` SET `leave_time` = ?,`status` = ? WHERE `id` = ? AND `user_id` = ? LIMIT 1"
  73. stmt := d.db.Prepared(sql)
  74. defer stmt.Close()
  75. _, err := stmt.Exec(ctx, leave, status, callID, userID)
  76. if err != nil {
  77. return err
  78. }
  79. return nil
  80. }
  81. func (d *Dao) UpdateMediaSourceStatus(ctx context.Context, channelID uint64, callID uint32, userID uint64, status uint8) error {
  82. sql := "UPDATE `rtc_media_source` SET `status` = ? WHERE `call_id` = ? AND `channel_id` = ? AND `user_id` = ?"
  83. stmt := d.db.Prepared(sql)
  84. defer stmt.Close()
  85. _, err := stmt.Exec(ctx, status, callID, channelID, userID)
  86. if err != nil {
  87. return err
  88. }
  89. return nil
  90. }
  91. func (d *Dao) CreateMediaSource(ctx context.Context, source *model.RtcMediaSource) (uint32, error) {
  92. sql := "INSERT INTO `rtc_media_source`(`channel_id`,`user_id`,`type`,`codec`,`media_specific`,`status`) VALUES(?,?,?,?,?,?)"
  93. stmt := d.db.Prepared(sql)
  94. defer stmt.Close()
  95. r, err := stmt.Exec(ctx, source.ChannelID, source.UserID, source.Type, source.Codec, source.MediaSpecific, source.Status)
  96. if err != nil {
  97. return 0, err
  98. }
  99. id, err := r.LastInsertId()
  100. if err != nil {
  101. return 0, err
  102. }
  103. return uint32(id), nil
  104. }
  105. func (d *Dao) CreateMediaPublish(ctx context.Context, publish *model.RtcMediaPublish) error {
  106. mixConfigSql := "REPLACE INTO `rtc_mix_config`(`call_id`,`config`) VALUES(?,?)"
  107. mixConfigStmt := d.db.Prepared(mixConfigSql)
  108. defer mixConfigStmt.Close()
  109. var err error
  110. mixConfigResult, err := mixConfigStmt.Exec(ctx, publish.CallID, publish.MixConfig)
  111. if err != nil {
  112. return err
  113. }
  114. _, err = mixConfigResult.LastInsertId()
  115. if err != nil {
  116. return err
  117. }
  118. publishSql := "REPLACE INTO `rtc_media_publish`(`call_id`,`channel_id`,`user_id`,`switch`,`width`,`height`,`frame_rate`,`video_codec`,`video_profile`,`channel`,`sample_rate`,`audio_codec`,`bitrate`) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)"
  119. publishStmt := d.db.Prepared(publishSql)
  120. defer publishStmt.Close()
  121. _, err = publishStmt.Exec(ctx, publish.CallID, publish.ChannelID, publish.UserID, publish.Switch,
  122. publish.Width, publish.Height, publish.FrameRate, publish.VideoCodec, publish.VideoProfile,
  123. publish.Channel, publish.SampleRate, publish.AudioCodec, publish.Bitrate)
  124. if err != nil {
  125. return err
  126. }
  127. return nil
  128. }
  129. func (d *Dao) GetMediaPublishConfig(ctx context.Context, channelID uint64, callID uint32) (*model.RtcMediaPublish, error) {
  130. publishSql := "SELECT `user_id`,`switch`,`width`,`height`,`frame_rate`,`video_codec`,`video_profile`,`channel`,`sample_rate`,`audio_codec`,`bitrate`,`mix_config_id` FROM `rtc_media_publish` WHERE `call_id` = ? AND `channel_id` = ? LIMIT 1"
  131. publishStmt := d.db.Prepared(publishSql)
  132. defer publishStmt.Close()
  133. publishRow := publishStmt.QueryRow(ctx, callID, channelID)
  134. var publish model.RtcMediaPublish
  135. var mixConfigID uint32
  136. if err := publishRow.Scan(&publish.UserID, &publish.Switch, &publish.Width, &publish.Height, &publish.FrameRate,
  137. &publish.VideoCodec, &publish.VideoProfile, &publish.Channel, &publish.SampleRate,
  138. &publish.AudioCodec, &publish.Bitrate, &mixConfigID); err != nil {
  139. return nil, err
  140. }
  141. mixConfigSql := "SELECT `config` FROM `rtc_mix_config` WHERE `id` = ? "
  142. mixConfigStmt := d.db.Prepared(mixConfigSql)
  143. defer mixConfigStmt.Close()
  144. mixConfigRow := mixConfigStmt.QueryRow(ctx, mixConfigID)
  145. if err := mixConfigRow.Scan(&publish.MixConfig); err != nil {
  146. return nil, err
  147. }
  148. return &publish, nil
  149. }
  150. func (d *Dao) UpdateMediaPublishConfig(ctx context.Context, channelID uint64, callID uint32, config string) error {
  151. sql := "UPDATE `rtc_mix_config` SET `config` = ? WHERE `call_id` = ? LIMIT 1"
  152. stmt := d.db.Prepared(sql)
  153. defer stmt.Close()
  154. _, err := stmt.Exec(ctx, config, callID)
  155. if err != nil {
  156. return err
  157. }
  158. return nil
  159. }
  160. func (d *Dao) TerminateStream(ctx context.Context, channelID uint64, callID uint32) error {
  161. sql := "UPDATE `rtc_media_publish` SET `switch` = 0 WHERE `call_id` = ? LIMIT 1"
  162. stmt := d.db.Prepared(sql)
  163. defer stmt.Close()
  164. _, err := stmt.Exec(ctx, callID)
  165. if err != nil {
  166. return err
  167. }
  168. return nil
  169. }
  170. func (d *Dao) GetChannelIP(ctx context.Context, channelID uint64) ([]string, error) {
  171. sql := "SELECT `ip` FROM `rtc_call` WHERE `channel_id` = ? AND `status` = 0"
  172. stmt := d.db.Prepared(sql)
  173. defer stmt.Close()
  174. rows, err := stmt.Query(ctx, channelID)
  175. if err != nil {
  176. return nil, err
  177. }
  178. defer rows.Close()
  179. result := make([]string, 0)
  180. for rows.Next() {
  181. var ip string
  182. if err = rows.Scan(&ip); err != nil {
  183. return nil, err
  184. }
  185. result = append(result, ip)
  186. }
  187. return result, nil
  188. }
  189. func (d *Dao) GetToken(ctx context.Context, channelID uint64, callID uint32) (string, error) {
  190. sql := "SELECT `token` FROM `rtc_call` WHERE `id` = ? AND `channel_id` = ?"
  191. stmt := d.db.Prepared(sql)
  192. defer stmt.Close()
  193. row := stmt.QueryRow(ctx, callID, channelID)
  194. var token string
  195. err := row.Scan(&token)
  196. if err == xsql.ErrNoRows {
  197. err = nil
  198. }
  199. return token, err
  200. }