tcp.go

package tcp

import (
	"errors"
	lg "log"
	"net"
	"os"
	"sync"
	"time"

	"go-common/app/infra/databus/conf"
	"go-common/app/infra/databus/dsn"
	"go-common/app/infra/databus/model"
	"go-common/app/infra/databus/service"
	"go-common/library/log"

	"github.com/Shopify/sarama"
	metrics "github.com/rcrowley/go-metrics"
)

const (
	// redis proto
	_protoStr   = '+'
	_protoErr   = '-'
	_protoInt   = ':'
	_protoBulk  = '$'
	_protoArray = '*'
	// redis cmd
	_ping = "ping"
	_auth = "auth"
	_quit = "quit"
	_set  = "set"
	_hset = "hset"
	_mget = "mget"
	_ok   = "OK"
	_pong = "PONG"
	// client role
	_rolePub = "pub"
	_roleSub = "sub"

	_listenDelay   = 5 * time.Millisecond  // how long to sleep on accept failure
	_clearDelay    = 30 * time.Second
	_batchNum      = 100                    // batch write message length
	_batchInterval = 100 * time.Millisecond // batch write interval
	_batchTimeout  = 30 * time.Second       // return empty if timeout
	// connection timeout
	_readTimeout    = 5 * time.Second
	_writeTimeout   = 5 * time.Second
	_pubReadTimeout = 20 * time.Minute
	_subReadTimeout = _batchTimeout + 10*time.Second
	// conn read buffer size 64K
	_readBufSize = 1024 * 64
	// conn write buffer size 8K
	_writeBufSize = 1024 * 8
	// conn max value size (kafka 1M)
	_maxValueSize = 1000000
	// prom operation
	_opAddConsumerRequest = "request_add_comsuner"
	_opCurrentConsumer    = "current_consumer"
	_opAddProducerRequest = "request_add_producer"
	_opAuthError          = "auth_error"
	_opProducerMsgSpeed   = "producer_msg_speed"
	_opConsumerMsgSpeed   = "consumer_msg_speed"
	_opConsumerPartition  = "consumer_partition_speed"
	_opPartitionOffset    = "consumer_partition_offset"
)
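
// For orientation, a rough sketch (not authoritative) of the redis-style
// exchange these prefixes and commands describe; the exact framing lives in
// conn.Read/conn.Write and the per-role command handling in Pub/Sub, and the
// DSN value below is made up:
//
//	client> *2\r\n$4\r\nauth\r\n$<len>\r\n<key>:<secret>@<group>/topic=<topic>&role=pub\r\n
//	server> +OK\r\n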

var (
	_nullBulk = []byte("-1")
	// kafka header
	_headerColor    = []byte("color")
	_headerMetadata = []byte("metadata")
	// encode type pb/json
	_encodePB = []byte("pb")
)

var (
	errCmdAuthFailed        = errors.New("auth failed")
	errAuthInfo             = errors.New("auth info error")
	errPubParams            = errors.New("pub params error")
	errCmdNotSupport        = errors.New("command not support")
	errClusterNotExist      = errors.New("cluster not exist")
	errClusterNotSupport    = errors.New("cluster not support")
	errConnClosedByServer   = errors.New("connection closed by databus")
	errConnClosedByClient   = errors.New("connection closed by client")
	errClosedMsgChannel     = errors.New("message channel is closed")
	errClosedNotifyChannel  = errors.New("notification channel is closed")
	errNoPubPermission      = errors.New("no publish permission")
	errNoSubPermission      = errors.New("no subscribe permission")
	errConsumerClosed       = errors.New("kafka consumer closed")
	errCommitParams         = errors.New("commit offset params error")
	errMsgFormat            = errors.New("message format must be json")
	errConsumerOver         = errors.New("too many consumers")
	errConsumerTimeout      = errors.New("consumer initial timeout")
	errConnRead             = errors.New("connection read error")
	errUseLessConsumer      = errors.New("useless consumer")
	errKafKaData            = errors.New("err kafka data maybe rebalancing")
	errCousmerCreateLimiter = errors.New("err consumer create limiter")
)

var (
	// tcp listener
	listener net.Listener
	quit     = make(chan struct{})
	// producer snapshot, key: cluster:group:topic
	producers = make(map[string]sarama.SyncProducer)
	pLock     sync.RWMutex
	// Pubs
	pubs    = make(map[*Pub]struct{})
	pubLock sync.RWMutex
	// Subs
	subs    = make(map[*Sub]struct{})
	subLock sync.RWMutex
	// service for auth
	svc *service.Service
	// limiter for concurrent consumer creation
	consumerLimter = make(chan struct{}, 100)
)

// Init initializes the TCP server: it starts listening on c.Addr, wires up
// the auth service and kicks off the accept/clear/cluster-watch goroutines.
func Init(c *conf.Config, s *service.Service) {
	var err error
	if listener, err = net.Listen("tcp", c.Addr); err != nil {
		panic(err)
	}
	// sarama logger must be set, otherwise errors caught inside sarama are silently dropped
	sarama.Logger = lg.New(os.Stdout, "[Sarama] ", lg.LstdFlags)
	// disable sarama metrics
	metrics.UseNilMetrics = true
	svc = s
	log.Info("start tcp listen addr: %s", c.Addr)
	go accept()
	go clear()
	go clusterproc()
}
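
// A minimal sketch of how a caller might wire this package up (illustrative
// only; the concrete constructor names, e.g. service.New, are assumptions and
// the real entry point lives in the databus cmd package):
//
//	svc := service.New(conf.Conf)
//	tcp.Init(conf.Conf, svc)
//	defer tcp.Close()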

// newProducer returns the cached sync producer for the cluster/group/topic,
// creating and caching a new one if necessary.
func newProducer(group, topic string, pCfg *conf.Kafka) (p sarama.SyncProducer, err error) {
	var (
		ok  bool
		key = key(pCfg.Cluster, group, topic)
	)
	pLock.RLock()
	if p, ok = producers[key]; ok {
		pLock.RUnlock()
		return
	}
	pLock.RUnlock()
	// not cached, create a new producer
	conf := sarama.NewConfig()
	conf.Producer.Return.Successes = true
	conf.Version = sarama.V1_0_0_0
	if p, err = sarama.NewSyncProducer(pCfg.Brokers, conf); err != nil {
		log.Error("group(%s) topic(%s) cluster(%s) NewSyncProducer error(%v)", group, topic, pCfg.Cluster, err)
		return
	}
	pLock.Lock()
	producers[key] = p
	pLock.Unlock()
	return
}

// Close closes the listener and all producers, pubs and subs.
func Close() {
	close(quit)
	if listener != nil {
		listener.Close()
	}
	// close all consumers
	subLock.RLock()
	for sub := range subs {
		sub.Close()
	}
	subLock.RUnlock()
	// close all publishers
	pubLock.RLock()
	for pub := range pubs {
		pub.Close(true)
	}
	pubLock.RUnlock()
	// close all kafka producers
	pLock.RLock()
	for _, p := range producers {
		p.Close()
	}
	pLock.RUnlock()
}

// accept loops on the listener and spawns a goroutine per connection,
// retrying temporary errors and exiting once Close has been called.
func accept() {
	var (
		err  error
		ok   bool
		netC net.Conn
		netE net.Error
	)
	for {
		if netC, err = listener.Accept(); err != nil {
			if netE, ok = err.(net.Error); ok && netE.Temporary() {
				log.Error("tcp: Accept error: %v; retrying in %v", err, _listenDelay)
				time.Sleep(_listenDelay)
				continue
			}
			return
		}
		select {
		case <-quit:
			netC.Close()
			return
		default:
		}
		go serveConn(netC)
	}
}

// serveConn serves a TCP connection: it authenticates the client and then
// hands the connection over to a Pub or Sub depending on the requested role.
func serveConn(nc net.Conn) {
	var (
		err   error
		p     *Pub
		s     *Sub
		d     *dsn.DSN
		cfg   *conf.Kafka
		batch int64
		addr  = nc.RemoteAddr().String()
	)
	c := newConn(nc, _readTimeout, _writeTimeout)
	if d, cfg, batch, err = auth(c); err != nil {
		log.Error("auth failed addr(%s) error(%v)", addr, err)
		c.WriteError(err)
		return
	}
	// auth succeeded
	if err = c.Write(proto{prefix: _protoStr, message: _ok}); err != nil {
		log.Error("c.Write() error(%v)", err)
		c.Close()
		return
	}
	if err = c.Flush(); err != nil {
		c.Close()
		return
	}
	log.Info("auth succeed group(%s) topic(%s) color(%s) cluster(%s) addr(%s) role(%s)", d.Group, d.Topic, d.Color, cfg.Cluster, addr, d.Role)
	// dispatch by role
	switch d.Role {
	case _rolePub: // producer
		svc.CountProm.Incr(_opAddProducerRequest, d.Group, d.Topic)
		if p, err = NewPub(c, d.Group, d.Topic, d.Color, cfg); err != nil {
			c.WriteError(err)
			log.Error("group(%s) topic(%s) color(%s) cluster(%s) addr(%s) NewPub error(%v)", d.Group, d.Topic, d.Color, cfg.Cluster, addr, err)
			return
		}
		pubLock.Lock()
		pubs[p] = struct{}{}
		pubLock.Unlock()
		p.Serve()
	case _roleSub: // consumer
		svc.CountProm.Incr(_opAddConsumerRequest, d.Group, d.Topic)
		select {
		case consumerLimter <- struct{}{}:
		default:
			err = errCousmerCreateLimiter
			c.WriteError(err)
			log.Error("group(%s) topic(%s) color(%s) cluster(%s) addr(%s) error(%v)", d.Group, d.Topic, d.Color, cfg.Cluster, addr, err)
			return
		}
		if s, err = NewSub(c, d.Group, d.Topic, d.Color, cfg, batch); err != nil {
			c.WriteError(err)
			log.Error("group(%s) topic(%s) color(%s) cluster(%s) addr(%s) NewSub error(%v)", d.Group, d.Topic, d.Color, cfg.Cluster, addr, err)
			return
		}
		subLock.Lock()
		subs[s] = struct{}{}
		subLock.Unlock()
		s.Serve()
		svc.CountProm.Incr(_opCurrentConsumer, d.Group, d.Topic)
	default:
		// no other role can reach here; auth already rejects it
	}
}

// auth reads the first command from the connection, which must be an auth
// command carrying a DSN, and validates it via Auth.
func auth(c *conn) (d *dsn.DSN, cfg *conf.Kafka, batch int64, err error) {
	var (
		args [][]byte
		cmd  string
		addr = c.conn.RemoteAddr().String()
	)
	if cmd, args, err = c.Read(); err != nil {
		log.Error("c.Read addr(%s) error(%v)", addr, err)
		return
	}
	if cmd != _auth || len(args) != 1 {
		log.Error("c.Read addr(%s) first cmd(%s) is not auth or has wrong args(%v)", addr, cmd, args)
		err = errCmdAuthFailed
		return
	}
	// DSN format: key:secret@group/topic=?&role=?&offset=?
	if d, err = dsn.ParseDSN(string(args[0])); err != nil {
		log.Error("auth failed arg(%s) is illegal, addr(%s) error(%v)", args[0], addr, err)
		return
	}
	cfg, batch, err = Auth(d, addr)
	return
}
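
// For illustration, a DSN in the shape auth expects (all field values below
// are made up; dsn.ParseDSN defines the exact accepted syntax):
//
//	mykey:mysecret@test-group/topic=test-topic&role=sub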

// Auth validates the authentication info and returns the matching kafka config.
// It is shared with the HTTP interface, so it must not perform any IO.
func Auth(d *dsn.DSN, addr string) (cfg *conf.Kafka, batch int64, err error) {
	var (
		a  *model.Auth
		ok bool
	)
	if a, ok = svc.AuthApp(d.Group); !ok {
		log.Error("addr(%s) group(%s) cannot be found", addr, d.Group)
		svc.CountProm.Incr(_opAuthError, d.Group, d.Topic)
		err = errAuthInfo
		return
	}
	batch = a.Batch
	if err = a.Auth(d.Group, d.Topic, d.Key, d.Secret); err != nil {
		log.Error("a.Auth addr(%s) group(%s) topic(%s) color(%s) key(%s) secret(%s) error(%v)", addr, d.Group, d.Topic, d.Color, d.Key, d.Secret, err)
		svc.CountProm.Incr(_opAuthError, d.Group, d.Topic)
		return
	}
	switch d.Role {
	case _rolePub:
		if !a.CanPub() {
			err = errNoPubPermission
			return
		}
	case _roleSub:
		if !a.CanSub() {
			err = errNoSubPermission
			return
		}
	default:
		err = errCmdNotSupport
		return
	}
	if len(conf.Conf.Clusters) == 0 {
		err = errClusterNotExist
		return
	}
	if cfg, ok = conf.Conf.Clusters[a.Cluster]; !ok || cfg == nil {
		log.Error("a.Auth addr(%s) group(%s) topic(%s) color(%s) key(%s) secret(%s) cluster(%s) not support", addr, d.Group, d.Topic, d.Color, d.Key, d.Secret, a.Cluster)
		err = errClusterNotSupport
	}
	// TODO check ip addr
	// rAddr = conn.RemoteAddr().String()
	return
}

// ConsumerAddrs returns the addresses of all consumers in the given group.
func ConsumerAddrs(group string) (addrs []string, err error) {
	subLock.RLock()
	for sub := range subs {
		if sub.group == group {
			addrs = append(addrs, sub.addr)
		}
	}
	subLock.RUnlock()
	return
}

// key builds the producer cache key from cluster, group and topic.
func key(cluster, group, topic string) string {
	return cluster + ":" + group + ":" + topic
}

// clear periodically removes closed pubs and subs from the snapshots.
func clear() {
	for {
		time.Sleep(_clearDelay)
		t := time.Now()
		log.Info("clear proc start,id(%d)", t.Nanosecond())
		subLock.Lock()
		for sub := range subs {
			if sub.Closed() {
				delete(subs, sub)
			}
		}
		subLock.Unlock()
		pubLock.Lock()
		for pub := range pubs {
			if pub.Closed() {
				delete(pubs, pub)
			}
		}
		pubLock.Unlock()
		log.Info("clear proc end,id(%d) used(%d)", t.Nanosecond(), time.Since(t))
	}
}

// clusterproc watches for cluster change events: it rebuilds the producer for
// affected pubs and asks affected subs to close so they can reconnect.
func clusterproc() {
	for {
		oldAuth, ok := <-svc.ClusterEvent()
		if !ok {
			return
		}
		log.Info("cluster changed event group(%s) topic(%s) cluster(%s)", oldAuth.Group, oldAuth.Topic, oldAuth.Cluster)
		k := key(oldAuth.Cluster, oldAuth.Group, oldAuth.Topic)
		pLock.Lock()
		if p, ok := producers[k]; ok {
			// renew producer
			if newAuth, ok := svc.AuthApp(oldAuth.Group); ok {
				pLock.Unlock()
				np, err := newProducer(newAuth.Group, newAuth.Topic, conf.Conf.Clusters[newAuth.Cluster])
				pLock.Lock()
				// switch existing pubs to the new producer
				pubLock.Lock()
				for pub := range pubs {
					if pub.group == oldAuth.Group && pub.topic == oldAuth.Topic {
						if err != nil {
							pub.Close(true)
						} else {
							pub.producer = np
						}
					}
				}
				pubLock.Unlock()
			}
			// close the now-unused producer
			p.Close()
			delete(producers, k)
		}
		pLock.Unlock()
		// ask subs on the old cluster to finish up and close
		subLock.Lock()
		for sub := range subs {
			if sub.group == oldAuth.Group && sub.topic == oldAuth.Topic {
				sub.WaitClosing()
			}
		}
		subLock.Unlock()
	}
}
  411. }