unicom.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. package unicom
  2. import (
  3. "context"
  4. "database/sql"
  5. "time"
  6. "go-common/app/interface/main/app-wall/conf"
  7. "go-common/app/interface/main/app-wall/model/unicom"
  8. "go-common/library/cache/memcache"
  9. "go-common/library/database/elastic"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/log"
  12. httpx "go-common/library/net/http/blademaster"
  13. )
  14. const (
  15. //unicom
  16. _inOrderSyncSQL = `INSERT IGNORE INTO unicom_order (usermob,cpid,spid,type,ordertime,canceltime,endtime,channelcode,province,area,ordertype,videoid,ctime,mtime)
  17. VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE cpid=?,spid=?,type=?,ordertime=?,canceltime=?,endtime=?,channelcode=?,province=?,area=?,ordertype=?,videoid=?,mtime=?`
  18. _inAdvanceSyncSQL = `INSERT IGNORE INTO unicom_order_advance (usermob,userphone,cpid,spid,ordertime,channelcode,province,area,ctime,mtime)
  19. VALUES(?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE cpid=?,spid=?,ordertime=?,channelcode=?,province=?,area=?,mtime=?`
  20. _upOrderFlowSQL = `UPDATE unicom_order SET time=?,flowbyte=?,mtime=? WHERE usermob=?`
  21. _orderUserSyncSQL = `SELECT usermob,cpid,spid,type,ordertime,canceltime,endtime,channelcode,province,area,ordertype,videoid,time,flowbyte FROM unicom_order WHERE usermob=?
  22. ORDER BY type DESC`
  23. _inIPSyncSQL = `INSERT IGNORE INTO unicom_ip (ipbegion,ipend,provinces,isopen,opertime,sign,ctime,mtime) VALUES(?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE
  24. ipbegion=?,ipend=?,provinces=?,isopen=?,opertime=?,sign=?,mtime=?`
  25. _ipSyncSQL = `SELECT ipbegion,ipend FROM unicom_ip WHERE isopen=1`
  26. //pack
  27. _inPackSQL = `INSERT IGNORE INTO unicom_pack (usermob,mid) VALUES(?,?)`
  28. _packSQL = `SELECT usermob,mid FROM unicom_pack WHERE usermob=?`
  29. // update unicom ip
  30. _inUnicomIPSyncSQL = `INSERT IGNORE INTO unicom_ip (ipbegion,ipend,isopen,ctime,mtime) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE
  31. ipbegion=?,ipend=?,isopen=?,mtime=?`
  32. _upUnicomIPSQL = `UPDATE unicom_ip SET isopen=?,mtime=? WHERE ipbegion=? AND ipend=?`
  33. // unicom integral change
  34. _inUserBindSQL = `INSERT IGNORE INTO unicom_user_bind (usermob,phone,mid,state,integral,flow) VALUES(?,?,?,?,?,?) ON DUPLICATE KEY UPDATE
  35. phone=?,state=?`
  36. _userBindSQL = `SELECT usermob,phone,mid,state,integral,flow,monthlytime FROM unicom_user_bind WHERE state=1 AND mid=?`
  37. _userBindPhoneMidSQL = `SELECT mid FROM unicom_user_bind WHERE phone=? AND state=1`
  38. _upUserIntegralSQL = `UPDATE unicom_user_bind SET integral=?,flow=? WHERE phone=? AND mid=?`
  39. _userPacksSQL = `SELECT id,ptype,pdesc,amount,capped,integral,param FROM unicom_user_packs WHERE state=1`
  40. _userPacksByIDSQL = `SELECT id,ptype,pdesc,amount,capped,integral,param,state FROM unicom_user_packs WHERE id=?`
  41. _upUserPacksSQL = `UPDATE unicom_user_packs SET amount=?,state=? WHERE id=?`
  42. _inUserPackLogSQL = `INSERT INTO unicom_user_packs_log (phone,usermob,mid,request_no,ptype,integral,pdesc) VALUES (?,?,?,?,?,?,?)`
  43. _userBindOldSQL = `SELECT usermob,phone,mid,state,integral,flow FROM unicom_user_bind WHERE phone=? ORDER BY mtime DESC limit 1`
  44. _userPacksLogSQL = `SELECT phone,integral,ptype,pdesc FROM unicom_user_packs_log WHERE mtime>=? AND mtime<?`
  45. )
  46. type Dao struct {
  47. db *xsql.DB
  48. client *httpx.Client
  49. uclient *httpx.Client
  50. //unicom
  51. inOrderSyncSQL *xsql.Stmt
  52. inAdvanceSyncSQL *xsql.Stmt
  53. upOrderFlowSQL *xsql.Stmt
  54. orderUserSyncSQL *xsql.Stmt
  55. inIPSyncSQL *xsql.Stmt
  56. ipSyncSQL *xsql.Stmt
  57. // unicom integral change
  58. inUserBindSQL *xsql.Stmt
  59. userBindSQL *xsql.Stmt
  60. userBindPhoneMidSQL *xsql.Stmt
  61. upUserIntegralSQL *xsql.Stmt
  62. userPacksSQL *xsql.Stmt
  63. userPacksByIDSQL *xsql.Stmt
  64. upUserPacksSQL *xsql.Stmt
  65. inUserPackLogSQL *xsql.Stmt
  66. userBindOldSQL *xsql.Stmt
  67. //pack
  68. inPackSQL *xsql.Stmt
  69. packSQL *xsql.Stmt
  70. // memcache
  71. mc *memcache.Pool
  72. expire int32
  73. flowKeyExpired int32
  74. flowWait int32
  75. // unicom url
  76. unicomIPURL string
  77. unicomFlowExchangeURL string
  78. // order url
  79. orderURL string
  80. ordercancelURL string
  81. sendsmscodeURL string
  82. smsNumberURL string
  83. // elastic
  84. es *elastic.Elastic
  85. }
  86. func New(c *conf.Config) (d *Dao) {
  87. d = &Dao{
  88. db: xsql.NewMySQL(c.MySQL.Show),
  89. client: httpx.NewClient(conf.Conf.HTTPBroadband),
  90. uclient: httpx.NewClient(conf.Conf.HTTPUnicom),
  91. // unicom url
  92. unicomIPURL: c.Host.Unicom + _unicomIPURL,
  93. unicomFlowExchangeURL: c.Host.UnicomFlow + _unicomFlowExchangeURL,
  94. // memcache
  95. mc: memcache.NewPool(c.Memcache.Operator.Config),
  96. expire: int32(time.Duration(c.Memcache.Operator.Expire) / time.Second),
  97. flowKeyExpired: int32(time.Duration(c.Unicom.KeyExpired) / time.Second),
  98. flowWait: int32(time.Duration(c.Unicom.FlowWait) / time.Second),
  99. // order url
  100. orderURL: c.Host.Broadband + _orderURL,
  101. ordercancelURL: c.Host.Broadband + _ordercancelURL,
  102. sendsmscodeURL: c.Host.Broadband + _sendsmscodeURL,
  103. smsNumberURL: c.Host.Broadband + _smsNumberURL,
  104. // elastic
  105. es: elastic.NewElastic(nil),
  106. }
  107. d.inOrderSyncSQL = d.db.Prepared(_inOrderSyncSQL)
  108. d.inAdvanceSyncSQL = d.db.Prepared(_inAdvanceSyncSQL)
  109. d.upOrderFlowSQL = d.db.Prepared(_upOrderFlowSQL)
  110. d.orderUserSyncSQL = d.db.Prepared(_orderUserSyncSQL)
  111. d.inIPSyncSQL = d.db.Prepared(_inIPSyncSQL)
  112. d.ipSyncSQL = d.db.Prepared(_ipSyncSQL)
  113. // unicom integral change
  114. d.inUserBindSQL = d.db.Prepared(_inUserBindSQL)
  115. d.userBindSQL = d.db.Prepared(_userBindSQL)
  116. d.userBindPhoneMidSQL = d.db.Prepared(_userBindPhoneMidSQL)
  117. d.upUserIntegralSQL = d.db.Prepared(_upUserIntegralSQL)
  118. d.userPacksSQL = d.db.Prepared(_userPacksSQL)
  119. d.upUserPacksSQL = d.db.Prepared(_upUserPacksSQL)
  120. d.userPacksByIDSQL = d.db.Prepared(_userPacksByIDSQL)
  121. d.inUserPackLogSQL = d.db.Prepared(_inUserPackLogSQL)
  122. d.userBindOldSQL = d.db.Prepared(_userBindOldSQL)
  123. //pack
  124. d.inPackSQL = d.db.Prepared(_inPackSQL)
  125. d.packSQL = d.db.Prepared(_packSQL)
  126. return
  127. }
  128. // InOrdersSync insert OrdersSync
  129. func (d *Dao) InOrdersSync(ctx context.Context, usermob string, u *unicom.UnicomJson, now time.Time) (row int64, err error) {
  130. res, err := d.inOrderSyncSQL.Exec(ctx, usermob,
  131. u.Cpid, u.Spid, u.TypeInt, u.Ordertime, u.Canceltime, u.Endtime,
  132. u.Channelcode, u.Province, u.Area, u.Ordertypes, u.Videoid, now, now,
  133. u.Cpid, u.Spid, u.TypeInt, u.Ordertime, u.Canceltime, u.Endtime,
  134. u.Channelcode, u.Province, u.Area, u.Ordertypes, u.Videoid, now)
  135. if err != nil {
  136. log.Error("d.inOrderSyncSQL.Exec error(%v)", err)
  137. return
  138. }
  139. utmp := &unicom.Unicom{}
  140. utmp.UnicomJSONTOUincom(usermob, u)
  141. if err = d.UpdateUnicomCache(ctx, usermob, utmp); err != nil {
  142. log.Error("d.UpdateUnicomCache usermob(%v) error(%v)", usermob, err)
  143. return 0, err
  144. }
  145. return res.RowsAffected()
  146. }
  147. // InAdvanceSync insert AdvanceSync
  148. func (d *Dao) InAdvanceSync(ctx context.Context, usermob string, u *unicom.UnicomJson, now time.Time) (row int64, err error) {
  149. res, err := d.inAdvanceSyncSQL.Exec(ctx, usermob, u.Userphone,
  150. u.Cpid, u.Spid, u.Ordertime, u.Channelcode, u.Province, u.Area, now, now,
  151. u.Cpid, u.Spid, u.Ordertime, u.Channelcode, u.Province, u.Area, now)
  152. if err != nil {
  153. log.Error("d.inAdvanceSyncSQL.Exec error(%v)", err)
  154. return 0, err
  155. }
  156. return res.RowsAffected()
  157. }
  158. // FlowSync update OrdersSync
  159. func (d *Dao) FlowSync(ctx context.Context, flowbyte int, usermob, time string, now time.Time) (row int64, err error) {
  160. res, err := d.upOrderFlowSQL.Exec(ctx, time, flowbyte, now, usermob)
  161. if err != nil {
  162. log.Error("d.upOrderFlowSQL.Exec error(%v)", err)
  163. return 0, err
  164. }
  165. return res.RowsAffected()
  166. }
  167. // OrdersUserFlow select user OrdersSync
  168. func (d *Dao) OrdersUserFlow(ctx context.Context, usermob string) (res []*unicom.Unicom, err error) {
  169. rows, err := d.orderUserSyncSQL.Query(ctx, usermob)
  170. if err != nil {
  171. log.Error("query error (%v)", err)
  172. return
  173. }
  174. defer rows.Close()
  175. for rows.Next() {
  176. u := &unicom.Unicom{}
  177. if err = rows.Scan(&u.Usermob, &u.Cpid, &u.Spid, &u.TypeInt, &u.Ordertime, &u.Canceltime, &u.Endtime, &u.Channelcode, &u.Province,
  178. &u.Area, &u.Ordertypes, &u.Videoid, &u.Time, &u.Flowbyte); err != nil {
  179. log.Error("OrdersUserFlow row.Scan err (%v)", err)
  180. return
  181. }
  182. u.UnicomChange()
  183. res = append(res, u)
  184. }
  185. return
  186. }
  187. // InIPSync insert IpSync
  188. func (d *Dao) InIPSync(ctx context.Context, u *unicom.UnicomIpJson, now time.Time) (row int64, err error) {
  189. res, err := d.inIPSyncSQL.Exec(ctx, u.Ipbegin, u.Ipend, u.Provinces, u.Isopen, u.Opertime, u.Sign, now, now,
  190. u.Ipbegin, u.Ipend, u.Provinces, u.Isopen, u.Opertime, u.Sign, now)
  191. if err != nil {
  192. log.Error("d.inIPSyncSQL.Exec error(%v)", err)
  193. return 0, err
  194. }
  195. return res.RowsAffected()
  196. }
  197. // InPack insert Privilege pack
  198. func (d *Dao) InPack(ctx context.Context, usermob string, mid int64) (row int64, err error) {
  199. res, err := d.inPackSQL.Exec(ctx, usermob, mid)
  200. if err != nil {
  201. log.Error("d.inPackSQL.Exec error(%v)", err)
  202. return 0, err
  203. }
  204. return res.RowsAffected()
  205. }
  206. // Pack select Privilege pack
  207. func (d *Dao) Pack(ctx context.Context, usermobStr string) (res map[string]map[int64]struct{}, err error) {
  208. row := d.packSQL.QueryRow(ctx, usermobStr)
  209. var (
  210. usermob string
  211. mid int64
  212. )
  213. res = map[string]map[int64]struct{}{}
  214. if err = row.Scan(&usermob, &mid); err != nil {
  215. if err == sql.ErrNoRows {
  216. err = nil
  217. } else {
  218. log.Error("OrdersUserFlow rows.Scan err (%v)", err)
  219. }
  220. }
  221. if user, ok := res[usermob]; !ok {
  222. res[usermob] = map[int64]struct{}{
  223. mid: struct{}{},
  224. }
  225. } else {
  226. user[mid] = struct{}{}
  227. }
  228. return
  229. }
  230. // IPSync select all ipSync
  231. func (d *Dao) IPSync(ctx context.Context) (res []*unicom.UnicomIP, err error) {
  232. rows, err := d.ipSyncSQL.Query(ctx)
  233. if err != nil {
  234. log.Error("query error (%v)", err)
  235. return
  236. }
  237. defer rows.Close()
  238. res = []*unicom.UnicomIP{}
  239. for rows.Next() {
  240. u := &unicom.UnicomIP{}
  241. if err = rows.Scan(&u.Ipbegin, &u.Ipend); err != nil {
  242. log.Error("rows.Scan error(%v)", err)
  243. return
  244. }
  245. u.UnicomIPChange()
  246. res = append(res, u)
  247. }
  248. return
  249. }
  250. // InUnicomIPSync insert or update unicom_ip
  251. func (d *Dao) InUnicomIPSync(tx *xsql.Tx, u *unicom.UnicomIP, now time.Time) (row int64, err error) {
  252. res, err := tx.Exec(_inUnicomIPSyncSQL, u.Ipbegin, u.Ipend, 1, now, now,
  253. u.Ipbegin, u.Ipend, 1, now)
  254. if err != nil {
  255. log.Error("tx.inUnicomIPSyncSQL.Exec error(%v)", err)
  256. return 0, err
  257. }
  258. return res.RowsAffected()
  259. }
  260. // UpUnicomIP update unicom_ip state
  261. func (d *Dao) UpUnicomIP(tx *xsql.Tx, ipstart, ipend, state int, now time.Time) (row int64, err error) {
  262. res, err := tx.Exec(_upUnicomIPSQL, state, now, ipstart, ipend)
  263. if err != nil {
  264. log.Error("tx.upUnicomIPSQL.Exec error(%v)", err)
  265. return 0, err
  266. }
  267. return res.RowsAffected()
  268. }
  269. // InUserBind unicom insert user bind
  270. func (d *Dao) InUserBind(ctx context.Context, ub *unicom.UserBind) (row int64, err error) {
  271. res, err := d.inUserBindSQL.Exec(ctx, ub.Usermob, ub.Phone, ub.Mid, ub.State, ub.Integral, ub.Flow, ub.Phone, ub.State)
  272. if err != nil {
  273. log.Error("insert user bind error(%v)", err)
  274. return
  275. }
  276. return res.RowsAffected()
  277. }
  278. // UserBind unicom select user bind
  279. func (d *Dao) UserBind(ctx context.Context, mid int64) (res *unicom.UserBind, err error) {
  280. row := d.userBindSQL.QueryRow(ctx, mid)
  281. if row == nil {
  282. log.Error("userBindSQL is null")
  283. return
  284. }
  285. res = &unicom.UserBind{}
  286. if err = row.Scan(&res.Usermob, &res.Phone, &res.Mid, &res.State, &res.Integral, &res.Flow, &res.Monthly); err != nil {
  287. if err == sql.ErrNoRows {
  288. err = nil
  289. } else {
  290. log.Error("userBindSQL row.Scan error(%v)", err)
  291. }
  292. res = nil
  293. return
  294. }
  295. return
  296. }
  297. // UserPacks user pack list
  298. func (d *Dao) UserPacks(ctx context.Context) (res []*unicom.UserPack, err error) {
  299. rows, err := d.userPacksSQL.Query(ctx)
  300. if err != nil {
  301. log.Error("user pack sql error(%v)", err)
  302. return
  303. }
  304. defer rows.Close()
  305. for rows.Next() {
  306. u := &unicom.UserPack{}
  307. if err = rows.Scan(&u.ID, &u.Type, &u.Desc, &u.Amount, &u.Capped, &u.Integral, &u.Param); err != nil {
  308. log.Error("user pack sql error(%v)", err)
  309. return
  310. }
  311. res = append(res, u)
  312. }
  313. return
  314. }
  315. // UserPackByID user pack by id
  316. func (d *Dao) UserPackByID(ctx context.Context, id int64) (res map[int64]*unicom.UserPack, err error) {
  317. res = map[int64]*unicom.UserPack{}
  318. row := d.userPacksByIDSQL.QueryRow(ctx, id)
  319. if row == nil {
  320. log.Error("user pack sql is null")
  321. return
  322. }
  323. u := &unicom.UserPack{}
  324. if err = row.Scan(&u.ID, &u.Type, &u.Desc, &u.Amount, &u.Capped, &u.Integral, &u.Param, &u.State); err != nil {
  325. if err == sql.ErrNoRows {
  326. err = nil
  327. } else {
  328. log.Error("userPacksByIDSQL row.Scan error(%v)", err)
  329. }
  330. return
  331. }
  332. res[id] = u
  333. return
  334. }
  335. // UpUserPacks update user packs
  336. func (d *Dao) UpUserPacks(ctx context.Context, u *unicom.UserPack, id int64) (row int64, err error) {
  337. res, err := d.upUserPacksSQL.Exec(ctx, u.Amount, u.State, id)
  338. if err != nil {
  339. log.Error("update user pack sql error(%v)", err)
  340. return 0, err
  341. }
  342. return res.RowsAffected()
  343. }
  344. // UpUserIntegral update unicom user integral
  345. func (d *Dao) UpUserIntegral(ctx context.Context, ub *unicom.UserBind) (row int64, err error) {
  346. res, err := d.upUserIntegralSQL.Exec(ctx, ub.Integral, ub.Flow, ub.Phone, ub.Mid)
  347. if err != nil {
  348. log.Error("update user integral sql error(%v)", err)
  349. return 0, err
  350. }
  351. return res.RowsAffected()
  352. }
  353. // UserBindPhoneMid mid by phone
  354. func (d *Dao) UserBindPhoneMid(ctx context.Context, phone string) (mid int64, err error) {
  355. row := d.userBindPhoneMidSQL.QueryRow(ctx, phone)
  356. if row == nil {
  357. log.Error("user pack sql is null")
  358. return
  359. }
  360. if err = row.Scan(&mid); err != nil {
  361. if err == sql.ErrNoRows {
  362. err = nil
  363. } else {
  364. log.Error("userPacksByIDSQL row.Scan error(%v)", err)
  365. }
  366. return
  367. }
  368. return
  369. }
  370. // InUserPackLog insert unicom user pack log
  371. func (d *Dao) InUserPackLog(ctx context.Context, u *unicom.UserPackLog) (row int64, err error) {
  372. res, err := d.inUserPackLogSQL.Exec(ctx, u.Phone, u.Usermob, u.Mid, u.RequestNo, u.Type, u.Integral, u.Desc)
  373. if err != nil {
  374. log.Error("insert user pack log integral sql error(%v)", err)
  375. return 0, err
  376. }
  377. return res.RowsAffected()
  378. }
  379. // UserBindOld user by phone
  380. func (d *Dao) UserBindOld(ctx context.Context, phone string) (res *unicom.UserBind, err error) {
  381. row := d.userBindOldSQL.QueryRow(ctx, phone)
  382. if row == nil {
  383. log.Error("user bind old sql is null")
  384. return
  385. }
  386. res = &unicom.UserBind{}
  387. if err = row.Scan(&res.Usermob, &res.Phone, &res.Mid, &res.State, &res.Integral, &res.Flow); err != nil {
  388. log.Error("userBindSQL row.Scan error(%v)", err)
  389. res = nil
  390. return
  391. }
  392. return
  393. }
  394. // UserPacksLog user pack logs
  395. func (d *Dao) UserPacksLog(ctx context.Context, start, end time.Time) (res []*unicom.UserPackLog, err error) {
  396. rows, err := d.db.Query(ctx, _userPacksLogSQL, start, end)
  397. if err != nil {
  398. log.Error("query error (%v)", err)
  399. return
  400. }
  401. defer rows.Close()
  402. for rows.Next() {
  403. u := &unicom.UserPackLog{}
  404. if err = rows.Scan(&u.Phone, &u.Integral, &u.Type, &u.UserDesc); err != nil {
  405. log.Error("user packs log sql error(%v)", err)
  406. return
  407. }
  408. res = append(res, u)
  409. }
  410. err = rows.Err()
  411. return
  412. }
  413. // BeginTran begin a transacition
  414. func (d *Dao) BeginTran(ctx context.Context) (tx *xsql.Tx, err error) {
  415. return d.db.Begin(ctx)
  416. }