unicom.go 16 KB


  1. package unicom
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "go-common/app/job/main/app-wall/model/unicom"
  11. "go-common/library/cache/memcache"
  12. "go-common/library/ecode"
  13. "go-common/library/log"
  14. "go-common/library/queue/databus/report"
  15. )
  16. const (
  17. _initIPUnicomKey = "ipunicom_%v_%v"
  18. )
  19. func (s *Service) clickConsumer() {
  20. defer s.waiter.Done()
  21. msgs := s.clickSub.Messages()
  22. for {
  23. msg, ok := <-msgs
  24. if !ok || s.closed {
  25. log.Info("s.clickSub.Cloesd")
  26. return
  27. }
  28. msg.Commit()
  29. var (
  30. sbs [][]byte
  31. err error
  32. )
  33. if err = json.Unmarshal(msg.Value, &sbs); err != nil {
  34. log.Error("json.Unmarshal(%v) error(%v)", msg.Value, err)
  35. continue
  36. }
  37. for _, bs := range sbs {
  38. var (
  39. err error
  40. click *unicom.ClickMsg
  41. )
  42. if click, err = s.checkMsgIllegal(bs); err != nil {
  43. log.Error("s.checkMsgIllegal(%s) error(%v)", strings.Replace(string(bs), "\001", "|", -1), err)
  44. continue
  45. }
  46. log.Info("clickConsumer s.checkMsgIllegal(%s)", strings.Replace(string(bs), "\001", "|", -1))
  47. s.cliChan[click.AID%s.c.ChanNum] <- click
  48. }
  49. }
  50. }
  51. func (s *Service) checkMsgIllegal(msg []byte) (click *unicom.ClickMsg, err error) {
  52. var (
  53. aid int64
  54. clickMsg []string
  55. plat int64
  56. bvID string
  57. mid int64
  58. lv int64
  59. ctime int64
  60. stime int64
  61. epid int64
  62. ip string
  63. seasonType int
  64. userAgent string
  65. )
  66. clickMsg = strings.Split(string(msg), "\001")
  67. if len(clickMsg) < 10 {
  68. err = errors.New("click msg error")
  69. return
  70. }
  71. if aid, err = strconv.ParseInt(clickMsg[1], 10, 64); err != nil {
  72. err = fmt.Errorf("aid(%s) error", clickMsg[1])
  73. return
  74. }
  75. if aid <= 0 {
  76. err = fmt.Errorf("wocao aid(%s) error", clickMsg[1])
  77. return
  78. }
  79. if plat, err = strconv.ParseInt(clickMsg[0], 10, 64); err != nil {
  80. err = fmt.Errorf("plat(%s) error", clickMsg[0])
  81. return
  82. }
  83. if plat != 3 && plat != 4 {
  84. err = fmt.Errorf("plat(%d) is not android or ios", plat)
  85. return
  86. }
  87. userAgent = clickMsg[10]
  88. bvID = clickMsg[8]
  89. if bvID == "" {
  90. err = fmt.Errorf("bvID(%s) is illegal", clickMsg[8])
  91. return
  92. }
  93. if clickMsg[4] != "" && clickMsg[4] != "0" {
  94. if mid, err = strconv.ParseInt(clickMsg[4], 10, 64); err != nil {
  95. err = fmt.Errorf("mid(%s) is illegal", clickMsg[4])
  96. return
  97. }
  98. }
  99. if clickMsg[5] != "" {
  100. if lv, err = strconv.ParseInt(clickMsg[5], 10, 64); err != nil {
  101. err = fmt.Errorf("lv(%s) is illegal", clickMsg[5])
  102. return
  103. }
  104. }
  105. if ctime, err = strconv.ParseInt(clickMsg[6], 10, 64); err != nil {
  106. err = fmt.Errorf("ctime(%s) is illegal", clickMsg[6])
  107. return
  108. }
  109. if stime, err = strconv.ParseInt(clickMsg[7], 10, 64); err != nil {
  110. err = fmt.Errorf("stime(%s) is illegal", clickMsg[7])
  111. return
  112. }
  113. if ip = clickMsg[9]; ip == "" {
  114. err = errors.New("ip is illegal")
  115. return
  116. }
  117. if clickMsg[17] != "" {
  118. if epid, err = strconv.ParseInt(clickMsg[17], 10, 64); err != nil {
  119. err = fmt.Errorf("epid(%s) is illegal", clickMsg[17])
  120. return
  121. }
  122. if clickMsg[15] != "null" {
  123. if seasonType, err = strconv.Atoi(clickMsg[15]); err != nil {
  124. err = fmt.Errorf("seasonType(%s) is illegal", clickMsg[15])
  125. return
  126. }
  127. }
  128. }
  129. click = &unicom.ClickMsg{
  130. Plat: int8(plat),
  131. AID: aid,
  132. MID: mid,
  133. Lv: int8(lv),
  134. CTime: ctime,
  135. STime: stime,
  136. BvID: bvID,
  137. IP: ip,
  138. KafkaBs: msg,
  139. EpID: epid,
  140. SeasonType: seasonType,
  141. UserAgent: userAgent,
  142. }
  143. return
  144. }
  145. func (s *Service) cliChanProc(i int64) {
  146. defer s.waiter.Done()
  147. var (
  148. cli *unicom.ClickMsg
  149. cliChan = s.cliChan[i]
  150. )
  151. for {
  152. var (
  153. ub *unicom.UserBind
  154. c = context.TODO()
  155. err error
  156. ok bool
  157. count int
  158. addFlow int
  159. now = time.Now()
  160. u *unicom.Unicom
  161. cardType string
  162. )
  163. if cli, ok = <-cliChan; !ok || s.closed {
  164. return
  165. }
  166. if count, err = s.dao.UserPackReceiveCache(c, cli.MID); err != nil {
  167. log.Error("s.dao.UserBindCache error(%v) mid(%v) count(%v)", err, cli.MID, count)
  168. continue
  169. }
  170. if count > 0 {
  171. log.Info("s.dao.UserBindCache mid(%v) count(%v)", cli.MID, count)
  172. continue
  173. }
  174. if ub, err = s.dao.UserBindCache(c, cli.MID); err != nil {
  175. continue
  176. }
  177. if ub == nil || ub.Phone == 0 {
  178. continue
  179. }
  180. res := s.unicomInfo(c, ub.Usermob, now)
  181. if u, ok = res[ub.Usermob]; !ok || u == nil {
  182. continue
  183. }
  184. switch u.Spid {
  185. case 10019:
  186. cardType = "22卡"
  187. case 10020:
  188. cardType = "33卡"
  189. case 10021:
  190. cardType = "小电视卡"
  191. default:
  192. log.Info("unicom spid equal 979 (%v)", ub)
  193. continue
  194. }
  195. ub.Integral = ub.Integral + 10
  196. switch cli.Lv {
  197. case 0, 1, 2, 3:
  198. addFlow = 10
  199. case 4:
  200. addFlow = 15
  201. case 5:
  202. addFlow = 20
  203. case 6:
  204. addFlow = 30
  205. }
  206. ub.Flow = ub.Flow + addFlow
  207. if err = s.dao.AddUserBindCache(c, ub.Mid, ub); err != nil {
  208. log.Error("s.dao.AddUserBindCache error(%v)", err)
  209. continue
  210. }
  211. if err = s.dao.AddUserPackReceiveCache(c, ub.Mid, 1, now); err != nil {
  212. log.Error("s.dao.AddUserPackReceiveCache error(%v)", err)
  213. continue
  214. }
  215. s.dbcliChan[ub.Mid%s.c.ChanDBNum] <- ub
  216. log.Info("unicom mobile cliChanProc userbind(%v)", ub)
  217. s.unicomInfoc(ub.Usermob, ub.Phone, int(cli.Lv), 10, addFlow, cardType, ub.Mid, now)
  218. s.addUserIntegralLog(&unicom.UserIntegralLog{Phone: ub.Phone, Mid: ub.Mid, UnicomDesc: cardType, Type: 0, Integral: 10, Flow: addFlow, Desc: "每日礼包"})
  219. }
  220. }
  221. func (s *Service) dbcliChanProc(i int64) {
  222. defer s.waiter.Done()
  223. var (
  224. ub *unicom.UserBind
  225. dbcliChan = s.dbcliChan[i]
  226. )
  227. for {
  228. var (
  229. c = context.TODO()
  230. ok bool
  231. row int64
  232. err error
  233. )
  234. if ub, ok = <-dbcliChan; !ok || s.closed {
  235. return
  236. }
  237. if row, err = s.dao.UpUserIntegral(c, ub); err != nil || row == 0 {
  238. log.Error("s.dao.UpUserIntegral ub(%v) error(%v) or result==0", ub, err)
  239. continue
  240. }
  241. log.Info("unicom mobile dbcliChanProc userbind(%v)", ub)
  242. }
  243. }
  244. // unicomInfo
  245. func (s *Service) unicomInfo(c context.Context, usermob string, now time.Time) (res map[string]*unicom.Unicom) {
  246. var (
  247. err error
  248. u []*unicom.Unicom
  249. )
  250. res = map[string]*unicom.Unicom{}
  251. if u, err = s.dao.UnicomCache(c, usermob); err == nil && len(u) > 0 {
  252. s.pHit.Incr("unicoms_cache")
  253. } else {
  254. if u, err = s.dao.OrdersUserFlow(context.TODO(), usermob); err != nil {
  255. log.Error("unicom_s.dao.OrdersUserFlow error(%v)", err)
  256. return
  257. }
  258. s.pMiss.Incr("unicoms_cache")
  259. }
  260. if len(u) > 0 {
  261. row := &unicom.Unicom{}
  262. for _, user := range u {
  263. if user.TypeInt == 1 && now.Unix() <= int64(user.Endtime) {
  264. *row = *user
  265. break
  266. } else if user.TypeInt == 0 {
  267. if user.Spid == 979 {
  268. continue
  269. }
  270. if int64(row.Ordertime) > int64(user.Ordertime) {
  271. continue
  272. }
  273. *row = *user
  274. }
  275. }
  276. if row.Spid == 0 {
  277. return
  278. }
  279. res[usermob] = row
  280. }
  281. return
  282. }
  283. func (s *Service) upBindAll() {
  284. var (
  285. orders []*unicom.UserBind
  286. err error
  287. start = 0
  288. end = 1000
  289. )
  290. for {
  291. var tmp []*unicom.UserBind
  292. if tmp, err = s.dao.BindAll(context.TODO(), start, end); err != nil {
  293. log.Error("s.dao.BindAll error(%v)", err)
  294. return
  295. }
  296. start = end + start
  297. if len(tmp) == 0 {
  298. break
  299. }
  300. orders = append(orders, tmp...)
  301. }
  302. for _, b := range orders {
  303. var (
  304. c = context.TODO()
  305. u *unicom.Unicom
  306. ok bool
  307. now = time.Now()
  308. integral int
  309. ub *unicom.UserBind
  310. err error
  311. cardType string
  312. )
  313. if now.Month() == b.Monthly.Month() && now.Year() == b.Monthly.Year() {
  314. continue
  315. }
  316. res := s.unicomInfo(c, b.Usermob, now)
  317. if u, ok = res[b.Usermob]; !ok || u == nil {
  318. continue
  319. }
  320. switch u.Spid {
  321. case 10019:
  322. integral = 220
  323. cardType = "22卡"
  324. case 10020:
  325. integral = 330
  326. cardType = "33卡"
  327. case 10021:
  328. integral = 660
  329. cardType = "小电视卡"
  330. default:
  331. continue
  332. }
  333. if ub, err = s.dao.UserBindCache(c, b.Mid); err != nil {
  334. continue
  335. }
  336. if ub == nil || ub.Phone == 0 {
  337. continue
  338. }
  339. ub.Integral = ub.Integral + integral
  340. ub.Monthly = now
  341. if err = s.dao.AddUserBindCache(c, ub.Mid, ub); err != nil {
  342. log.Error("s.dao.AddUserBindCache error(%v)", err)
  343. continue
  344. }
  345. s.dbcliChan[ub.Mid%s.c.ChanDBNum] <- ub
  346. log.Info("unicom mobile upBindAll userbind(%v)", ub)
  347. s.unicomInfoc(ub.Usermob, ub.Phone, 0, integral, 0, cardType, ub.Mid, now)
  348. s.addUserIntegralLog(&unicom.UserIntegralLog{Phone: ub.Phone, Mid: ub.Mid, UnicomDesc: cardType, Type: 1, Integral: integral, Flow: 0, Desc: "每月礼包"})
  349. }
  350. }
  351. func (s *Service) updatemonth(now time.Time) {
  352. m := int(now.Month())
  353. if lmonth, ok := s.lastmonth[m]; !ok || !lmonth {
  354. if now.Day() == 1 {
  355. s.upBindAll()
  356. s.lastmonth[m] = true
  357. if m = m + 1; m > 12 {
  358. m = 1
  359. }
  360. s.lastmonth[m] = false
  361. log.Info("updatepro user monthly integral success")
  362. }
  363. }
  364. }
  365. func (s *Service) loadUnicomFlow() {
  366. var (
  367. list map[string]*unicom.UnicomUserFlow
  368. err error
  369. )
  370. if list, err = s.dao.UserFlowListCache(context.TODO()); err != nil {
  371. log.Error("load unicom s.dao.UserFlowListCache error(%v)", err)
  372. return
  373. }
  374. log.Info("load unicom flow total len(%v)", len(list))
  375. for key, u := range list {
  376. var (
  377. c = context.TODO()
  378. requestNo int64
  379. orderstatus string
  380. msg string
  381. )
  382. if err = s.dao.UserFlowCache(c, key); err != nil {
  383. if err == memcache.ErrNotFound {
  384. if err = s.returnPoints(c, u); err != nil {
  385. if err != ecode.NothingFound {
  386. log.Error("load unicom s.returnPoints error(%v)", err)
  387. continue
  388. }
  389. err = nil
  390. }
  391. log.Info("load unicom userbind timeout flow(%v)", u)
  392. } else {
  393. log.Error("load unicom s.dao.UserFlowCache error(%v)", err)
  394. continue
  395. }
  396. } else {
  397. if requestNo, err = s.seqdao.SeqID(c); err != nil {
  398. log.Error("load unicom s.seqdao.SeqID error(%v)", err)
  399. continue
  400. }
  401. if orderstatus, msg, err = s.dao.FlowQry(c, u.Phone, requestNo, u.Outorderid, u.Orderid, time.Now()); err != nil {
  402. log.Error("load unicom s.dao.FlowQry error(%v) msg(%s)", err, msg)
  403. continue
  404. }
  405. log.Info("load unicom userbind flow(%v) orderstatus(%s)", u, orderstatus)
  406. if orderstatus == "00" {
  407. continue
  408. } else if orderstatus != "01" {
  409. if err = s.returnPoints(c, u); err != nil {
  410. if err != ecode.NothingFound {
  411. log.Error("load unicom s.returnPoints error(%v)", err)
  412. continue
  413. }
  414. err = nil
  415. }
  416. }
  417. }
  418. delete(list, key)
  419. if err = s.dao.DeleteUserFlowCache(c, key); err != nil {
  420. log.Error("load unicom s.dao.DeleteUserFlowCache error(%v)", err)
  421. continue
  422. }
  423. }
  424. if err = s.dao.AddUserFlowListCache(context.TODO(), list); err != nil {
  425. log.Error("load unicom s.dao.AddUserFlowListCache error(%v)", err)
  426. return
  427. }
  428. log.Info("load unicom flow last len(%v) success", len(list))
  429. }
  430. // returnPoints retutn user integral and flow
  431. func (s *Service) returnPoints(c context.Context, u *unicom.UnicomUserFlow) (err error) {
  432. var (
  433. userbind *unicom.UserBind
  434. result int64
  435. )
  436. if userbind, err = s.unicomBindInfo(c, u.Mid); err != nil {
  437. return
  438. }
  439. ub := &unicom.UserBind{}
  440. *ub = *userbind
  441. ub.Flow = ub.Flow + u.Flow
  442. ub.Integral = ub.Integral + u.Integral
  443. if err = s.dao.AddUserBindCache(c, ub.Mid, ub); err != nil {
  444. log.Error("unicom s.dao.AddUserBindCache error(%v)", err)
  445. return
  446. }
  447. if result, err = s.dao.UpUserIntegral(c, ub); err != nil || result == 0 {
  448. log.Error("unicom s.dao.UpUserIntegral error(%v) or result==0", err)
  449. return
  450. }
  451. var packInt int
  452. if u.Integral > 0 {
  453. packInt = u.Integral
  454. } else {
  455. packInt = u.Flow
  456. }
  457. ul := &unicom.UserPackLog{
  458. Phone: u.Phone,
  459. Usermob: ub.Usermob,
  460. Mid: u.Mid,
  461. RequestNo: u.Outorderid,
  462. Type: 0,
  463. Desc: u.Desc + ",领取失败并返还",
  464. Integral: packInt,
  465. }
  466. s.addUserPackLog(ul)
  467. s.addUserIntegralLog(&unicom.UserIntegralLog{Phone: u.Phone, Mid: u.Mid, UnicomDesc: "", Type: 2, Integral: u.Integral, Flow: u.Flow, Desc: u.Desc + ",领取失败并返还"})
  468. log.Info("unicom_pack(%v) mid(%v)", u.Desc+",领取失败并返还", userbind.Mid)
  469. s.unicomPackInfoc(userbind.Usermob, u.Desc+",领取失败并返还", u.Orderid, userbind.Phone, packInt, 0, userbind.Mid, time.Now())
  470. return
  471. }
  472. // unicomBindInfo unicom bind info
  473. func (s *Service) unicomBindInfo(c context.Context, mid int64) (res *unicom.UserBind, err error) {
  474. if res, err = s.dao.UserBindCache(c, mid); err != nil {
  475. if res, err = s.dao.UserBind(c, mid); err != nil {
  476. log.Error("s.dao.UserBind error(%v)", err)
  477. return
  478. }
  479. if res == nil {
  480. err = ecode.NothingFound
  481. return
  482. }
  483. if err = s.dao.AddUserBindCache(c, mid, res); err != nil {
  484. log.Error("s.dao.AddUserBindCache mid(%d) error(%v)", mid, err)
  485. return
  486. }
  487. }
  488. return
  489. }
  490. // loadUnicomIPOrder load unciom ip order update
  491. func (s *Service) loadUnicomIPOrder(now time.Time) {
  492. var (
  493. dbips map[string]*unicom.UnicomIP
  494. err error
  495. )
  496. if dbips, err = s.loadUnicomIP(context.TODO()); err != nil {
  497. log.Error("s.loadUnicomIP", err)
  498. return
  499. }
  500. if len(dbips) == 0 {
  501. log.Error("db cache ip len 0")
  502. return
  503. }
  504. unicomIP, err := s.dao.UnicomIP(context.TODO(), now)
  505. if err != nil {
  506. log.Error("s.dao.UnicomIP(%v)", err)
  507. return
  508. }
  509. if len(unicomIP) == 0 {
  510. log.Info("unicom ip orders is null")
  511. return
  512. }
  513. tx, err := s.dao.BeginTran(context.TODO())
  514. if err != nil {
  515. log.Error("s.dao.BeginTran error(%v)", err)
  516. return
  517. }
  518. for _, uip := range unicomIP {
  519. key := fmt.Sprintf(_initIPUnicomKey, uip.Ipbegin, uip.Ipend)
  520. if _, ok := dbips[key]; ok {
  521. delete(dbips, key)
  522. continue
  523. }
  524. var (
  525. result int64
  526. )
  527. if result, err = s.dao.InUnicomIPSync(tx, uip, time.Now()); err != nil || result == 0 {
  528. tx.Rollback()
  529. log.Error("s.dao.InUnicomIPSync error(%v)", err)
  530. return
  531. }
  532. }
  533. for _, uold := range dbips {
  534. var (
  535. result int64
  536. )
  537. if result, err = s.dao.UpUnicomIP(tx, uold.Ipbegin, uold.Ipend, 0, time.Now()); err != nil || result == 0 {
  538. tx.Rollback()
  539. log.Error("s.dao.UpUnicomIP error(%v)", err)
  540. return
  541. }
  542. }
  543. if err = tx.Commit(); err != nil {
  544. log.Error("tx.Commit error(%v)", err)
  545. return
  546. }
  547. log.Info("update unicom ip success")
  548. }
  549. // loadUnicomIP load unicom ip
  550. func (s *Service) loadUnicomIP(c context.Context) (res map[string]*unicom.UnicomIP, err error) {
  551. var unicomIP []*unicom.UnicomIP
  552. unicomIP, err = s.dao.IPSync(c)
  553. if err != nil {
  554. log.Error("s.dao.IPSync error(%v)", err)
  555. return
  556. }
  557. tmp := map[string]*unicom.UnicomIP{}
  558. for _, u := range unicomIP {
  559. key := fmt.Sprintf(_initIPUnicomKey, u.Ipbegin, u.Ipend)
  560. tmp[key] = u
  561. }
  562. res = tmp
  563. log.Info("loadUnicomIPCache success")
  564. return
  565. }
  566. func (s *Service) addUserPackLog(u *unicom.UserPackLog) {
  567. select {
  568. case s.packLogCh <- u:
  569. default:
  570. log.Warn("user pack log buffer is full")
  571. }
  572. }
  573. func (s *Service) addUserIntegralLog(u *unicom.UserIntegralLog) {
  574. select {
  575. case s.integralLogCh[u.Mid%s.c.ChanDBNum] <- u:
  576. default:
  577. log.Warn("user add integral and flow log buffer is full")
  578. }
  579. }
  580. func (s *Service) addUserPackLogproc() {
  581. for {
  582. i, ok := <-s.packLogCh
  583. if !ok || s.closed {
  584. log.Warn("user pack log proc exit")
  585. return
  586. }
  587. var (
  588. c = context.TODO()
  589. result int64
  590. err error
  591. )
  592. switch v := i.(type) {
  593. case *unicom.UserPackLog:
  594. if result, err = s.dao.InUserPackLog(c, v); err != nil || result == 0 {
  595. log.Error("s.dao.UpUserIntegral error(%v) or result==0", err)
  596. continue
  597. }
  598. log.Info("unicom user flow or integral back mid(%d) phone(%d)", v.Mid, v.Phone)
  599. }
  600. }
  601. }
  602. func (s *Service) addUserIntegralLogproc(i int64) {
  603. var (
  604. dbcliChan = s.integralLogCh[i]
  605. )
  606. for {
  607. i, ok := <-dbcliChan
  608. if !ok || s.closed {
  609. log.Warn("user pack log proc exit")
  610. return
  611. }
  612. var (
  613. logID = 91
  614. )
  615. switch v := i.(type) {
  616. case *unicom.UserIntegralLog:
  617. // if result, err = s.dao.InUserIntegralLog(c, v); err != nil || result == 0 {
  618. // log.Error("s.dao.InUserIntegralLog error(%v) or result==0", err)
  619. // continue
  620. // }
  621. report.User(&report.UserInfo{
  622. Mid: v.Mid,
  623. Business: logID,
  624. Action: "unicom_userpack_add",
  625. Ctime: time.Now(),
  626. Content: map[string]interface{}{
  627. "phone": v.Phone,
  628. "pack_desc": v.Desc,
  629. "integral": v.Integral,
  630. },
  631. })
  632. }
  633. }
  634. }