hbase_set2.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package data
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "go-common/app/admin/main/up/model/datamodel"
  8. "go-common/app/admin/main/up/util/hbaseutil"
  9. "go-common/library/log"
  10. "go-common/library/sync/errgroup"
  11. "github.com/tsuna/gohbase/hrpc"
  12. )
// The requests below read data from cluster 2.
  14. const (
  15. // HbaseUpArchiveInfoPrefix archive info表
  16. HbaseUpArchiveInfoPrefix = "upcrm:up_influence"
  17. // HbaseUpArchiveTagInfoPrefix tag表
  18. HbaseUpArchiveTagInfoPrefix = "upcrm:up_archive_tag_info"
  19. // HbaseUpArchiveTypeInfoPrefix 分区表
  20. HbaseUpArchiveTypeInfoPrefix = "upcrm:up_archive_type_info"
  21. )
  22. var (
  23. //ErrInvalidDataType invalid data type
  24. ErrInvalidDataType = fmt.Errorf("invalid data type")
  25. )
  26. //UpArchiveDataType data type
  27. type UpArchiveDataType int
  28. const (
  29. //DataTypeDay7 7 day
  30. DataTypeDay7 UpArchiveDataType = 1
  31. //DataTypeDay30 30 day
  32. DataTypeDay30 UpArchiveDataType = 2
  33. //DataTypeDay90 90 day
  34. DataTypeDay90 UpArchiveDataType = 3
  35. //DataTypeDay180 180 day
  36. DataTypeDay180 UpArchiveDataType = 4
  37. //DataTypeDayAll accumulated
  38. DataTypeDayAll UpArchiveDataType = 5
  39. )
  40. var (
  41. dataType2FamilyMap = map[UpArchiveDataType]string{
  42. DataTypeDay7: "d7",
  43. DataTypeDay30: "d30",
  44. DataTypeDay90: "d90",
  45. DataTypeDay180: "d180",
  46. DataTypeDayAll: "dall",
  47. }
  48. )
  49. //UpArchiveInfo get up archive info
  50. func (d *Dao) UpArchiveInfo(c context.Context, mids []int64, dataType UpArchiveDataType) (result map[int64]*datamodel.UpArchiveData, err error) {
  51. var now = time.Now()
  52. var date = now.AddDate(0, 0, -1).Add(-12 * time.Hour).Format("20060102")
  53. var tableName = generateTableName(HbaseUpArchiveInfoPrefix, date)
  54. var family, ok = dataType2FamilyMap[dataType]
  55. if !ok {
  56. log.Error("invalid data type, type=%d", dataType)
  57. err = ErrInvalidDataType
  58. return
  59. }
  60. var group, ctx = errgroup.WithContext(c)
  61. var lock sync.Mutex
  62. result = make(map[int64]*datamodel.UpArchiveData)
  63. for _, mid := range mids {
  64. var copymid = mid
  65. group.Go(
  66. func() error {
  67. var data = &datamodel.UpArchiveData{}
  68. var key = hbaseMd5Key(copymid)
  69. if e := d.getHbaseRowResult(ctx, tableName, key, data, hrpc.Families(map[string][]string{family: nil})); e != nil {
  70. log.Error("get up archive info fail, mid=%d, err=%v", err)
  71. return nil
  72. }
  73. lock.Lock()
  74. result[copymid] = data
  75. lock.Unlock()
  76. return nil
  77. })
  78. }
  79. if err = group.Wait(); err != nil {
  80. log.Error("batch get fail, err=%v", err)
  81. return
  82. }
  83. log.Info("get archive info ok, find result count=%d", len(result))
  84. return
  85. }
  86. //UpArchiveTagInfo get up archive tag info
  87. func (d *Dao) UpArchiveTagInfo(c context.Context, mid int64) (result datamodel.UpArchiveTagData, err error) {
  88. var now = time.Now()
  89. var date = now.AddDate(0, 0, -1).Add(-12 * time.Hour).Format("20060102")
  90. var tableName = generateTableName(HbaseUpArchiveTagInfoPrefix, date)
  91. var key = hbaseMd5Key(mid)
  92. if err = d.getHbaseRowResult(c, tableName, key, &result); err != nil {
  93. log.Error("get up archive tag info fail, err=%v", err)
  94. }
  95. return
  96. }
  97. //UpArchiveTypeInfo get up archive type info
  98. func (d *Dao) UpArchiveTypeInfo(c context.Context, mid int64) (result datamodel.UpArchiveTypeData, err error) {
  99. var now = time.Now()
  100. var date = now.AddDate(0, 0, -1).Add(-12 * time.Hour).Format("20060102")
  101. var tableName = generateTableName(HbaseUpArchiveTypeInfoPrefix, date)
  102. var key = hbaseMd5Key(mid)
  103. if err = d.getHbaseRowResult(c, tableName, key, &result); err != nil {
  104. log.Error("get up archive type info fail, err=%v", err)
  105. }
  106. return
  107. }
  108. func (d *Dao) getHbaseRowResult(c context.Context, table, key string, result interface{}, options ...func(hrpc.Call) error) (err error) {
  109. var ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.ReadTimeout))
  110. defer cancel()
  111. res, err := d.hbase.GetStr(ctx, table, key, options...)
  112. if err != nil {
  113. log.Error("fail to get data from hbase, table=%s, key=%s, err=%v", table, key, err)
  114. return
  115. }
  116. if len(res.Cells) == 0 {
  117. log.Warn("no cells get, table=%s, key=%s, err=%v", table, key, err)
  118. return
  119. }
  120. var parser = hbaseutil.Parser{}
  121. if err = parser.Parse(res.Cells, result); err != nil {
  122. log.Error("parse data fail, table=%s, key=%s, err=%v", table, key, err)
  123. return
  124. }
  125. return
  126. }
  127. // dont use date string
  128. func generateTableName(prefix, date string) string {
  129. return prefix
  130. }