lancermonitor.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package lancermonitor
  2. import (
  3. "time"
  4. "net"
  5. "sync"
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "errors"
  10. "go-common/library/log"
  11. )
  12. const (
  13. _separator = "####"
  14. )
  15. var (
  16. lm *LancerMonitor
  17. started bool
  18. )
  19. type LancerMonitor struct {
  20. c *Config
  21. logRevStatusLock sync.Mutex
  22. logRevStatus map[string]int64
  23. ipAddr string
  24. }
  25. func InitLancerMonitor(config *Config) (l *LancerMonitor, err error) {
  26. if started {
  27. return nil, errors.New("lancer Monitor can only be init Once")
  28. }
  29. if err = config.ConfigValidate(); err != nil {
  30. return nil, err
  31. }
  32. l = new(LancerMonitor)
  33. l.c = config
  34. l.logRevStatus = make(map[string]int64)
  35. l.ipAddr = InternalIP()
  36. go l.reportStatus()
  37. started = true
  38. lm = l
  39. return l, nil
  40. }
  41. func (l *LancerMonitor) reportStatus() {
  42. reportStatusTk := time.Tick(time.Duration(60 * time.Second))
  43. for {
  44. select {
  45. case <-reportStatusTk:
  46. logCount := l.getLogCount()
  47. conn, error := net.DialTimeout("tcp", l.c.Addr, time.Second*5)
  48. if error != nil {
  49. log.Error("failed to connect to lancer when report status")
  50. } else {
  51. for k, v := range logCount {
  52. fields := strings.Split(k, _separator)
  53. if len(fields) == 2 {
  54. fmt.Fprintf(conn, fields[0]+"\u0001"+strconv.FormatInt(v, 10)+"\u0001"+l.ipAddr+"\u0001"+strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10)+"\u0001"+fields[1]+"\u0001\u0001")
  55. }
  56. }
  57. log.Info("report status to lancer")
  58. conn.Close()
  59. }
  60. }
  61. }
  62. }
  63. // InternalIP get internal ip.
  64. func InternalIP() string {
  65. inters, err := net.Interfaces()
  66. if err != nil {
  67. return ""
  68. }
  69. for _, inter := range inters {
  70. if !strings.HasPrefix(inter.Name, "lo") {
  71. addrs, err := inter.Addrs()
  72. if err != nil {
  73. continue
  74. }
  75. for _, addr := range addrs {
  76. if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
  77. if ipnet.IP.To4() != nil {
  78. return ipnet.IP.String()
  79. }
  80. }
  81. }
  82. }
  83. }
  84. return ""
  85. }
  86. //get log count of each logid since last call
  87. func (l *LancerMonitor) getLogCount() map[string]int64 {
  88. l.logRevStatusLock.Lock()
  89. defer l.logRevStatusLock.Unlock()
  90. logRevSendStatus := make(map[string]int64)
  91. for k, v := range l.logRevStatus {
  92. logRevSendStatus[k] = v
  93. }
  94. for k := range l.logRevStatus {
  95. delete(l.logRevStatus, k)
  96. }
  97. return logRevSendStatus
  98. }
  99. func IncreaseLogCount(name string, logId string) {
  100. if lm == nil || !started {
  101. return
  102. }
  103. if name == "" || logId == "" {
  104. return
  105. }
  106. key := name + _separator + logId
  107. lm.logRevStatusLock.Lock()
  108. defer lm.logRevStatusLock.Unlock()
  109. lm.logRevStatus[key] += 1
  110. }