httpstream.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package httpstream
  2. import (
  3. "net/http"
  4. "sync"
  5. "regexp"
  6. "bytes"
  7. "strconv"
  8. "context"
  9. "go-common/app/service/ops/log-agent/event"
  10. )
  11. type HttpStream struct {
  12. c *Config
  13. logstreams map[chan *event.ProcessorEvent]*filterRule
  14. l sync.Mutex
  15. }
  16. type filterRule struct {
  17. maxLines int
  18. appId string
  19. instanceId string
  20. reg *regexp.Regexp
  21. }
  22. var LogSourceChan = make(chan *event.ProcessorEvent)
  23. // initlogStream init log stream
  24. func NewHttpStream(config *Config) (httpStream *HttpStream, err error) {
  25. h := new(HttpStream)
  26. if err := config.ConfigValidate(); err != nil {
  27. return nil, err
  28. }
  29. h.c = config
  30. h.logstreams = make(map[chan *event.ProcessorEvent]*filterRule)
  31. http.HandleFunc("/logs", h.LogStreamer())
  32. go h.route()
  33. go http.ListenAndServe(h.c.Addr, nil)
  34. return h, nil
  35. }
  36. // route 把日志路由到所有注册的logstream
  37. func (s *HttpStream) route() {
  38. for buf := range LogSourceChan {
  39. for logstream, _ := range s.logstreams {
  40. logstream <- buf
  41. }
  42. }
  43. }
  44. // LogStreamer 接收请求
  45. func (s *HttpStream) LogStreamer() func(w http.ResponseWriter, req *http.Request) {
  46. logsHandler := func(w http.ResponseWriter, req *http.Request) {
  47. logstream := make(chan *event.ProcessorEvent)
  48. f := new(filterRule)
  49. // parse params
  50. params := req.URL.Query()
  51. if appId, ok := params["app_id"]; ok {
  52. f.appId = appId[0]
  53. } else {
  54. w.Write(append([]byte("必须指定app_id"), '\n'))
  55. return
  56. }
  57. if reg, ok := params["regexp"]; ok {
  58. if filterReg, err := regexp.Compile(reg[0]); err == nil {
  59. f.reg = filterReg
  60. } else {
  61. w.Write(append([]byte("正则表达式格式错误"), '\n'))
  62. return
  63. }
  64. }
  65. if instanceId, ok := params["instance_id"]; ok {
  66. f.instanceId = instanceId[0]
  67. }
  68. if maxLines, ok := params["max_lines"]; ok {
  69. if n, err := strconv.Atoi(maxLines[0]); err == nil {
  70. f.maxLines = n
  71. } else {
  72. w.Write(append([]byte("max_lines格式错误"), '\n'))
  73. return
  74. }
  75. }
  76. s.add(logstream, f)
  77. go func() {
  78. select {
  79. case <-req.Context().Done():
  80. s.remove(logstream)
  81. }
  82. }()
  83. defer s.httpStreamer(req.Context(), w, req, logstream, f)
  84. }
  85. return logsHandler
  86. }
  87. // httpStreamer 过滤并输出日志
  88. func (s *HttpStream) httpStreamer(ctx context.Context, w http.ResponseWriter, req *http.Request, logstream chan *event.ProcessorEvent, f *filterRule) {
  89. w.Header().Add("Content-Type", "text/plain; charset=utf-8")
  90. if f.instanceId != "" {
  91. //w.Write([]byte(fmt.Sprintf(
  92. // "\x1b[1;31m%s\x1b[0m\n", "注意:caster中,只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出\n")))
  93. w.Write([]byte("注意:caster中,只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出 \n"))
  94. w.(http.Flusher).Flush()
  95. }
  96. c := 0
  97. for {
  98. select {
  99. case e := <-logstream:
  100. if f.appId != "" && string(e.AppId) != f.appId {
  101. continue
  102. }
  103. if f.reg != nil && !f.reg.Match(e.Bytes()) {
  104. continue
  105. }
  106. if f.instanceId != "" && !bytes.Contains(e.Bytes(), []byte(f.instanceId)) {
  107. continue
  108. }
  109. if f.maxLines != 0 && c >= f.maxLines {
  110. s.remove(logstream)
  111. return
  112. }
  113. c += 1
  114. // TODO event recycle
  115. w.Write(append(e.Bytes(), '\n'))
  116. w.(http.Flusher).Flush()
  117. case <-ctx.Done():
  118. return
  119. }
  120. }
  121. }
  122. // add 注册logstream
  123. func (s *HttpStream) add(logstream chan *event.ProcessorEvent, f *filterRule) {
  124. s.l.Lock()
  125. defer s.l.Unlock()
  126. s.logstreams[logstream] = f
  127. }
  128. // remove 注销logstream
  129. func (s *HttpStream) remove(logstream chan *event.ProcessorEvent) {
  130. s.l.Lock()
  131. defer s.l.Unlock()
  132. delete(s.logstreams, logstream)
  133. }