123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package infoc
- import (
- "fmt"
- "net"
- "sync/atomic"
- "time"
- "go-common/library/log"
- "go-common/library/net/ip"
- )
- type reporter struct {
- taskID string
- addr string
- iip string
- receiveCount int64
- sendCount int64
- fails []string
- }
- func newReporter(taskID, addr string) (r *reporter) {
- r = &reporter{
- taskID: taskID,
- addr: addr,
- iip: ip.InternalIP(),
- }
- return
- }
- func (r *reporter) receiveIncr(delta int64) {
- atomic.AddInt64(&r.receiveCount, delta)
- }
- func (r *reporter) sendIncr(delta int64) {
- atomic.AddInt64(&r.sendCount, delta)
- }
- func (r *reporter) reportproc() {
- tick := time.NewTicker(1 * time.Minute)
- for {
- <-tick.C
- r.reporter()
- }
- }
- func (r *reporter) flush() {
- r.reporter()
- }
- func (r *reporter) reporter() {
- const _timeout = time.Second
- conn, err := net.DialTimeout("tcp", r.addr, _timeout)
- if err != nil {
- log.Error("infoc reporter flush dial error(%v)", err)
- return
- }
- defer conn.Close()
- conn.SetDeadline(time.Now().Add(_timeout))
- var fails []string
- for _, fail := range r.fails {
- if _, err = conn.Write([]byte(fail)); err != nil {
- log.Error("infoc reporter write fail error(%v)", err)
- fails = append(fails, fail)
- }
- }
- for _, rc := range r.record(time.Now()) {
- if _, err = conn.Write([]byte(rc)); err != nil {
- log.Error("infoc reporter write error(%v)", err)
- fails = append(fails, rc)
- }
- }
- r.fails = fails
- }
- func (r *reporter) record(now time.Time) []string {
- rc := atomic.SwapInt64(&r.receiveCount, 0)
- sc := atomic.SwapInt64(&r.sendCount, 0)
- rcW := fmt.Sprintf("agent.receive.count\001%d\001%s\001%d\001%s\001\001", rc, r.iip, now.UnixNano()/int64(time.Millisecond), r.taskID)
- scW := fmt.Sprintf("agent.send.success.count\001%d\001%s\001%d\001%s\001\001", sc, r.iip, now.UnixNano()/int64(time.Millisecond), r.taskID)
- return []string{rcW, scW}
- }
|