grok.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package grok
  2. import (
  3. "context"
  4. "go-common/app/service/ops/log-agent/event"
  5. "go-common/app/service/ops/log-agent/processor"
  6. "go-common/app/service/ops/log-agent/pkg/flowmonitor"
  7. "github.com/vjeantet/grok"
  8. )
  9. type Grok struct {
  10. c *Config
  11. g *grok.Grok
  12. }
  13. func init() {
  14. err := processor.Register("grok", Process)
  15. if err != nil {
  16. panic(err)
  17. }
  18. }
  19. func Process(ctx context.Context, config interface{}, input <-chan *event.ProcessorEvent) (output chan *event.ProcessorEvent, err error) {
  20. g := new(Grok)
  21. if c, ok := config.(*Config); !ok {
  22. panic("Error config for Grok Processor")
  23. } else {
  24. if err = c.ConfigValidate(); err != nil {
  25. return nil, err
  26. }
  27. g.c = c
  28. }
  29. if g.g, err = grok.New(); err != nil {
  30. return nil, err
  31. }
  32. output = make(chan *event.ProcessorEvent)
  33. go func() {
  34. for {
  35. select {
  36. case e := <-input:
  37. values, err := g.g.Parse(g.c.Pattern, e.String())
  38. if err != nil || len(values) == 0 {
  39. flowmonitor.Fm.AddEvent(e, "log-agent.processor.grok", "WARN", "grok error")
  40. e.Tags = append(e.Tags, "grok_error")
  41. output <- e
  42. continue
  43. }
  44. e.ParsedFields = values
  45. output <- e
  46. case <-ctx.Done():
  47. return
  48. }
  49. }
  50. }()
  51. return output, nil
  52. }