lancer.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package lancergrpc
  2. import (
  3. "context"
  4. "fmt"
  5. "bytes"
  6. "sync"
  7. "strconv"
  8. "time"
  9. "math"
  10. "go-common/app/service/ops/log-agent/event"
  11. "go-common/app/service/ops/log-agent/output"
  12. "go-common/app/service/ops/log-agent/pkg/flowmonitor"
  13. "go-common/app/service/ops/log-agent/pkg/common"
  14. "go-common/app/service/ops/log-agent/output/cache/file"
  15. "go-common/library/log"
  16. "go-common/app/service/ops/log-agent/pkg/lancermonitor"
  17. "google.golang.org/grpc"
  18. "google.golang.org/grpc/codes"
  19. "go-common/app/service/ops/log-agent/output/lancergrpc/lancergateway"
  20. )
  21. const (
  22. _appIdKey = `"app_id":`
  23. _levelKey = `"level":`
  24. _logTime = `"time":`
  25. )
  26. var (
  27. logMagic = []byte{0xAC, 0xBE}
  28. logMagicBuf = []byte{0xAC, 0xBE}
  29. _logType = []byte{0, 1}
  30. _logLength = []byte{0, 0, 0, 0}
  31. local, _ = time.LoadLocation("Local")
  32. )
  33. type logDoc struct {
  34. b []byte
  35. logId string
  36. }
  37. func init() {
  38. err := output.Register("lancergrpc", NewLancer)
  39. if err != nil {
  40. panic(err)
  41. }
  42. }
  43. type Lancer struct {
  44. c *Config
  45. next chan string
  46. i chan *event.ProcessorEvent
  47. cache *file.FileCache
  48. logAggrBuf map[string]*bytes.Buffer
  49. logAggrBufLock sync.Mutex
  50. sendChan chan *logDoc
  51. lancerClient lancergateway.Gateway2ServerClient
  52. ctx context.Context
  53. cancel context.CancelFunc
  54. }
  55. func NewLancer(ctx context.Context, config interface{}) (output.Output, error) {
  56. var err error
  57. lancer := new(Lancer)
  58. if c, ok := config.(*Config); !ok {
  59. return nil, fmt.Errorf("Error config for Lancer output")
  60. } else {
  61. if err = c.ConfigValidate(); err != nil {
  62. return nil, err
  63. }
  64. lancer.c = c
  65. }
  66. if output.OutputRunning(lancer.c.Name) {
  67. return nil, fmt.Errorf("Output %s already running", lancer.c.Name)
  68. }
  69. lancer.i = make(chan *event.ProcessorEvent)
  70. lancer.next = make(chan string, 1)
  71. lancer.logAggrBuf = make(map[string]*bytes.Buffer)
  72. lancer.sendChan = make(chan *logDoc)
  73. cache, err := file.NewFileCache(lancer.c.CacheConfig)
  74. if err != nil {
  75. return nil, err
  76. }
  77. lancer.cache = cache
  78. lancer.lancerClient, err = lancergateway.NewClient(lancer.c.LancerGateway)
  79. if err != nil {
  80. return nil, err
  81. }
  82. lancer.ctx, lancer.cancel = context.WithCancel(ctx)
  83. return lancer, nil
  84. }
  85. func (l *Lancer) InputChan() (chan *event.ProcessorEvent) {
  86. return l.i
  87. }
  88. func (l *Lancer) Run() (err error) {
  89. go l.readFromProcessor()
  90. go l.consumeCache()
  91. go l.flushLogAggrPeriodically()
  92. for i := 0; i < l.c.SendConcurrency; i++ {
  93. go l.sendToLancer()
  94. }
  95. if l.c.Name != "" {
  96. output.RegisterOutput(l.c.Name, l)
  97. }
  98. return nil
  99. }
  100. func (l *Lancer) Stop() {
  101. l.cancel()
  102. }
  103. func (l *Lancer) readFromProcessor() {
  104. for e := range l.i {
  105. // only cache for sock input
  106. if e.Source == "sock" {
  107. l.cache.WriteToCache(e)
  108. continue
  109. }
  110. // without cache
  111. l.preWriteToLancer(e)
  112. }
  113. }
  114. func (l *Lancer) preWriteToLancer(e *event.ProcessorEvent) {
  115. flowmonitor.Fm.AddEvent(e, "log-agent.output.lancer", "OK", "write to lancer")
  116. lancermonitor.IncreaseLogCount("agent.send.success.count", e.LogId)
  117. if l.c.Name == "lancer-ops-log" {
  118. l.logAggr(e)
  119. } else {
  120. l.sendLogDirectToLancer(e)
  121. }
  122. }
  123. // consumeCache consume logs from cache
  124. func (l *Lancer) consumeCache() {
  125. for {
  126. e := l.cache.ReadFromCache()
  127. if e.Length < _logLancerHeaderLen {
  128. event.PutEvent(e)
  129. continue
  130. }
  131. // monitor should be called before event recycle
  132. l.parseOpslog(e)
  133. l.preWriteToLancer(e)
  134. }
  135. }
  136. func (l *Lancer) parseOpslog(e *event.ProcessorEvent) {
  137. if l.c.Name == "lancer-ops-log" {
  138. e.AppId, _ = common.SeekValue([]byte(_appIdKey), e.Bytes())
  139. if timeValue, err := common.SeekValue([]byte(_logTime), e.Bytes()); err == nil {
  140. if len(timeValue) >= 19 {
  141. // parse time
  142. var t time.Time
  143. if t, err = time.Parse(time.RFC3339Nano, string(timeValue)); err != nil {
  144. if t, err = time.ParseInLocation("2006-01-02T15:04:05", string(timeValue), local); err != nil {
  145. if t, err = time.ParseInLocation("2006-01-02T15:04:05", string(timeValue[0:19]), local); err != nil {
  146. }
  147. }
  148. }
  149. if !t.IsZero() {
  150. e.TimeRangeKey = strconv.FormatInt(t.Unix()/100*100, 10)
  151. }
  152. }
  153. }
  154. }
  155. }
  156. // sendLogDirectToLancer send log direct to lancer without aggr
  157. func (l *Lancer) sendLogDirectToLancer(e *event.ProcessorEvent) {
  158. logDoc := new(logDoc)
  159. logDoc.b = make([]byte, e.Length)
  160. copy(logDoc.b, e.Bytes())
  161. logDoc.logId = e.LogId
  162. event.PutEvent(e)
  163. l.sendChan <- logDoc
  164. }
  165. func (l *Lancer) nextRetry(retry int) (time.Duration) {
  166. // avoid d too large
  167. if retry > 10 {
  168. return time.Duration(l.c.MaxRetryDuration)
  169. }
  170. d := time.Duration(math.Pow(2, float64(retry))) * time.Duration(l.c.InitialRetryDuration)
  171. if d > time.Duration(l.c.MaxRetryDuration) {
  172. return time.Duration(l.c.MaxRetryDuration)
  173. }
  174. return d
  175. }
  176. func (l *Lancer) bulkSendToLancerWithRetry(in *lancergateway.EventList) {
  177. retry := 0
  178. for {
  179. ctx, _ := context.WithTimeout(context.Background(), time.Duration(l.c.SendBatchTimeout))
  180. t1 := time.Now()
  181. resp, err := l.lancerClient.SendList(ctx, in)
  182. if err == nil {
  183. if resp.Code == lancergateway.StatusCode_SUCCESS {
  184. log.Info("get 200 from lancer gateway: size %d, count %d, cost %s", in.Size(), len(in.Events), time.Since(t1).String())
  185. return
  186. }
  187. flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", fmt.Sprintf("write to lancer None 200: %s", resp.Code))
  188. log.Warn("get None 200 from lancer gateway, retry: %s", resp.Code)
  189. }
  190. if err != nil {
  191. switch grpc.Code(err) {
  192. case codes.Canceled, codes.DeadlineExceeded, codes.Unavailable, codes.ResourceExhausted:
  193. flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", fmt.Sprintf("write to lancer failed, retry: %s", err))
  194. log.Warn("get error from lancer gateway, retry: %s", err)
  195. default:
  196. flowmonitor.Fm.Add("log-agent", "log-agent.output.lancer", "", "ERROR", fmt.Sprintf("write to lancer failed, no retry: %s", err))
  197. log.Warn("get error from lancer gateway, no retry: %s", err)
  198. return
  199. }
  200. }
  201. time.Sleep(l.nextRetry(retry))
  202. retry ++
  203. }
  204. }
  205. // sendproc send the proc to lancer
  206. func (l *Lancer) sendToLancer() {
  207. eventList := new(lancergateway.EventList)
  208. eventListLock := sync.Mutex{}
  209. lastSend := time.Now()
  210. ticker := time.Tick(time.Second * 1)
  211. size := 0
  212. for {
  213. select {
  214. case <-ticker:
  215. if lastSend.Add(time.Duration(l.c.SendFlushInterval)).Before(time.Now()) && len(eventList.Events) > 0 {
  216. eventListLock.Lock()
  217. l.bulkSendToLancerWithRetry(eventList)
  218. eventList.Reset()
  219. size = 0
  220. eventListLock.Unlock()
  221. lastSend = time.Now()
  222. }
  223. case logDoc := <-l.sendChan:
  224. event := new(lancergateway.SimpleEvent)
  225. event.LogId = logDoc.logId
  226. event.Header = map[string]string{"timestamp": strconv.FormatInt(time.Now().Unix()/100*100, 10)}
  227. event.Data = logDoc.b
  228. size += len(event.Data)
  229. eventListLock.Lock()
  230. eventList.Events = append(eventList.Events, event)
  231. if size > l.c.SendBatchSize || len(eventList.Events) > l.c.SendBatchNum {
  232. l.bulkSendToLancerWithRetry(eventList)
  233. eventList.Reset()
  234. size = 0
  235. lastSend = time.Now()
  236. }
  237. eventListLock.Unlock()
  238. }
  239. }
  240. }