123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- package httpstream
- import (
- "net/http"
- "sync"
- "regexp"
- "bytes"
- "strconv"
- "context"
- "go-common/app/service/ops/log-agent/event"
- )
- type HttpStream struct {
- c *Config
- logstreams map[chan *event.ProcessorEvent]*filterRule
- l sync.Mutex
- }
- type filterRule struct {
- maxLines int
- appId string
- instanceId string
- reg *regexp.Regexp
- }
- var LogSourceChan = make(chan *event.ProcessorEvent)
- // initlogStream init log stream
- func NewHttpStream(config *Config) (httpStream *HttpStream, err error) {
- h := new(HttpStream)
- if err := config.ConfigValidate(); err != nil {
- return nil, err
- }
- h.c = config
- h.logstreams = make(map[chan *event.ProcessorEvent]*filterRule)
- http.HandleFunc("/logs", h.LogStreamer())
- go h.route()
- go http.ListenAndServe(h.c.Addr, nil)
- return h, nil
- }
- // route 把日志路由到所有注册的logstream
- func (s *HttpStream) route() {
- for buf := range LogSourceChan {
- for logstream, _ := range s.logstreams {
- logstream <- buf
- }
- }
- }
- // LogStreamer 接收请求
- func (s *HttpStream) LogStreamer() func(w http.ResponseWriter, req *http.Request) {
- logsHandler := func(w http.ResponseWriter, req *http.Request) {
- logstream := make(chan *event.ProcessorEvent)
- f := new(filterRule)
- // parse params
- params := req.URL.Query()
- if appId, ok := params["app_id"]; ok {
- f.appId = appId[0]
- } else {
- w.Write(append([]byte("必须指定app_id"), '\n'))
- return
- }
- if reg, ok := params["regexp"]; ok {
- if filterReg, err := regexp.Compile(reg[0]); err == nil {
- f.reg = filterReg
- } else {
- w.Write(append([]byte("正则表达式格式错误"), '\n'))
- return
- }
- }
- if instanceId, ok := params["instance_id"]; ok {
- f.instanceId = instanceId[0]
- }
- if maxLines, ok := params["max_lines"]; ok {
- if n, err := strconv.Atoi(maxLines[0]); err == nil {
- f.maxLines = n
- } else {
- w.Write(append([]byte("max_lines格式错误"), '\n'))
- return
- }
- }
- s.add(logstream, f)
- go func() {
- select {
- case <-req.Context().Done():
- s.remove(logstream)
- }
- }()
- defer s.httpStreamer(req.Context(), w, req, logstream, f)
- }
- return logsHandler
- }
- // httpStreamer 过滤并输出日志
- func (s *HttpStream) httpStreamer(ctx context.Context, w http.ResponseWriter, req *http.Request, logstream chan *event.ProcessorEvent, f *filterRule) {
- w.Header().Add("Content-Type", "text/plain; charset=utf-8")
- if f.instanceId != "" {
- //w.Write([]byte(fmt.Sprintf(
- // "\x1b[1;31m%s\x1b[0m\n", "注意:caster中,只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出\n")))
- w.Write([]byte("注意:caster中,只有(1)app_id满足服务树三级格式 (2)日志包含instance_id且值为实例名称 的情况下,日志才能输出 \n"))
- w.(http.Flusher).Flush()
- }
- c := 0
- for {
- select {
- case e := <-logstream:
- if f.appId != "" && string(e.AppId) != f.appId {
- continue
- }
- if f.reg != nil && !f.reg.Match(e.Bytes()) {
- continue
- }
- if f.instanceId != "" && !bytes.Contains(e.Bytes(), []byte(f.instanceId)) {
- continue
- }
- if f.maxLines != 0 && c >= f.maxLines {
- s.remove(logstream)
- return
- }
- c += 1
- // TODO event recycle
- w.Write(append(e.Bytes(), '\n'))
- w.(http.Flusher).Flush()
- case <-ctx.Done():
- return
- }
- }
- }
- // add 注册logstream
- func (s *HttpStream) add(logstream chan *event.ProcessorEvent, f *filterRule) {
- s.l.Lock()
- defer s.l.Unlock()
- s.logstreams[logstream] = f
- }
- // remove 注销logstream
- func (s *HttpStream) remove(logstream chan *event.ProcessorEvent) {
- s.l.Lock()
- defer s.l.Unlock()
- delete(s.logstreams, logstream)
- }
|