pool.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. package lancergrpc
  2. import (
  3. "net"
  4. "time"
  5. "sync"
  6. "errors"
  7. "math/rand"
  8. "expvar"
  9. "go-common/library/log"
  10. "go-common/app/service/ops/log-agent/pkg/bufio"
  11. )
  12. var (
  13. ErrAddrListNil = errors.New("addrList can't be nil")
  14. ErrPoolSize = errors.New("Pool size should be no greater then length of addr list")
  15. )
  16. type LancerBufConn struct {
  17. conn net.Conn
  18. wr *bufio.Writer
  19. enabled bool
  20. ctime time.Time
  21. }
  22. type connPool struct {
  23. c *ConnPoolConfig
  24. invalidUpstreams map[string]interface{}
  25. invalidUpstreamsLock sync.RWMutex
  26. validBufConnChan chan *LancerBufConn
  27. invalidBufConnChan chan *LancerBufConn
  28. connCounter map[string]int
  29. connCounterLock sync.RWMutex
  30. newConnLock sync.Mutex
  31. }
  32. type ConnPoolConfig struct {
  33. Name string `tome:"name"`
  34. AddrList []string `tome:"addrList"`
  35. DialTimeout time.Duration `tome:"dialTimeout"`
  36. IdleTimeout time.Duration `tome:"idleTimeout"`
  37. BufSize int `tome:"bufSize"`
  38. PoolSize int `tome:"poolSize"`
  39. }
  40. func (c *ConnPoolConfig) ConfigValidate() (error) {
  41. if c == nil {
  42. return errors.New("Config of pool is nil")
  43. }
  44. if len(c.AddrList) == 0 {
  45. return errors.New("pool addr list can't be empty")
  46. }
  47. if c.DialTimeout.Seconds() == 0 {
  48. c.DialTimeout = time.Second * 5
  49. }
  50. if c.IdleTimeout.Seconds() == 0 {
  51. c.IdleTimeout = time.Minute * 15
  52. }
  53. if c.BufSize == 0 {
  54. c.BufSize = 1024 * 1024 * 2 // 2M by default
  55. }
  56. if c.PoolSize == 0 {
  57. c.PoolSize = len(c.AddrList)
  58. }
  59. return nil
  60. }
  61. // newConn make a connection to lancer
  62. func (cp *connPool) newConn() (conn net.Conn, err error) {
  63. cp.newConnLock.Lock()
  64. defer cp.newConnLock.Unlock()
  65. for {
  66. if addr, err := cp.randomOneUpstream(); err == nil {
  67. if conn, err := net.DialTimeout("tcp", addr, cp.c.DialTimeout); err == nil && conn != nil {
  68. log.Info("connect to %s success", addr)
  69. cp.connCounterAdd(addr)
  70. return conn, nil
  71. } else {
  72. cp.markUpstreamInvalid(addr)
  73. continue
  74. }
  75. } else {
  76. return nil, err
  77. }
  78. }
  79. }
  80. // newBufConn 创建一个buf连接, buf连接绑定一个conn(无论连接是否可用)
  81. func (cp *connPool) newBufConn() (bufConn *LancerBufConn, err error) {
  82. bufConn = new(LancerBufConn)
  83. bufConn.wr = bufio.NewWriterSize(nil, cp.c.BufSize)
  84. if err := cp.setConn(bufConn); err == nil {
  85. bufConn.enabled = true
  86. } else {
  87. bufConn.enabled = false
  88. }
  89. return bufConn, nil
  90. }
  91. // flushBufConn 定期flush buffer
  92. func (cp *connPool) flushBufConn() {
  93. for {
  94. bufConn, _ := cp.getBufConn()
  95. bufConn.conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
  96. if err := bufConn.wr.Flush(); err != nil {
  97. log.Error("Error when flush to %s: %s", bufConn.conn.RemoteAddr().String(), err)
  98. bufConn.enabled = false
  99. }
  100. cp.putBufConn(bufConn)
  101. time.Sleep(time.Second * 5)
  102. }
  103. }
  104. // initConnPool 初始化conn pool对象
  105. func initConnPool(c *ConnPoolConfig) (cp *connPool, err error) {
  106. if err = c.ConfigValidate(); err != nil {
  107. return nil, err
  108. }
  109. if len(c.AddrList) == 0 {
  110. return nil, ErrAddrListNil
  111. }
  112. if c.PoolSize > len(c.AddrList) {
  113. return nil, ErrPoolSize
  114. }
  115. rand.Seed(time.Now().Unix())
  116. cp = new(connPool)
  117. cp.c = c
  118. cp.validBufConnChan = make(chan *LancerBufConn, cp.c.PoolSize)
  119. cp.invalidBufConnChan = make(chan *LancerBufConn, cp.c.PoolSize)
  120. cp.invalidUpstreams = make(map[string]interface{})
  121. cp.connCounter = make(map[string]int)
  122. cp.initPool()
  123. go cp.maintainUpstream()
  124. go cp.flushBufConn()
  125. go cp.maintainBufConnPool()
  126. expvar.Publish("conn_pool"+cp.c.Name, expvar.Func(cp.connPoolStatus))
  127. return cp, nil
  128. }
  129. // connableUpstreams 返回可以建立连接的upstream列表
  130. func (cp *connPool) connableUpstreams() ([]string) {
  131. list := make([]string, 0)
  132. cp.invalidUpstreamsLock.RLock()
  133. defer cp.invalidUpstreamsLock.RUnlock()
  134. for _, addr := range cp.c.AddrList {
  135. if _, ok := cp.invalidUpstreams[addr]; !ok {
  136. if count, ok := cp.connCounter[addr]; ok && count == 0 {
  137. list = append(list, addr)
  138. }
  139. }
  140. }
  141. return list
  142. }
  143. // write write []byte to BufConn
  144. func (bc *LancerBufConn) write(p []byte) (int, error) {
  145. bc.conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
  146. return bc.wr.Write(p)
  147. }
  148. // randomOneUpstream 随机返回一个可以建立连接的upstream
  149. func (cp *connPool) randomOneUpstream() (s string, err error) {
  150. list := cp.connableUpstreams()
  151. if len(list) == 0 {
  152. err = errors.New("No valid upstreams")
  153. return
  154. }
  155. return list[rand.Intn(len(list))], nil
  156. }
  157. // initPool 初始化poolSize个数的bufConn
  158. func (cp *connPool) initPool() {
  159. for _, addr := range cp.c.AddrList {
  160. cp.connCounter[addr] = 0
  161. }
  162. for i := 0; i < cp.c.PoolSize; i++ {
  163. if bufConn, err := cp.newBufConn(); err == nil {
  164. cp.putBufConn(bufConn)
  165. }
  166. }
  167. }
  168. // novalidUpstream check if there is no validUpstream
  169. func (cp *connPool) novalidUpstream() bool {
  170. return len(cp.invalidUpstreams) == len(cp.c.AddrList)
  171. }
  172. //GetConn 从pool中取一个BufConn
  173. func (cp *connPool) getBufConn() (*LancerBufConn, error) {
  174. for {
  175. select {
  176. case bufConn := <-cp.validBufConnChan:
  177. if !bufConn.enabled {
  178. cp.putInvalidBufConn(bufConn)
  179. continue
  180. }
  181. return bufConn, nil
  182. case <-time.After(10 * time.Second):
  183. log.Warn("timeout when get conn from conn pool")
  184. continue
  185. }
  186. }
  187. }
  188. // setConn 为bufConn绑定一个新的Conn
  189. func (cp *connPool) setConn(bufConn *LancerBufConn) (error) {
  190. if bufConn.conn != nil {
  191. if bufConn.enabled == false {
  192. cp.markUpstreamInvalid(bufConn.conn.RemoteAddr().String())
  193. }
  194. cp.connCounterDel(bufConn.conn.RemoteAddr().String())
  195. bufConn.conn.Close()
  196. bufConn.conn = nil
  197. bufConn.enabled = false
  198. }
  199. if conn, err := cp.newConn(); err == nil {
  200. bufConn.conn = conn
  201. bufConn.wr.Reset(conn)
  202. bufConn.ctime = time.Now()
  203. bufConn.enabled = true
  204. return nil
  205. } else {
  206. bufConn.enabled = false
  207. return err
  208. }
  209. }
  210. //putBufConn 把BufConn放回到pool中
  211. func (cp *connPool) putBufConn(bufConn *LancerBufConn) {
  212. if bufConn.enabled == false {
  213. cp.putInvalidBufConn(bufConn)
  214. return
  215. }
  216. if bufConn.ctime.Add(cp.c.IdleTimeout).Before(time.Now()) {
  217. bufConn.wr.Flush()
  218. cp.putInvalidBufConn(bufConn)
  219. return
  220. }
  221. cp.putValidBufConn(bufConn)
  222. }
  223. // putValidBufConn 把 bufConn放到可用的pool中
  224. func (cp *connPool) putValidBufConn(bufConn *LancerBufConn) {
  225. select {
  226. case cp.validBufConnChan <- bufConn:
  227. return
  228. default:
  229. log.Warn("BufConnChan full, discard,this shouldn't happen")
  230. return
  231. }
  232. }
  233. // putInvalidBufConn 把bufConn放到不可用的pool中
  234. func (cp *connPool) putInvalidBufConn(bufConn *LancerBufConn) {
  235. select {
  236. case cp.invalidBufConnChan <- bufConn:
  237. return
  238. default:
  239. log.Warn("invalidBufConnChan full, discard,this shouldn't happen")
  240. return
  241. }
  242. }
  243. // maintainBufConnPool 维护BufConnPool状态
  244. func (cp *connPool) maintainBufConnPool() {
  245. for {
  246. select {
  247. case bufConn := <-cp.invalidBufConnChan:
  248. cp.setConn(bufConn)
  249. cp.putBufConn(bufConn)
  250. }
  251. time.Sleep(time.Second * 1)
  252. }
  253. }
  254. //markConnInvalid会将链接关闭并且将相应upstreamserver设置为不可用
  255. func (cp *connPool) markUpstreamInvalid(addr string) (err error) {
  256. log.Error("mark upstream %s invalid", addr)
  257. cp.invalidUpstreamsLock.Lock()
  258. cp.invalidUpstreams[addr] = nil
  259. cp.invalidUpstreamsLock.Unlock()
  260. return
  261. }
  262. // markUpstreamValid 将某一addr设置为不可用
  263. func (cp *connPool) markUpstreamValid(addr string) (err error) {
  264. log.Info("%s is valid again", addr)
  265. cp.invalidUpstreamsLock.Lock()
  266. delete(cp.invalidUpstreams, addr)
  267. cp.invalidUpstreamsLock.Unlock()
  268. return
  269. }
  270. // connCounterAdd 连接数+1
  271. func (cp *connPool) connCounterAdd(addr string) {
  272. cp.connCounterLock.Lock()
  273. defer cp.connCounterLock.Unlock()
  274. if _, ok := cp.connCounter[addr]; ok {
  275. cp.connCounter[addr] += 1
  276. } else {
  277. cp.connCounter[addr] = 1
  278. }
  279. return
  280. }
  281. //connCounterDel 连接数-1
  282. func (cp *connPool) connCounterDel(addr string) {
  283. cp.connCounterLock.Lock()
  284. defer cp.connCounterLock.Unlock()
  285. if _, ok := cp.connCounter[addr]; ok {
  286. cp.connCounter[addr] -= 1
  287. }
  288. }
  289. // connPoolStatus 返回connPool状态
  290. func (cp *connPool) connPoolStatus() interface{} {
  291. status := make(map[string]interface{})
  292. status["conn_num"] = cp.connCounter
  293. status["invalidUpstreams"] = cp.invalidUpstreams
  294. return status
  295. }
  296. // maintainUpstream 维护upstream的健康状态
  297. func (cp *connPool) maintainUpstream() {
  298. for {
  299. cp.invalidUpstreamsLock.RLock()
  300. tryAddrs := make([]string, 0, len(cp.invalidUpstreams))
  301. for k := range cp.invalidUpstreams {
  302. tryAddrs = append(tryAddrs, k)
  303. }
  304. cp.invalidUpstreamsLock.RUnlock()
  305. for _, addr := range tryAddrs {
  306. if conn, err := net.DialTimeout("tcp", addr, cp.c.DialTimeout); err == nil && conn != nil {
  307. conn.Close()
  308. cp.markUpstreamValid(addr)
  309. }
  310. }
  311. time.Sleep(time.Second * 10)
  312. }
  313. }
  314. //ReleaseConnPool 释放连接池中所有链接
  315. func (cp *connPool) ReleaseConnPool() {
  316. log.Info("Release Conn Pool")
  317. close(cp.validBufConnChan)
  318. close(cp.invalidBufConnChan)
  319. for conn := range cp.validBufConnChan {
  320. conn.enabled = false
  321. conn.wr.Flush()
  322. conn.conn.Close()
  323. }
  324. }