stdout.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package stdout
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/app/service/ops/log-agent/output"
  6. "go-common/app/service/ops/log-agent/event"
  7. )
  8. type Stdout struct {
  9. c *Config
  10. ctx context.Context
  11. cancel context.CancelFunc
  12. i chan *event.ProcessorEvent
  13. }
  14. func init() {
  15. err := output.Register("stdout", NewStdout)
  16. if err != nil {
  17. panic(err)
  18. }
  19. }
  20. func NewStdout(ctx context.Context, config interface{}) (output.Output, error) {
  21. var err error
  22. stdout := new(Stdout)
  23. if c, ok := config.(*Config); !ok {
  24. return nil, fmt.Errorf("Error config for Lancer output")
  25. } else {
  26. if err = c.ConfigValidate(); err != nil {
  27. return nil, err
  28. }
  29. stdout.c = c
  30. }
  31. stdout.i = make(chan *event.ProcessorEvent)
  32. stdout.ctx, stdout.cancel = context.WithCancel(ctx)
  33. return stdout, nil
  34. }
  35. func (s *Stdout) Run() (err error) {
  36. go func() {
  37. for {
  38. select {
  39. case e := <-s.i:
  40. fmt.Println(string(e.Body))
  41. case <-s.ctx.Done():
  42. return
  43. }
  44. }
  45. }()
  46. if s.c.Name != "" {
  47. output.RegisterOutput(s.c.Name, s)
  48. }
  49. return nil
  50. }
  51. func (s *Stdout) Stop() {
  52. s.cancel()
  53. }
  54. func (s *Stdout) InputChan() (chan *event.ProcessorEvent) {
  55. return s.i
  56. }