item_cf_job.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. package service
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "os"
  10. "os/exec"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "sync/atomic"
  15. "time"
  16. "github.com/siddontang/go/ioutil2"
  17. "go-common/app/service/live/recommend/recconst"
  18. "go-common/app/job/live/recommend-job/internal/conf"
  19. "go-common/library/cache/redis"
  20. "go-common/library/log"
  21. "go-common/library/sync/errgroup"
  22. "github.com/pkg/errors"
  23. )
  24. // ItemCFJob 把 hadoop 的数据写到redis
  25. type ItemCFJob struct {
  26. Conf *conf.JobConfig
  27. RedisConf *redis.Config
  28. HadoopConf *conf.HadoopConfig
  29. }
  30. // Run ...
  31. func (j *ItemCFJob) Run() {
  32. log.Info("Run ItemCFJob")
  33. processFile(j.Conf, j.HadoopConf, j.RedisConf, writeItemCFToRedis)
  34. log.Info("ItemCFJob Done")
  35. }
  36. func fetchFiles(jobConf *conf.JobConfig,
  37. hadoopConf *conf.HadoopConfig) (paths []string, err error) {
  38. path := jobConf.InputFile
  39. if path == "" {
  40. var javaHome string
  41. var hadoopHome string
  42. hadoopHome, javaHome, err = downloadHadoop(hadoopConf)
  43. if err != nil {
  44. log.Info("download hadoop err %+v ", err)
  45. return
  46. }
  47. log.Info("download hadoop success: " + hadoopHome)
  48. path, err = downloadFileFromHadoop(jobConf, hadoopHome, javaHome)
  49. if err != nil {
  50. log.Info("path %s downloaded err %+v ", path, err)
  51. return
  52. }
  53. log.Info("path downloaded success: " + path)
  54. var file os.FileInfo
  55. file, err = os.Stat(path)
  56. if err != nil {
  57. log.Error("cannot open file %s err: %+v", file, err)
  58. return
  59. }
  60. if file.IsDir() {
  61. var files []os.FileInfo
  62. files, err = ioutil.ReadDir(path)
  63. if err != nil {
  64. log.Error("no file in dir: %d, err: %+v", path, err)
  65. return
  66. }
  67. for _, f := range files {
  68. if strings.Index(f.Name(), ".") != 0 && strings.Index(f.Name(), "_") != 0 {
  69. paths = append(paths, path+"/"+f.Name())
  70. }
  71. }
  72. } else {
  73. paths = []string{path}
  74. }
  75. } else {
  76. if strings.Index(path, "http://") == 0 {
  77. var httpPath = path
  78. path, err = downloadFileFromHttp(httpPath, "/tmp/job-downloaded.txt")
  79. if err != nil {
  80. log.Error("download from http path=%s, error=%+v", httpPath, err)
  81. return
  82. }
  83. log.Info("file downloaded from http %s to %s", httpPath, path)
  84. } else {
  85. var file os.FileInfo
  86. file, err = os.Stat(path)
  87. if err != nil {
  88. log.Error("cannot open file %s err: %+v", file, err)
  89. return
  90. }
  91. if file.IsDir() {
  92. var files []os.FileInfo
  93. files, err = ioutil.ReadDir(path)
  94. if err != nil {
  95. log.Error("no file in dir: %d, err: %+v", path, err)
  96. return
  97. }
  98. for _, f := range files {
  99. if strings.Index(f.Name(), ".") != 0 && strings.Index(f.Name(), "_") != 0 {
  100. paths = append(paths, path+"/"+f.Name())
  101. }
  102. }
  103. } else {
  104. paths = []string{path}
  105. }
  106. }
  107. }
  108. return
  109. }
  110. // 按行读取文件,并调用handler处理
  111. func processFile(jobConf *conf.JobConfig,
  112. hadoopConf *conf.HadoopConfig,
  113. redisConf *redis.Config,
  114. handler func(line string, pool *redis.Pool) error,
  115. ) {
  116. paths, err := fetchFiles(jobConf, hadoopConf)
  117. if err != nil {
  118. return
  119. }
  120. var workerNum = jobConf.WorkerNum
  121. var r = redis.NewPool(redisConf)
  122. defer r.Close()
  123. log.Info("all of files %+v", paths)
  124. for _, path := range paths {
  125. var startLineNum int
  126. file, err := os.Open(path)
  127. if err != nil {
  128. log.Error("open path %s err %+v", path, errors.WithStack(err))
  129. panic(err)
  130. }
  131. name := filepath.Base(filepath.Dir(path)) + "-" + filepath.Base(path)
  132. offsetPath := "/tmp/" + name + ".offset"
  133. // 读取上一次的位置
  134. contentB, e := ioutil.ReadFile(offsetPath)
  135. if e == nil {
  136. content := string(contentB)
  137. offset, e := strconv.Atoi(content)
  138. if e == nil {
  139. startLineNum = offset
  140. } else {
  141. startLineNum = 1
  142. }
  143. } else {
  144. startLineNum = 1
  145. }
  146. log.Info("start from line: %d, file : %s offset file: %s", startLineNum, path, offsetPath)
  147. var ops uint64
  148. totalCount := lineCounter(path)
  149. scanner := bufio.NewScanner(file)
  150. g := errgroup.Group{}
  151. g.GOMAXPROCS(workerNum)
  152. var lineNum = 0
  153. for scanner.Scan() {
  154. lineNum++
  155. if lineNum < startLineNum {
  156. continue
  157. }
  158. line := scanner.Text() // 10001 [1:0.9,2:0.9]
  159. g.Go(func() error {
  160. handler(line, r)
  161. atomic.AddUint64(&ops, 1)
  162. if ops%20000 == 0 {
  163. fmt.Printf("progress %d / %d percent %f %s \r", ops+uint64(startLineNum)-1, totalCount,
  164. float32(uint64(startLineNum)+ops-1)/float32(totalCount)*100, "%")
  165. } else if ops == uint64(totalCount) {
  166. fmt.Printf("progress %d / %d 100%%\n", ops, totalCount)
  167. }
  168. return nil
  169. })
  170. if lineNum%100000 == 0 {
  171. g.Wait()
  172. ioutil.WriteFile(offsetPath,
  173. []byte(strconv.FormatInt(int64(lineNum), 10)),
  174. os.ModePerm)
  175. g = errgroup.Group{}
  176. g.GOMAXPROCS(workerNum)
  177. }
  178. }
  179. g.Wait()
  180. if err = scanner.Err(); err != nil {
  181. log.Error("err %+v", errors.WithStack(err))
  182. panic(err)
  183. }
  184. file.Close()
  185. }
  186. }
  187. func writeItemCFToRedis(line string, r *redis.Pool) (err error) {
  188. start := strings.Index(line, "[")
  189. end := strings.LastIndex(line, "]")
  190. userIdStr := line[0 : start-1]
  191. items := strings.Split(line[start+1:end], ",")
  192. c := r.Get(context.Background())
  193. defer c.Close()
  194. userId, _ := strconv.Atoi(userIdStr)
  195. key := fmt.Sprintf(recconst.UserItemCFRecKey, userId)
  196. // 最多保留50个推荐
  197. var inMemoryCount int
  198. inMemoryCount, err = redis.Int(c.Do("ZCARD", key))
  199. if err != nil {
  200. log.Error("zcard err: %+v", err)
  201. } else {
  202. var toBeRemovedCount = inMemoryCount + len(items) - 60
  203. if toBeRemovedCount > 0 {
  204. var removed int
  205. removed, err = redis.Int(c.Do("ZREMRANGEBYRANK", key, 0, toBeRemovedCount-1))
  206. if err != nil {
  207. log.Error("ZREMRANGEBYRANK key:%s, err: +%v", key, err)
  208. } else {
  209. log.Info("zset removed %d count, key:%s", removed, key)
  210. }
  211. }
  212. }
  213. for _, item := range items {
  214. split := strings.Split(item, ":")
  215. itemID := split[0]
  216. score := split[1]
  217. c.Send("ZADD", key, score, itemID)
  218. }
  219. c.Send("EXPIRE", key, 86400*30)
  220. err = c.Flush()
  221. if err != nil {
  222. log.Error("zadd to redis error: %+v , key=%s", err, key)
  223. return err
  224. }
  225. for i := 0; i < len(items)+1; i++ {
  226. _, err = c.Receive()
  227. if err != nil {
  228. log.Error("zadd to redis error: %+v , key=%s, line=%s", err, key, line)
  229. return err
  230. }
  231. }
  232. return nil
  233. }
  // lineCounter returns the number of '\n' bytes in the file at path, i.e. the
  // number of newline-terminated lines (a last line without a trailing newline
  // is not counted). It is best-effort: if the file cannot be opened it returns
  // 0, and a read error stops counting early. In this file the value only feeds
  // progress output.
  func lineCounter(path string) int {
  	f, err := os.Open(path)
  	if err != nil {
  		// Deliberately not fatal: callers detect an unreadable file themselves.
  		return 0
  	}
  	defer f.Close()
  	return countNewlines(f)
  }

  // countNewlines returns the number of '\n' bytes readable from r, stopping at
  // io.EOF or at the first read error.
  func countNewlines(r io.Reader) int {
  	buf := make([]byte, 32*1024)
  	newline := []byte{'\n'}
  	count := 0
  	for {
  		n, err := r.Read(buf)
  		// Bytes returned alongside an error are still counted.
  		count += bytes.Count(buf[:n], newline)
  		if err != nil {
  			return count
  		}
  	}
  }
  251. func downloadHadoop(hadoopConf *conf.HadoopConfig) (hadoopHome string, javaHome string, err error) {
  252. if strings.LastIndex(hadoopConf.HadoopDir, "/") == len(hadoopConf.HadoopDir)-1 {
  253. hadoopHome = hadoopConf.HadoopDir + "hadoop-2.8.4"
  254. } else {
  255. hadoopHome = hadoopConf.HadoopDir + "/hadoop-2.8.4"
  256. }
  257. javaHome = hadoopHome + "/jdk1.8.0_60"
  258. if ioutil2.FileExists(hadoopHome) {
  259. return
  260. }
  261. var cmd = "curl -sSLf " + hadoopConf.TarUrl + " -o /tmp/hadoop.tar.gz"
  262. err = runCmd(cmd)
  263. if err != nil {
  264. return
  265. }
  266. cmd = "tar -C " + hadoopConf.HadoopDir + " -xf /tmp/hadoop.tar.gz"
  267. err = runCmd(cmd)
  268. if err != nil {
  269. return
  270. }
  271. return
  272. }
  273. func downloadFileFromHttp(url string, output string) (string, error) {
  274. var localPath = output
  275. var cmd = "curl -sSLf " + url + " -o " + localPath
  276. var err = runCmd(cmd)
  277. if err != nil {
  278. return "", err
  279. }
  280. return localPath, nil
  281. }
  282. func downloadFileFromHadoop(jobConf *conf.JobConfig, hadoopHome string, javaHome string) (string, error) {
  283. t := time.Now().AddDate(0, 0, -1)
  284. day := t.Format("20060102")
  285. localPath := fmt.Sprintf(jobConf.LocalTmpFile, day)
  286. if ioutil2.FileExists(localPath) {
  287. return localPath, nil
  288. }
  289. remotePath := fmt.Sprintf(jobConf.HadoopFile, day)
  290. cmd := fmt.Sprintf("export JAVA_HOME=%s; %s/bin/hdfs dfs -get %s %s",
  291. javaHome, hadoopHome, remotePath, localPath)
  292. err := runCmd(cmd)
  293. return localPath, err
  294. }
  295. // runCmd runs the cmd & print output (both stdout & stderr)
  296. func runCmd(cmd string) (err error) {
  297. fmt.Printf("CMD: %s \n", cmd)
  298. out, err := exec.Command("/bin/bash", "-c", cmd).CombinedOutput()
  299. log.Info(string(out))
  300. if err != nil {
  301. err = errors.Wrap(err, string(out))
  302. }
  303. return
  304. }