up_base_info.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. package upcrmservice
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "time"
  7. "go-common/app/admin/main/up/dao/global"
  8. "go-common/app/admin/main/up/model/datamodel"
  9. "go-common/app/admin/main/up/model/upcrmmodel"
  10. "go-common/app/admin/main/up/util"
  11. accgrpc "go-common/app/service/main/account/api"
  12. "go-common/library/database/elastic"
  13. "go-common/library/log"
  14. "go-common/library/net/metadata"
  15. "go-common/library/sync/errgroup"
  16. )
  17. const (
  18. maxSearchItemCount = 10000
  19. maxBatchCount = 100
  20. )
  21. var (
  22. //ErrTooManySearchItem too many search items, 搜索只支持返回前10000条数据
  23. ErrTooManySearchItem = errors.New("筛选仅支持展示前1万条")
  24. //ErrNoMid no mid
  25. ErrNoMid = errors.New("Mid为空")
  26. )
  27. //UpBaseInfoQuery query
  28. func (s *Service) UpBaseInfoQuery(context context.Context, arg *upcrmmodel.InfoQueryArgs) (result *upcrmmodel.InfoQueryResult, err error) {
  29. var data, e = s.upBaseInfoQueryBatch(s.crmdb.QueryUpBaseInfoBatchByMid, arg.Mid)
  30. err = e
  31. // 没找到按照错误处理
  32. if err != nil || len(data) == 0 {
  33. log.Error("get from db fail, req=%+v, err=%+v", arg, err)
  34. return
  35. }
  36. result = data[0]
  37. log.Info("query base info ok, req=%+v, err=%+v", arg, result)
  38. return
  39. }
  40. //QueryDbFunc query func type
  41. type QueryDbFunc func(fields string, mid ...int64) (result []upcrmmodel.UpBaseInfo, err error)
  42. func (s *Service) upBaseInfoQueryBatch(queryfunc QueryDbFunc, ids ...int64) (result []*upcrmmodel.InfoQueryResult, err error) {
  43. var data, e = queryfunc("*", ids...)
  44. err = e
  45. if err != nil {
  46. log.Error("get from db fail, err=%+v", err)
  47. return
  48. }
  49. for _, v := range data {
  50. var info = upcrmmodel.InfoQueryResult{}
  51. info.CopyFromBaseInfo(v)
  52. info.CalculateAttr()
  53. result = append(result, &info)
  54. }
  55. return
  56. }
  57. // UpAccountInfo get account info
  58. func (s *Service) UpAccountInfo(c context.Context, arg *upcrmmodel.InfoAccountInfoArgs) (res []*accgrpc.Info, err error) {
  59. var (
  60. infosReply *accgrpc.InfosReply
  61. mids = util.ExplodeInt64(arg.Mids, ",")
  62. )
  63. if infosReply, err = global.GetAccClient().Infos3(c, &accgrpc.MidsReq{Mids: mids, RealIp: metadata.String(c, metadata.RemoteIP)}); err != nil {
  64. return
  65. }
  66. if infosReply == nil || infosReply.Infos == nil {
  67. return
  68. }
  69. for _, v := range infosReply.Infos {
  70. res = append(res, v)
  71. }
  72. log.Info("query acount info ok, req=%+v, result=%+v", arg, res)
  73. return
  74. }
  75. //SearchResult struct
  76. type SearchResult struct {
  77. AccountState int `json:"account_state"`
  78. Activity int `json:"activity"`
  79. Attr int `json:"attr"`
  80. ArticleCountAccumulate int `json:"article_count_accumulate"`
  81. ID uint32
  82. Mid int64
  83. }
  84. func getAttrFormat(attrs upcrmmodel.UpAttr) (result []int) {
  85. // 什么要shift,因为es的位是从1开始的,而存储的位是从0开始的
  86. const shift = 1
  87. if attrs.AttrVideo != 0 {
  88. result = append(result, upcrmmodel.AttrBitVideo+shift)
  89. }
  90. if attrs.AttrAudio != 0 {
  91. result = append(result, upcrmmodel.AttrBitAudio+shift)
  92. }
  93. if attrs.AttrArticle != 0 {
  94. result = append(result, upcrmmodel.AttrBitArticle+shift)
  95. }
  96. if attrs.AttrPhoto != 0 {
  97. result = append(result, upcrmmodel.AttrBitPhoto+shift)
  98. }
  99. if attrs.AttrSign != 0 {
  100. result = append(result, upcrmmodel.AttrBitSign+shift)
  101. }
  102. if attrs.AttrGrowup != 0 {
  103. result = append(result, upcrmmodel.AttrBitGrowup+shift)
  104. }
  105. if attrs.AttrVerify != 0 {
  106. result = append(result, upcrmmodel.AttrBitVerify+shift)
  107. }
  108. return
  109. }
  110. func getEsCombo(attrs upcrmmodel.UpAttr) (combos []*elastic.Combo) {
  111. const shift = 1
  112. var attrs1, attrs2 []interface{}
  113. var attrFlagList = getAttrFormat(attrs)
  114. for _, v := range attrFlagList {
  115. if _, ok := upcrmmodel.AttrGroup1[v-shift]; ok {
  116. attrs1 = append(attrs1, v)
  117. } else if _, ok := upcrmmodel.AttrGroup2[v-shift]; ok {
  118. attrs2 = append(attrs2, v)
  119. }
  120. }
  121. if attrs1 != nil {
  122. var attrGroup = make(map[string][]interface{})
  123. attrGroup["attr_format"] = attrs1
  124. cmb := &elastic.Combo{}
  125. cmb.ComboIn([]map[string][]interface{}{
  126. attrGroup},
  127. ).MinIn(1).MinAll(1)
  128. combos = append(combos, cmb)
  129. }
  130. if attrs2 != nil {
  131. var attrGroup = make(map[string][]interface{})
  132. attrGroup["attr_format"] = attrs2
  133. cmb := &elastic.Combo{}
  134. cmb.ComboIn([]map[string][]interface{}{
  135. attrGroup},
  136. ).MinIn(1).MinAll(1)
  137. combos = append(combos, cmb)
  138. }
  139. return
  140. }
  141. //UpInfoSearch info search
  142. func (s *Service) UpInfoSearch(c context.Context, arg *upcrmmodel.InfoSearchArgs) (result upcrmmodel.InfoSearchResult, err error) {
  143. //调用搜索的接口
  144. var searchData esResult
  145. searchData, err = s.searchFromEs(c, arg)
  146. if err != nil {
  147. log.Error("search arg=%+v, err=%+v", arg, err)
  148. return
  149. }
  150. if len(searchData.Result) == 0 {
  151. log.Info("no data return from search, just return")
  152. return
  153. }
  154. var ids []int64
  155. for _, v := range searchData.Result {
  156. ids = append(ids, int64(v.ID))
  157. }
  158. result.Result, err = s.queryUpBaseInfo(c, ids...)
  159. if err != nil {
  160. log.Error("query up base info fail, err=%+v", err)
  161. return
  162. }
  163. result.PageInfo = searchData.Page.ToPageInfo()
  164. log.Info("res=%+v, page=%+v", searchData, result.PageInfo)
  165. return
  166. }
  167. type esPage struct {
  168. Num int `json:"num"`
  169. Size int `json:"size"`
  170. Total int `json:"total"`
  171. }
  172. //ToPageInfo cast to page info
  173. func (e *esPage) ToPageInfo() (pageInfo upcrmmodel.PageInfo) {
  174. if e == nil {
  175. return
  176. }
  177. pageInfo.TotalCount = e.Total
  178. pageInfo.Size = e.Size
  179. pageInfo.Page = e.Num
  180. return
  181. }
  182. type esResult struct {
  183. Page *esPage `json:"page"`
  184. Result []*SearchResult `json:"result"`
  185. }
  186. func (s *Service) searchFromEs(c context.Context, arg *upcrmmodel.InfoSearchArgs) (searchData esResult, err error) {
  187. if arg.Page*arg.Size > maxSearchItemCount {
  188. err = ErrTooManySearchItem
  189. return
  190. }
  191. var searchSdk = elastic.NewElastic(nil)
  192. var r = searchSdk.NewRequest("up_crm_info")
  193. r.Pn(arg.Page).Ps(arg.Size).Index("up_base_info").
  194. Fields("id")
  195. if arg.Mid != 0 {
  196. r.WhereEq("mid", arg.Mid)
  197. } else {
  198. if arg.AccountState != 0 {
  199. // 字段有0值,所以接口改为1,2表示字段的0,1,接口的0表示没有此条件
  200. var realArg = arg.AccountState - 1
  201. r.WhereEq("account_state", realArg)
  202. }
  203. if arg.Activity != 0 {
  204. r.WhereEq("activity", arg.Activity)
  205. }
  206. var startdate, _ = time.Parse(upcrmmodel.TimeFmtDate, arg.FirstDateBegin)
  207. var enddate, _ = time.Parse(upcrmmodel.TimeFmtDate, arg.FirstDateEnd)
  208. enddate = enddate.AddDate(0, 0, 1)
  209. var startStr = startdate.Format(upcrmmodel.TimeFmtMysql)
  210. var endStr = enddate.Format(upcrmmodel.TimeFmtMysql)
  211. if arg.FirstDateBegin != "" && arg.FirstDateEnd != "" {
  212. r.WhereRange("first_up_time", startStr, endStr, elastic.RangeScopeLcRc)
  213. }
  214. if arg.Order.Order == "" {
  215. arg.Order.Order = "desc"
  216. }
  217. if arg.Order.Field == "" {
  218. arg.Order.Field = "first_up_time"
  219. arg.Order.Order = "desc"
  220. }
  221. var combos = getEsCombo(arg.Attrs)
  222. r.WhereCombo(combos...)
  223. r.Order(arg.Order.Field, arg.Order.Order)
  224. }
  225. err = r.Scan(c, &searchData)
  226. return
  227. }
  228. func (s *Service) queryUpBaseInfo(c context.Context, ids ...int64) (result []*upcrmmodel.InfoQueryResult, err error) {
  229. var group, ctx = errgroup.WithContext(c)
  230. var infoData []*upcrmmodel.InfoQueryResult
  231. group.Go(func() error {
  232. var e error
  233. infoData, e = s.upBaseInfoQueryBatch(s.crmdb.QueryUpBaseInfoBatchByID, ids...)
  234. if e != nil {
  235. err = e
  236. log.Error("get base info fail, err=%+v", err)
  237. }
  238. return nil
  239. })
  240. var tidMap = make(map[int64]*datamodel.UpArchiveTypeData)
  241. var mapLock sync.Mutex
  242. for _, mid := range ids {
  243. group.Go(func() error {
  244. var arg = datamodel.GetUpArchiveTypeInfoArg{Mid: mid}
  245. var tidData, e = s.dataService.GetUpArchiveTypeInfo(ctx, &arg)
  246. if e != nil || tidData == nil {
  247. log.Error("get up type info err, err=%v", e)
  248. return nil
  249. }
  250. mapLock.Lock()
  251. tidMap[arg.Mid] = tidData
  252. mapLock.Unlock()
  253. return nil
  254. })
  255. }
  256. if err = group.Wait(); err != nil {
  257. log.Error("get data error, err=%v", err)
  258. return
  259. }
  260. var infoIDMap = make(map[uint32]*upcrmmodel.InfoQueryResult)
  261. for _, v := range infoData {
  262. infoIDMap[v.ID] = v
  263. }
  264. for _, v := range ids {
  265. var info, ok = infoIDMap[uint32(v)]
  266. if !ok {
  267. continue
  268. }
  269. if typeInfo, ok := tidMap[v]; ok {
  270. info.ActiveTid = typeInfo.Tid
  271. info.ActiveSubtid = typeInfo.SubTid
  272. }
  273. result = append(result, info)
  274. }
  275. return
  276. }
  277. //QueryUpInfoWithViewerData query with view data
  278. func (s *Service) QueryUpInfoWithViewerData(c context.Context, arg *upcrmmodel.UpInfoWithViewerArg) (result upcrmmodel.UpInfoWithViewerResult, err error) {
  279. if arg.Page*arg.Size > maxSearchItemCount {
  280. err = ErrTooManySearchItem
  281. return
  282. }
  283. // 如果是0,则默认设置所有的tag
  284. if arg.Flag == 0 {
  285. arg.Flag = -1
  286. }
  287. var mids []int64
  288. if arg.Mids != "" {
  289. mids = util.ExplodeInt64(arg.Mids, ",")
  290. var midlen = len(mids)
  291. if midlen == 0 {
  292. err = ErrNoMid
  293. log.Error("no mid get from mids, arg=%+v", arg)
  294. return
  295. }
  296. if midlen > maxBatchCount {
  297. mids = mids[:maxBatchCount]
  298. }
  299. } else {
  300. var searchSdk = elastic.NewElastic(nil)
  301. var r = searchSdk.NewRequest("up_crm_info")
  302. if arg.Size > maxBatchCount {
  303. arg.Size = maxBatchCount
  304. }
  305. r.Pn(arg.Page).Ps(arg.Size).Index("up_base_info").
  306. Fields("mid").
  307. Order(arg.Sort, arg.Order)
  308. var searchData esResult
  309. err = r.Scan(c, &searchData)
  310. if err != nil {
  311. log.Error("fail to get from search, arg=%+v", arg)
  312. return
  313. }
  314. for _, v := range searchData.Result {
  315. mids = append(mids, int64(v.Mid))
  316. }
  317. result.PageInfo = searchData.Page.ToPageInfo()
  318. }
  319. var group, ctx = errgroup.WithContext(c)
  320. var infoData []*upcrmmodel.InfoQueryResult
  321. var playData []*upcrmmodel.UpPlayInfo
  322. group.Go(func() error {
  323. if arg.Flag&upcrmmodel.FlagUpBaseData != 0 {
  324. infoData, err = s.upBaseInfoQueryBatch(s.crmdb.QueryUpBaseInfoBatchByMid, mids...)
  325. if err != nil {
  326. log.Error("query up base error, err=%v", err)
  327. return err
  328. }
  329. }
  330. if arg.Flag&upcrmmodel.FlagUpPlayData != 0 {
  331. playData, err = s.crmdb.QueryPlayInfoBatch(mids, upcrmmodel.BusinessTypeVideo)
  332. if err != nil {
  333. log.Error("query play info err, err=%v", err)
  334. return err
  335. }
  336. }
  337. return nil
  338. })
  339. var dataMap = make(map[int64]*upcrmmodel.UpInfoWithViewerData)
  340. if arg.Flag&upcrmmodel.FlagViewData != 0 {
  341. for _, v := range mids {
  342. var mid = v // copy this v
  343. group.Go(func() error {
  344. var info, e = s.dataService.GetViewData(ctx, mid)
  345. if e != nil {
  346. err = e
  347. log.Error("query up view info from hbase error, err=%v", err)
  348. return err
  349. }
  350. var data = getOrCreateUpInfo(dataMap, mid)
  351. data.ViewerBase = info.Base
  352. data.ViewerTrend = info.Trend
  353. data.ViewerArea = info.Area
  354. return err
  355. })
  356. }
  357. }
  358. if err = group.Wait(); err != nil {
  359. log.Error("get data fail, err=%v", err)
  360. return
  361. }
  362. for _, baseInfo := range infoData {
  363. var data = getOrCreateUpInfo(dataMap, baseInfo.Mid)
  364. data.UpBaseInfo = baseInfo
  365. }
  366. for _, playInfo := range playData {
  367. var data = getOrCreateUpInfo(dataMap, playInfo.Mid)
  368. data.UpPlayInfo = playInfo
  369. }
  370. for _, mid := range mids {
  371. var data, ok = dataMap[mid]
  372. if !ok {
  373. log.Warn("up info not found, mid=%d", mid)
  374. continue
  375. }
  376. data.Mid = mid
  377. result.Result = append(result.Result, data)
  378. }
  379. log.Info("query up with view ok, arg=%+v, result count=%d", arg, len(mids))
  380. return
  381. }
  382. var dataMapMutex sync.Mutex
  383. func getOrCreateUpInfo(dataMap map[int64]*upcrmmodel.UpInfoWithViewerData, mid int64) (result *upcrmmodel.UpInfoWithViewerData) {
  384. dataMapMutex.Lock()
  385. defer dataMapMutex.Unlock()
  386. var ok bool
  387. if result, ok = dataMap[mid]; !ok {
  388. result = &upcrmmodel.UpInfoWithViewerData{}
  389. dataMap[mid] = result
  390. }
  391. return result
  392. }