flowmonitor.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package flowmonitor
  2. import (
  3. "errors"
  4. "time"
  5. "os"
  6. "strconv"
  7. "encoding/json"
  8. "net"
  9. "go-common/library/log"
  10. "go-common/app/service/ops/log-agent/pkg/flowmonitor/counter"
  11. "go-common/app/service/ops/log-agent/event"
  12. )
  13. type FlowMonitor struct {
  14. conf *Config
  15. monitorLogChanSize int
  16. currentVer int
  17. verMap map[int]*prometheus.CounterVec
  18. flowMonitorThrottle bool
  19. indexName string
  20. conn net.Conn
  21. }
  // Sentinel errors returned by (*FlowMonitor).Add.
  var (
  	// argsError is returned when appId, source or status is empty. An empty
  	// timeRangeKey is not an error (Add substitutes a default), so it is not
  	// listed in the message.
  	argsError = errors.New("appId source status must be specified")

  	// throttledError is returned while the monitor is throttled.
  	throttledError = errors.New("flow monitor is throttled")

  	// notInitError is returned when Add is called on a nil *FlowMonitor.
  	notInitError = errors.New("flow monitor does not init")
  )
  25. var Fm *FlowMonitor
  26. // InitFlowMonitor init flow monitor
  27. func InitFlowMonitor(conf *Config) (err error) {
  28. fm := new(FlowMonitor)
  29. fm.conf = conf
  30. if err = fm.checkConfig(); err != nil {
  31. return err
  32. }
  33. fm.flowMonitorThrottle = false
  34. fm.verMap = make(map[int]*prometheus.CounterVec)
  35. ver1 := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "FlowMonitor1", Help: "help"}, []string{"appId", "timeRangeKey", "source", "kind", "status"})
  36. prometheus.MustRegister(ver1)
  37. fm.verMap[0] = ver1
  38. ver2 := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "FlowMonitor2", Help: "help"}, []string{"appId", "timeRangeKey", "source", "kind", "status"})
  39. prometheus.MustRegister(ver2)
  40. fm.verMap[1] = ver2
  41. fm.currentVer = 0
  42. fm.newConn()
  43. go fm.flowmonitorreport()
  44. Fm = fm
  45. return nil
  46. }
  47. // getCurrentVer get the ver being used
  48. func (fm *FlowMonitor) getCurrentVer() *prometheus.CounterVec {
  49. return fm.verMap[fm.currentVer]
  50. }
  51. // rollVer roll to the next ver
  52. func (fm *FlowMonitor) rollVer() {
  53. fm.currentVer = (fm.currentVer + 1) % 2
  54. }
  55. func (fm *FlowMonitor) AddEvent(e *event.ProcessorEvent, source string, kind string, status string) {
  56. if len(e.AppId) != 0 {
  57. fm.Add(string(e.AppId), source, e.TimeRangeKey, kind, status)
  58. }
  59. fm.Add(e.LogId, source, e.TimeRangeKey, kind, status)
  60. }
  61. // Add do the metric
  62. func (fm *FlowMonitor) Add(appId string, source string, timeRangeKey string, kind string, status string) (error) {
  63. if fm == nil {
  64. return notInitError
  65. }
  66. if fm.flowMonitorThrottle {
  67. return throttledError
  68. }
  69. if appId == "" || source == "" || status == "" {
  70. return argsError
  71. }
  72. if timeRangeKey == "" {
  73. timeRangeKey = strconv.FormatInt(time.Now().Unix()/100*100, 10)
  74. }
  75. if counter, err := fm.getCurrentVer().GetMetricWithLabelValues(appId, timeRangeKey, source, kind, status); err != nil {
  76. return err
  77. } else {
  78. counter.Inc()
  79. return nil
  80. }
  81. }
  82. // readVec read metrics from one vec
  83. func (fm *FlowMonitor) readVec(ver *prometheus.CounterVec) error {
  84. if fm.conn == nil {
  85. if err := fm.newConn(); err != nil {
  86. return err
  87. }
  88. }
  89. metrics := make(chan prometheus.Metric)
  90. go func() {
  91. ver.Collect(metrics)
  92. close(metrics)
  93. }()
  94. hostname, _ := os.Hostname()
  95. var ignore_metric bool = false
  96. for {
  97. select {
  98. case metric, ok := <-metrics:
  99. if !ok {
  100. return nil
  101. }
  102. if ignore_metric {
  103. continue
  104. }
  105. data := make(map[string]interface{})
  106. for _, label := range metric.(prometheus.Counter).Lables() {
  107. data[*label.Name] = *label.Value
  108. }
  109. if timeRangeKey, ok := data["timeRangeKey"]; ok {
  110. timeint64, _ := strconv.ParseInt(timeRangeKey.(string), 10, 64)
  111. data["time"] = time.Unix(timeint64, 0).UTC().Format("2006-01-02T15:04:05")
  112. }
  113. data["hostname"] = hostname
  114. data["counter"] = metric.(prometheus.Counter).Value()
  115. if dataSend, err := json.Marshal(data); err == nil {
  116. dataSend = append(dataSend, []byte("\n")...)
  117. n, err := fm.conn.Write(dataSend)
  118. if err == nil && n < len(dataSend) {
  119. log.Error("Error: flow monitor write error: short write")
  120. }
  121. if err != nil {
  122. log.Error("Error: flow monitor write error: %v", err)
  123. fm.conn.Close()
  124. fm.conn = nil
  125. // if conn write error, just ignore. ver.Collect must be finished or RLock will not be released
  126. ignore_metric = true
  127. }
  128. }
  129. }
  130. }
  131. }
  132. // linkmonitorreport report link monitor data periodicity
  133. func (fm *FlowMonitor) flowmonitorreport() {
  134. for {
  135. time.Sleep(time.Duration(fm.conf.Interval))
  136. currentVer := fm.getCurrentVer()
  137. fm.rollVer()
  138. fm.readVec(currentVer)
  139. currentVer.Reset()
  140. }
  141. }
  142. // newConn make a conn to logstash(monitor data receiver)
  143. func (fm *FlowMonitor) newConn() error {
  144. conn, err := net.DialTimeout("tcp", fm.conf.Addr, time.Duration(time.Second*5))
  145. if err == nil && conn != nil {
  146. fm.conn = conn
  147. fm.flowMonitorThrottle = false
  148. log.Info("init flow monitor conn to: %s", fm.conf.Addr)
  149. return nil
  150. } else {
  151. log.Error("flow monitor conn failed: %s: %v", fm.conf.Addr, err)
  152. fm.flowMonitorThrottle = true
  153. return err
  154. }
  155. }