registry.go

package dao

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"time"

	"go-common/app/infra/discovery/model"
	"go-common/library/ecode"
	"go-common/library/log"
)
const (
	// Lease thresholds are stored as int64 nanoseconds so they can be compared
	// directly against UnixNano timestamp deltas in evict.
	_evictThreshold = int64(90 * time.Second)
	_evictCeiling   = int64(3600 * time.Second)
)
// Registry is the in-memory instance registry: it stores registered apps per
// appid and env, tracks long-poll connections, and notifies pollers whenever
// instances change.
type Registry struct {
	appm  map[string]*model.Apps // appid-env -> apps
	aLock sync.RWMutex

	conns map[string]map[string]*conn // appid-env -> hostname -> conn
	cLock sync.RWMutex

	gd *Guard
}
// conn wraps the poll chan of a single consumer host.
type conn struct {
	ch         chan map[string]*model.InstanceInfo // TODO(felix): increase
	arg        *model.ArgPolls
	latestTime int64
	count      int // number of concurrent polls from the same hostname
}

// newConn creates a conn around the given consumer chan.
func newConn(ch chan map[string]*model.InstanceInfo, latestTime int64, arg *model.ArgPolls) *conn {
	return &conn{ch: ch, latestTime: latestTime, arg: arg, count: 1}
}
// NewRegistry creates a Registry and starts its background maintenance loop.
func NewRegistry() (r *Registry) {
	r = &Registry{
		appm:  make(map[string]*model.Apps),
		conns: make(map[string]map[string]*conn),
		gd:    new(Guard),
	}
	go r.proc()
	return
}
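// Usage sketch (not part of the original file; the appid and host values are
// purely illustrative assumptions): a caller constructs the registry once and
// then drives it through Register/Renew:
//
//	r := NewRegistry()
//	ins := &model.Instance{
//		Zone:     "sh001",
//		Env:      "prod",
//		Appid:    "demo.service.example", // hypothetical appid
//		Hostname: "host-1",
//	}
//	_ = r.Register(ins, time.Now().UnixNano())
//	r.Renew(&model.ArgRenew{Zone: ins.Zone, Env: ins.Env, Appid: ins.Appid, Hostname: ins.Hostname})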
// newapps returns the Apps container for appid-env, creating it if absent.
func (r *Registry) newapps(appid, env string) (a *model.Apps, ok bool) {
	key := appsKey(appid, env)
	r.aLock.Lock()
	if a, ok = r.appm[key]; !ok {
		a = model.NewApps()
		r.appm[key] = a
	}
	r.aLock.Unlock()
	return
}

// apps returns the per-zone App list and the Apps container for appid-env.
func (r *Registry) apps(appid, env, zone string) (as []*model.App, a *model.Apps, ok bool) {
	key := appsKey(appid, env)
	r.aLock.RLock()
	a, ok = r.appm[key]
	r.aLock.RUnlock()
	if ok {
		as = a.App(zone)
	}
	return
}
// appsKey builds the appm/conns key from appid and env.
func appsKey(appid, env string) string {
	// NOTE: discovery itself does not distinguish between environments.
	if appid == model.AppID {
		return appid
	}
	return fmt.Sprintf("%s-%s", appid, env)
}
func (r *Registry) newApp(ins *model.Instance) (a *model.App) {
	as, _ := r.newapps(ins.Appid, ins.Env)
	a, _ = as.NewApp(ins.Zone, ins.Appid, ins.Treeid, ins.LatestTimestamp)
	return
}

// Register registers a new instance.
func (r *Registry) Register(ins *model.Instance, latestTime int64) (err error) {
	a := r.newApp(ins)
	i, ok := a.NewInstance(ins, latestTime)
	if ok {
		r.gd.incrExp()
	}
	// NOTE: make sure free poll before update appid latest timestamp.
	r.broadcast(i.Env, i.Appid, a)
	return
}
// Renew marks the given instance of the given app as renewed (heartbeat) and
// counts the renew toward the guard's actual renew rate.
func (r *Registry) Renew(arg *model.ArgRenew) (i *model.Instance, ok bool) {
	a, _, _ := r.apps(arg.Appid, arg.Env, arg.Zone)
	if len(a) == 0 {
		return
	}
	if i, ok = a[0].Renew(arg.Hostname); !ok {
		return
	}
	r.gd.incrFac()
	return
}
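// Guard bookkeeping, as wired in this file (Guard itself is defined elsewhere
// in the package): Register calls gd.incrExp and Cancel calls gd.decrExp to
// keep the expected renew count in step with registrations, Renew calls
// gd.incrFac for every heartbeat that actually arrives, and evict later uses
// gd.ok() as its self-preservation flag, only evicting instances past
// _evictCeiling while that flag is set.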
// Cancel cancels the registration of an instance.
func (r *Registry) Cancel(arg *model.ArgCancel) (i *model.Instance, ok bool) {
	if i, ok = r.cancel(arg.Zone, arg.Env, arg.Appid, arg.Hostname, arg.LatestTimestamp); !ok {
		return
	}
	r.gd.decrExp()
	return
}

func (r *Registry) cancel(zone, env, appid, hostname string, latestTime int64) (i *model.Instance, ok bool) {
	var l int
	a, as, _ := r.apps(appid, env, zone)
	if len(a) == 0 {
		return
	}
	if i, l, ok = a[0].Cancel(hostname, latestTime); !ok {
		return
	}
	as.UpdateLatest(latestTime)
	if l == 0 {
		if a[0].Len() == 0 {
			as.Del(zone)
		}
	}
	if len(as.App("")) == 0 {
		r.aLock.Lock()
		delete(r.appm, appsKey(appid, env))
		r.aLock.Unlock()
	}
	r.broadcast(env, appid, a[0]) // NOTE: make sure free poll before update appid latest timestamp.
	return
}
// FetchAll fetches all instances of all registered apps.
func (r *Registry) FetchAll() (im map[string][]*model.Instance) {
	ass := r.allapp()
	im = make(map[string][]*model.Instance)
	for _, as := range ass {
		for _, a := range as.App("") {
			im[a.AppID] = append(im[a.AppID], a.Instances()...)
		}
	}
	return
}
// Fetch fetches all instances of an appid in the given zone.
func (r *Registry) Fetch(zone, env, appid string, latestTime int64, status uint32) (info *model.InstanceInfo, err error) {
	key := appsKey(appid, env)
	r.aLock.RLock()
	a, ok := r.appm[key]
	r.aLock.RUnlock()
	if !ok {
		err = ecode.NothingFound
		return
	}
	info, err = a.InstanceInfo(zone, latestTime, status)
	return
}
// Polls hangs the request and writes instances to the returned chan when there
// are changes; when nothing changed it registers the poll connection so the
// caller can wait and eventually return NotModified.
func (r *Registry) Polls(arg *model.ArgPolls) (ch chan map[string]*model.InstanceInfo, new bool, err error) {
	var (
		ins = make(map[string]*model.InstanceInfo, len(arg.Treeid))
		in  *model.InstanceInfo
	)
	if len(arg.Appid) != len(arg.LatestTimestamp) {
		arg.LatestTimestamp = make([]int64, len(arg.Appid))
	}
	for i := range arg.Appid {
		in, err = r.Fetch(arg.Zone, arg.Env, arg.Appid[i], arg.LatestTimestamp[i], model.InstanceStatusUP)
		if err == ecode.NothingFound {
			log.Error("Polls region(%s) zone(%s) env(%s) appid(%s) error(%v)", arg.Region, arg.Zone, arg.Env, arg.Appid[i], err)
			return
		}
		if err == nil {
			if len(arg.Treeid) != 0 {
				ins[strconv.FormatInt(arg.Treeid[i], 10)] = in
			} else {
				ins[arg.Appid[i]] = in
			}
			new = true
		}
	}
	if new {
		ch = make(chan map[string]*model.InstanceInfo, 1)
		ch <- ins
		return
	}
	r.cLock.Lock()
	for i := range arg.Appid {
		k := appsKey(arg.Appid[i], arg.Env)
		if _, ok := r.conns[k]; !ok {
			r.conns[k] = make(map[string]*conn, 1)
		}
		connection, ok := r.conns[k][arg.Hostname]
		if !ok {
			if ch == nil {
				ch = make(chan map[string]*model.InstanceInfo, 5) // NOTE: there may be more than one connection from the same hostname!
			}
			connection = newConn(ch, arg.LatestTimestamp[i], arg)
			log.Info("Polls from(%s) new connection(%d)", arg.Hostname, connection.count)
		} else {
			connection.count++ // NOTE: there may be more than one connection from the same hostname!
			if ch == nil {
				ch = connection.ch
			}
			log.Info("Polls from(%s) reuse connection(%d)", arg.Hostname, connection.count)
		}
		r.conns[k][arg.Hostname] = connection
	}
	r.cLock.Unlock()
	return
}
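// A hedged sketch of how a caller might drive Polls (the timeout value and
// control flow here are assumptions, not part of this file): when new data is
// available the chan already holds it; otherwise the caller blocks until
// broadcast fires or its poll times out, then releases the conn:
//
//	ch, _, err := r.Polls(arg)
//	if err != nil {
//		return err
//	}
//	select {
//	case info := <-ch:
//		_ = info // write the instance info back to the client
//	case <-time.After(30 * time.Second): // assumed poll timeout
//		r.DelConns(arg)
//		// respond with NotModified
//	}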
// Polling returns the hostnames of clients currently polling the given appid.
func (r *Registry) Polling(arg *model.ArgPolling) (resp []string, err error) {
	r.cLock.RLock()
	conns, ok := r.conns[appsKey(arg.Appid, arg.Env)]
	if !ok {
		r.cLock.RUnlock()
		return
	}
	resp = make([]string, 0, len(conns))
	for host := range conns {
		resp = append(resp, host)
	}
	r.cLock.RUnlock()
	return
}
// broadcast pushes the latest instances of the app to every pending poll chan.
// NOTE: make sure free poll before update appid latest timestamp.
func (r *Registry) broadcast(env, appid string, a *model.App) {
	key := appsKey(appid, env)
	r.cLock.Lock()
	defer r.cLock.Unlock()
	conns, ok := r.conns[key]
	if !ok {
		return
	}
	delete(r.conns, key)
	for _, conn := range conns {
		ii, _ := r.Fetch(conn.arg.Zone, env, appid, 0, model.InstanceStatusUP) // TODO(felix): latesttime!=0 increase
		var key string
		if len(conn.arg.Treeid) != 0 {
			key = strconv.FormatInt(a.Treeid, 10)
		} else {
			key = a.AppID
		}
		for i := 0; i < conn.count; i++ { // NOTE: there may be more than one connection from the same hostname!
			select {
			case conn.ch <- map[string]*model.InstanceInfo{key: ii}: // NOTE: if the chan is full, there is no poller consuming it.
				log.Info("broadcast to(%s) success(%d)", conn.arg.Hostname, i+1)
			case <-time.After(time.Millisecond * 500):
				log.Info("broadcast to(%s) failed(%d) maybe chan full", conn.arg.Hostname, i+1)
			}
		}
	}
}
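// Design note on broadcast (descriptive, not original to this file): the whole
// conns bucket for the appid is removed before any send, so each pending poll
// is answered at most conn.count times and polls arriving after the lock is
// released start from a fresh conn; the 500ms select timeout keeps an
// abandoned or full chan from holding cLock indefinitely.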
// Set sets the status of instances selected by hostnames.
func (r *Registry) Set(c context.Context, arg *model.ArgSet) (ok bool) {
	a, _, _ := r.apps(arg.Appid, arg.Env, arg.Zone)
	if len(a) == 0 {
		return
	}
	if ok = a[0].Set(arg); !ok {
		return
	}
	r.broadcast(arg.Env, arg.Appid, a[0])
	return
}
// allapp returns a snapshot of all Apps in the registry.
func (r *Registry) allapp() (ass []*model.Apps) {
	r.aLock.RLock()
	ass = make([]*model.Apps, 0, len(r.appm))
	for _, as := range r.appm {
		ass = append(ass, as)
	}
	r.aLock.RUnlock()
	return
}
// resetExp recounts the expected renews from the current registry size;
// each registered instance is expected to renew twice per minute.
func (r *Registry) resetExp() {
	cnt := int64(0)
	for _, p := range r.allapp() {
		for _, a := range p.App("") {
			cnt += int64(a.Len())
		}
	}
	r.gd.setExp(cnt)
}
// proc is the background maintenance loop: update the renew factor and evict
// expired instances every minute, recount the expected renews every 15 minutes.
func (r *Registry) proc() {
	tk := time.Tick(1 * time.Minute)
	tk2 := time.Tick(15 * time.Minute)
	for {
		select {
		case <-tk:
			r.gd.updateFac()
			r.evict()
		case <-tk2:
			r.resetExp()
		}
	}
}
// evict cancels instances whose renew lease has expired, capped by self-preservation.
func (r *Registry) evict() {
	protect := r.gd.ok()
	// We collect all expired items first so that they can be evicted in random order. For large
	// eviction sets, if we did not do that, we might wipe out whole apps before self-preservation
	// kicks in. By randomizing, the impact should be evenly distributed across all applications.
	var eis []*model.Instance
	var registrySize int
	// all apps
	ass := r.allapp()
	for _, as := range ass {
		for _, a := range as.App("") {
			registrySize += a.Len()
			is := a.Instances()
			for _, i := range is {
				delta := time.Now().UnixNano() - i.RenewTimestamp
				if (!protect && delta > _evictThreshold) || delta > _evictCeiling {
					eis = append(eis, i)
				}
			}
		}
	}
	// To compensate for GC pauses or drifting local time, we use the current registry size as a
	// base for triggering self-preservation. Without that we would wipe out the full registry.
	eCnt := len(eis)
	registrySizeThreshold := int(float64(registrySize) * _percentThreshold)
	evictionLimit := registrySize - registrySizeThreshold
	if eCnt > evictionLimit {
		eCnt = evictionLimit
	}
	if eCnt == 0 {
		return
	}
	for i := 0; i < eCnt; i++ {
		// Pick a random item (Knuth shuffle).
		next := i + rand.Intn(len(eis)-i)
		eis[i], eis[next] = eis[next], eis[i]
		ei := eis[i]
		r.cancel(ei.Zone, ei.Env, ei.Appid, ei.Hostname, time.Now().UnixNano())
	}
}
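// Worked example of the eviction cap above (illustrative numbers only;
// _percentThreshold is defined elsewhere in this package and 0.85 is just an
// assumed value): with registrySize = 200 and _percentThreshold = 0.85,
// registrySizeThreshold = 170 and evictionLimit = 30, so even if 80 instances
// stopped renewing at once, at most 30 randomly chosen ones are cancelled in
// this pass; the rest wait for the next minute's run.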
// DelConns removes the host's conn for every appid in the poll args.
func (r *Registry) DelConns(arg *model.ArgPolls) {
	r.cLock.Lock()
	for i := range arg.Appid {
		k := appsKey(arg.Appid[i], arg.Env)
		conns, ok := r.conns[k]
		if !ok {
			log.Warn("DelConn key(%s) not found", k)
			continue
		}
		if connection, ok := conns[arg.Hostname]; ok {
			if connection.count > 1 {
				log.Info("DelConns from(%s) count decr(%d)", arg.Hostname, connection.count)
				connection.count--
			} else {
				log.Info("DelConns from(%s) delete(%d)", arg.Hostname, connection.count)
				delete(conns, arg.Hostname)
			}
		}
	}
	r.cLock.Unlock()
}