reporter.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package infoc
  2. import (
  3. "fmt"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. "go-common/library/log"
  8. "go-common/library/net/ip"
  9. )
  10. type reporter struct {
  11. taskID string
  12. addr string
  13. iip string
  14. receiveCount int64
  15. sendCount int64
  16. fails []string
  17. }
  18. func newReporter(taskID, addr string) (r *reporter) {
  19. r = &reporter{
  20. taskID: taskID,
  21. addr: addr,
  22. iip: ip.InternalIP(),
  23. }
  24. return
  25. }
  26. func (r *reporter) receiveIncr(delta int64) {
  27. atomic.AddInt64(&r.receiveCount, delta)
  28. }
  29. func (r *reporter) sendIncr(delta int64) {
  30. atomic.AddInt64(&r.sendCount, delta)
  31. }
  32. func (r *reporter) reportproc() {
  33. tick := time.NewTicker(1 * time.Minute)
  34. for {
  35. <-tick.C
  36. r.reporter()
  37. }
  38. }
  39. func (r *reporter) flush() {
  40. r.reporter()
  41. }
  42. func (r *reporter) reporter() {
  43. const _timeout = time.Second
  44. conn, err := net.DialTimeout("tcp", r.addr, _timeout)
  45. if err != nil {
  46. log.Error("infoc reporter flush dial error(%v)", err)
  47. return
  48. }
  49. defer conn.Close()
  50. conn.SetDeadline(time.Now().Add(_timeout))
  51. var fails []string
  52. for _, fail := range r.fails {
  53. if _, err = conn.Write([]byte(fail)); err != nil {
  54. log.Error("infoc reporter write fail error(%v)", err)
  55. fails = append(fails, fail)
  56. }
  57. }
  58. for _, rc := range r.record(time.Now()) {
  59. if _, err = conn.Write([]byte(rc)); err != nil {
  60. log.Error("infoc reporter write error(%v)", err)
  61. fails = append(fails, rc)
  62. }
  63. }
  64. r.fails = fails
  65. }
  66. func (r *reporter) record(now time.Time) []string {
  67. rc := atomic.SwapInt64(&r.receiveCount, 0)
  68. sc := atomic.SwapInt64(&r.sendCount, 0)
  69. rcW := fmt.Sprintf("agent.receive.count\001%d\001%s\001%d\001%s\001\001", rc, r.iip, now.UnixNano()/int64(time.Millisecond), r.taskID)
  70. scW := fmt.Sprintf("agent.send.success.count\001%d\001%s\001%d\001%s\001\001", sc, r.iip, now.UnixNano()/int64(time.Millisecond), r.taskID)
  71. return []string{rcW, scW}
  72. }