service.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package report
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "io"
  7. xhttp "net/http"
  8. "strings"
  9. "sync"
  10. "time"
  11. "go-common/app/job/main/tv/conf"
  12. "go-common/app/job/main/tv/dao/report"
  13. mdlrep "go-common/app/job/main/tv/model/report"
  14. "go-common/library/log"
  15. "go-common/library/sync/pipeline/fanout"
  16. "github.com/robfig/cron"
  17. )
  18. const (
  19. _retry = 3
  20. _jobRunning = 3
  21. _startJob = 4
  22. _readSize = 1024
  23. )
  24. // Service struct of service .
  25. type Service struct {
  26. c *conf.Config
  27. ch chan bool
  28. dao *report.Dao
  29. respURL map[string]interface{}
  30. cache *fanout.Fanout
  31. lock sync.Mutex
  32. labelRes map[int]map[string]int
  33. readSize int
  34. // cron
  35. cron *cron.Cron
  36. }
  37. // New creates a Service instance.
  38. func New(c *conf.Config) (s *Service) {
  39. s = &Service{
  40. c: c,
  41. dao: report.New(c),
  42. respURL: make(map[string]interface{}),
  43. cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  44. labelRes: make(map[int]map[string]int),
  45. readSize: c.Report.ReadSize * _readSize,
  46. cron: cron.New(),
  47. ch: make(chan bool, c.Report.RoutineCount),
  48. }
  49. if err := s.cron.AddFunc(s.c.Report.CronAc, s.oneWork(mdlrep.ArchiveClick)); err != nil { // corn report run
  50. panic(err)
  51. }
  52. if err := s.cron.AddFunc(s.c.Report.CronAd, s.oneWork(mdlrep.ActiveDuration)); err != nil { // corn report run
  53. panic(err)
  54. }
  55. if err := s.cron.AddFunc(s.c.Report.CronPd, s.oneWork(mdlrep.PlayDuration)); err != nil { // corn report run
  56. panic(err)
  57. }
  58. if err := s.cron.AddFunc(s.c.Report.CronVe, s.oneWork(mdlrep.VisitEvent)); err != nil { // corn report run
  59. panic(err)
  60. }
  61. s.cron.Start()
  62. s.readCache() // data report
  63. s.readLabelCache() // label style
  64. go s.reportCon() // data report
  65. go s.showStyle() // label style
  66. go s.showLabel() // label style
  67. return
  68. }
  69. func (s *Service) oneWork(table string) func() {
  70. return func() {
  71. if s.c.Report.Env != "prod" {
  72. return
  73. }
  74. var (
  75. res string
  76. err error
  77. )
  78. if res, err = s.requestURL(table); err != nil {
  79. log.Error("reportPro s.requestURL() error(%v)", err)
  80. return
  81. }
  82. if res == "" {
  83. return
  84. }
  85. s.lock.Lock()
  86. s.respURL[res] = struct{}{}
  87. s.lock.Unlock()
  88. s.setCache()
  89. }
  90. }
  91. func (s *Service) reportCon() {
  92. if s.c.Report.Env != "prod" {
  93. return
  94. }
  95. var (
  96. err error
  97. info *mdlrep.DpCheckJobResult
  98. )
  99. for {
  100. var (
  101. flags, failStr []string
  102. )
  103. s.lock.Lock()
  104. for k := range s.respURL {
  105. flags = append(flags, k)
  106. }
  107. s.respURL = make(map[string]interface{})
  108. s.lock.Unlock()
  109. for _, v := range flags {
  110. if v == "" {
  111. continue
  112. }
  113. // loop send http request and return result
  114. if info, err = s.check(v); err == nil && len(info.Files) > 0 {
  115. now := time.Now()
  116. s.upReport(info)
  117. log.Warn("report success fileNum(%d) url(%s) 本次上报数据耗时: %s", len(info.Files), v, time.Since(now))
  118. continue
  119. }
  120. if info.StatusID == _jobRunning || info.StatusID == _startJob {
  121. failStr = append(failStr, v)
  122. }
  123. }
  124. s.lock.Lock()
  125. for _, v := range failStr {
  126. s.respURL[v] = struct{}{}
  127. }
  128. s.lock.Unlock()
  129. s.setCache()
  130. time.Sleep(3 * time.Second)
  131. }
  132. }
  133. func (s *Service) readFile(path string) {
  134. var (
  135. n int
  136. err error
  137. resdata []map[string]interface{}
  138. resp *xhttp.Response
  139. buf = make([]byte, 1024)
  140. chunks []byte
  141. req *xhttp.Request
  142. fileCnt = 0
  143. )
  144. client := &xhttp.Client{
  145. Transport: &xhttp.Transport{
  146. DisableKeepAlives: true,
  147. },
  148. }
  149. req, err = xhttp.NewRequest("GET", path, strings.NewReader(""))
  150. if err != nil {
  151. log.Error("[url(%s)] xhttp.NewRequest error(%v)", path, err)
  152. return
  153. }
  154. resp, err = client.Do(req)
  155. if err != nil {
  156. log.Error("[url(%s)] client.Do error(%v)", path, err)
  157. return
  158. }
  159. defer resp.Body.Close()
  160. for {
  161. n, err = resp.Body.Read(buf)
  162. if err != nil {
  163. if err == io.EOF {
  164. break
  165. }
  166. log.Error("resp.Body.Read error(%v)", err)
  167. return
  168. }
  169. if 0 == n {
  170. break
  171. }
  172. chunks = append(chunks, buf[:n]...)
  173. if len(chunks) > s.readSize { // 500K
  174. lastPos := bytes.LastIndex(chunks, []byte("\n"))
  175. if lastPos < 0 {
  176. continue
  177. }
  178. fileCnt = fileCnt + 1
  179. results := append([]byte{}, chunks[:lastPos]...)
  180. chunks = append([]byte{}, chunks[lastPos:]...)
  181. bsdata := bytes.Split(results, []byte("\n"))
  182. for _, bs := range bsdata {
  183. n := bytes.Split(bs, []byte("\u0001"))
  184. m := mdlrep.ArcClickParam(n)
  185. resdata = append(resdata, m)
  186. }
  187. if err = s.postData(resdata); err != nil {
  188. log.Error("[url(%s)] s.postData error(%v)", path, err)
  189. }
  190. resdata = make([]map[string]interface{}, 0)
  191. }
  192. }
  193. if len(chunks) > 0 {
  194. bsdata := bytes.Split(chunks, []byte("\n"))
  195. for _, bs := range bsdata {
  196. n := bytes.Split(bs, []byte("\u0001"))
  197. m := mdlrep.ArcClickParam(n)
  198. resdata = append(resdata, m)
  199. }
  200. if err = s.postData(resdata); err != nil {
  201. log.Error("[url(%s)] s.postData error(%v)", path, err)
  202. }
  203. }
  204. }
  205. func (s *Service) requestURL(table string) (res string, err error) {
  206. for i := 0; i < _retry; i++ {
  207. if res, err = s.dao.Report(context.Background(), table); err == nil {
  208. break
  209. }
  210. }
  211. return
  212. }
  213. func (s *Service) check(res string) (info *mdlrep.DpCheckJobResult, err error) {
  214. for i := 0; i < _retry; i++ {
  215. if info, err = s.dao.CheckJob(context.Background(), res); err == nil {
  216. break
  217. }
  218. }
  219. return
  220. }
  221. // upReport .
  222. func (s *Service) upReport(info *mdlrep.DpCheckJobResult) {
  223. for _, v := range info.Files {
  224. s.readFile(v)
  225. }
  226. }
  227. func (s *Service) postData(param []map[string]interface{}) (err error) {
  228. for _, v := range param {
  229. s.ch <- true
  230. go s.sendOnce(v)
  231. }
  232. return
  233. }
  234. func (s *Service) sendOnce(v map[string]interface{}) (err error) {
  235. var (
  236. body string
  237. data []byte
  238. )
  239. defer func() {
  240. <-s.ch
  241. }()
  242. if data, err = json.Marshal(v); err != nil {
  243. log.Error("Service postData json.Marshal error(%v)", err)
  244. return
  245. }
  246. body = body + string(data) + ","
  247. s.dealBody(body)
  248. return
  249. }
  250. func (s *Service) readCache() {
  251. if s.c.Report.Env != "prod" {
  252. return
  253. }
  254. var (
  255. err error
  256. btRes = make(map[string]interface{})
  257. )
  258. if btRes, err = s.dao.GetReportCache(context.Background()); err != nil {
  259. log.Error("s.dao.GetReportCache error(%v)", err)
  260. panic(err)
  261. }
  262. s.respURL = btRes
  263. }
  264. func (s *Service) dealBody(body string) {
  265. body = strings.Replace(body, `\\N`, "", -1)
  266. body = strings.TrimSuffix(body, ",")
  267. body = `{"code": 0,"message": "0","ttl": 1,"data":[` + body + `]}`
  268. if err := s.dao.PostRequest(context.Background(), body); err != nil {
  269. log.Error("s.dao.PostRequest error(%v)", err)
  270. }
  271. }
  272. func (s *Service) setCache() {
  273. s.cache.Do(context.Background(), func(c context.Context) {
  274. s.dao.SetReportCache(c, s.respURL)
  275. })
  276. }