view.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package archive
  2. import (
  3. "context"
  4. history "go-common/app/interface/main/history/model"
  5. "go-common/app/interface/main/tv/model/view"
  6. arcwar "go-common/app/service/main/archive/api"
  7. "go-common/library/cache/memcache"
  8. "go-common/library/ecode"
  9. "go-common/library/log"
  10. "go-common/library/net/metadata"
  11. "github.com/pkg/errors"
  12. )
  13. // Archive3 picks one archive data
  14. func (d *Dao) Archive3(c context.Context, aid int64) (arc *arcwar.Arc, err error) {
  15. var (
  16. arcReply *arcwar.ArcReply
  17. arg = &arcwar.ArcRequest{Aid: aid}
  18. )
  19. arcReply, err = d.arcClient.Arc(c, arg)
  20. if err != nil {
  21. log.Error("d.arcRPC.Archive(%v) error(%+v)", arg, err)
  22. if ecode.Cause(err) == ecode.NothingFound {
  23. err = nil
  24. return
  25. }
  26. }
  27. arc = arcReply.Arc
  28. return
  29. }
  30. // LoadViews picks view meta information from Cache & RPC
  31. func (d *Dao) LoadViews(ctx context.Context, aids []int64) (resMetas map[int64]*arcwar.ViewReply) {
  32. var (
  33. missedMetas = make(map[int64]*arcwar.ViewReply)
  34. addCache = true
  35. )
  36. resMetas = make(map[int64]*arcwar.ViewReply, len(aids))
  37. cachedMetas, missed, err := d.ViewsCache(ctx, aids)
  38. if err != nil {
  39. log.Error("LoadViews ViewsCache Sids:%v, Error:%v", aids, err)
  40. err = nil
  41. addCache = false // mc error, we don't add
  42. }
  43. if len(missed) > 0 {
  44. for _, vv := range missed {
  45. if vp, _ := d.view3(ctx, vv); vp != nil {
  46. missedMetas[vv] = vp
  47. resMetas[vv] = vp
  48. }
  49. }
  50. }
  51. // merge info from DB and the info from MC
  52. for sid, v := range cachedMetas {
  53. resMetas[sid] = v
  54. }
  55. log.Info("LoadViews Info Hit %d, Missed %d", len(cachedMetas), len(missed))
  56. if addCache && len(missedMetas) > 0 { // async Reset the DB data in MC for next time
  57. log.Info("Set MissedMetas %d Data in MC", missedMetas)
  58. for aid, vp := range missedMetas {
  59. d.AddViewCache(aid, vp)
  60. }
  61. }
  62. return
  63. }
  64. // GetView gets the aid's View info from Cache or RPC
  65. func (d *Dao) GetView(c context.Context, aid int64) (vp *arcwar.ViewReply, err error) {
  66. var (
  67. addCache = false
  68. )
  69. if vp, err = d.viewCache(c, aid); err != nil {
  70. log.Error("ViewPage viewCache AID %d, Err %v", aid, err)
  71. }
  72. if !validView(vp, true) { // back source
  73. if vp, err = d.view3(c, aid); err != nil {
  74. log.Error("%+v", err)
  75. err = ecode.NothingFound
  76. return
  77. }
  78. addCache = true
  79. }
  80. if !validView(vp, false) {
  81. err = ecode.NothingFound
  82. return
  83. }
  84. if addCache {
  85. d.AddViewCache(aid, vp)
  86. return
  87. }
  88. return
  89. }
  90. // view3 view archive with pages pb.
  91. func (d *Dao) view3(c context.Context, aid int64) (reply *arcwar.ViewReply, err error) {
  92. var arg = &arcwar.ViewRequest{Aid: aid}
  93. if reply, err = d.arcClient.View(c, arg); err != nil {
  94. log.Error("d.arcRPC.view3(%v) error(%+v)", arg, err)
  95. if ecode.Cause(err) == ecode.NothingFound {
  96. err = nil
  97. return
  98. }
  99. }
  100. return
  101. }
  102. // viewCache get view cache from remote memecache .
  103. func (d *Dao) viewCache(c context.Context, aid int64) (vs *arcwar.ViewReply, err error) {
  104. conn := d.arcMc.Get(c)
  105. key := keyView(aid)
  106. defer conn.Close()
  107. r, err := conn.Get(key)
  108. if err != nil {
  109. if err == memcache.ErrNotFound {
  110. err = nil
  111. return
  112. }
  113. err = errors.Wrapf(err, "conn.Get(%s)", key)
  114. return
  115. }
  116. vs = &arcwar.ViewReply{Arc: &arcwar.Arc{}}
  117. if err = conn.Scan(r, vs); err != nil {
  118. vs = nil
  119. err = errors.Wrapf(err, "conn.Scan(%s)", r.Value)
  120. }
  121. return
  122. }
  123. // Progress is archive plays progress .
  124. func (d *Dao) Progress(c context.Context, aid, mid int64) (h *view.History, err error) {
  125. ip := metadata.String(c, metadata.RemoteIP)
  126. arg := &history.ArgPro{Mid: mid, Aids: []int64{aid}, RealIP: ip}
  127. his, err := d.hisRPC.Progress(c, arg)
  128. if err != nil {
  129. log.Error("d.hisRPC.Progress(%v) error(%v)", arg, err)
  130. return
  131. }
  132. if his[aid] != nil {
  133. h = &view.History{Cid: his[aid].Cid, Progress: his[aid].Pro}
  134. }
  135. return
  136. }
  137. // ViewsCache pick view from cache
  138. func (d *Dao) ViewsCache(c context.Context, aids []int64) (cached map[int64]*arcwar.ViewReply, missed []int64, err error) {
  139. if len(aids) == 0 {
  140. return
  141. }
  142. var allKeys []string
  143. cached = make(map[int64]*arcwar.ViewReply, len(aids))
  144. idmap := make(map[string]int64, len(aids))
  145. for _, id := range aids {
  146. k := keyView(id)
  147. allKeys = append(allKeys, k)
  148. idmap[k] = id
  149. }
  150. conn := d.arcMc.Get(c)
  151. defer conn.Close()
  152. replys, err := conn.GetMulti(allKeys)
  153. if err != nil {
  154. log.Error("conn.Gets(%v) error(%v)", allKeys, err)
  155. err = nil
  156. return
  157. }
  158. for key, item := range replys {
  159. vp := &arcwar.ViewReply{}
  160. if err = conn.Scan(item, vp); err != nil {
  161. log.Error("item.Scan(%s) error(%v)", item.Value, err)
  162. err = nil
  163. continue
  164. }
  165. if !validView(vp, true) {
  166. continue
  167. }
  168. cached[idmap[key]] = vp
  169. delete(idmap, key)
  170. }
  171. missed = make([]int64, 0, len(idmap))
  172. for _, id := range idmap {
  173. missed = append(missed, id)
  174. }
  175. return
  176. }
  177. // Archives multi get archives.
  178. func (d *Dao) Archives(c context.Context, aids []int64) (as map[int64]*arcwar.Arc, err error) {
  179. if len(aids) == 0 {
  180. return
  181. }
  182. var (
  183. missed []int64
  184. tmp map[int64]*arcwar.Arc
  185. reply *arcwar.ArcsReply
  186. )
  187. if as, missed, err = d.arcsCache(c, aids); err != nil {
  188. as = make(map[int64]*arcwar.Arc, len(aids))
  189. missed = aids
  190. log.Error("%+v", err)
  191. err = nil
  192. }
  193. if len(missed) == 0 {
  194. return
  195. }
  196. arg := &arcwar.ArcsRequest{Aids: missed}
  197. if reply, err = d.arcClient.Arcs(c, arg); err != nil {
  198. log.Error("d.arcRPC.Archives3(%v) error(%v)", arg, err)
  199. if reply, err = d.arcClient.Arcs(c, arg); err != nil {
  200. err = errors.Wrapf(err, "%v", arg)
  201. return
  202. }
  203. }
  204. tmp = reply.Arcs
  205. for aid, a := range tmp {
  206. as[aid] = a
  207. d.AddArcCache(aid, a) // re-fill the cache
  208. }
  209. return
  210. }