shared_informer.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "sync"
  17. "time"
  18. "k8s.io/apimachinery/pkg/runtime"
  19. "k8s.io/apimachinery/pkg/util/clock"
  20. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. "k8s.io/client-go/util/buffer"
  23. "k8s.io/client-go/util/retry"
  24. "github.com/golang/glog"
  25. )
// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer. When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions. If there was a create, followed by a delete, the cache may NOT have your item. This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us to keep duplicate caches for each
// watch.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period. Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
	// specified resync period. Events to a single handler are delivered sequentially, but there is
	// no coordination between different handlers.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer.
	GetController() Controller
	// Run starts the shared informer, which will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has synced.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}
// SharedIndexInformer is a SharedInformer that additionally exposes its cache as an Indexer and
// lets callers register extra indexers.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	// GetIndexer returns the informer's Indexer.
	GetIndexer() Indexer
}
  63. // NewSharedInformer creates a new instance for the listwatcher.
  64. func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
  65. return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
  66. }
  67. // NewSharedIndexInformer creates a new instance for the listwatcher.
  68. func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  69. realClock := &clock.RealClock{}
  70. sharedIndexInformer := &sharedIndexInformer{
  71. processor: &sharedProcessor{clock: realClock},
  72. indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
  73. listerWatcher: lw,
  74. objectType: objType,
  75. resyncCheckPeriod: defaultEventHandlerResyncPeriod,
  76. defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
  77. cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
  78. clock: realClock,
  79. }
  80. return sharedIndexInformer
  81. }
// InformerSynced is a function that can be used to determine if an informer has synced. This is
// useful for determining if caches have synced.
type InformerSynced func() bool
const (
	// syncedPollPeriod is the interval at which WaitForCacheSync polls the supplied sync funcs.
	syncedPollPeriod = 100 * time.Millisecond

	// initialBufferSize is the initial number of event notifications that can be buffered
	// per listener; it is the buffer size handed to newProcessListener.
	initialBufferSize = 1024
)
  90. // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
  91. // if the controller should shutdown
  92. func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
  93. err := wait.PollUntil(syncedPollPeriod,
  94. func() (bool, error) {
  95. for _, syncFunc := range cacheSyncs {
  96. if !syncFunc() {
  97. return false, nil
  98. }
  99. }
  100. return true, nil
  101. },
  102. stopCh)
  103. if err != nil {
  104. glog.V(2).Infof("stop requested")
  105. return false
  106. }
  107. glog.V(4).Infof("caches populated")
  108. return true
  109. }
// sharedIndexInformer is the implementation behind NewSharedIndexInformer.
type sharedIndexInformer struct {
	// indexer is the shared cache; it is returned by GetStore/GetIndexer and updated by HandleDeltas.
	indexer Indexer
	// controller is created in Run and is nil until then.
	controller Controller

	// processor distributes notifications to the registered listeners.
	processor *sharedProcessor
	// cacheMutationDetector is run alongside the processor and is handed every added/updated object.
	cacheMutationDetector CacheMutationDetector

	// This block is tracked to handle late initialization of the controller
	listerWatcher ListerWatcher
	objectType    runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	// started is set when Run creates the controller; stopped is set when Run returns.
	started, stopped bool
	// startedLock is held while controller, started and stopped are read or written.
	startedLock sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`. The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
	// informer is the shared informer this controller reports sync status for.
	informer *sharedIndexInformer
}
// Run is intentionally a no-op: calling it does not start the shared informer.
func (v *dummyController) Run(stopCh <-chan struct{}) {
}
// HasSynced reports whether the wrapped shared informer has synced.
func (v *dummyController) HasSynced() bool {
	return v.informer.HasSynced()
}
  146. func (c *dummyController) LastSyncResourceVersion() string {
  147. return ""
  148. }
  149. type updateNotification struct {
  150. oldObj interface{}
  151. newObj interface{}
  152. }
  153. type addNotification struct {
  154. newObj interface{}
  155. }
  156. type deleteNotification struct {
  157. oldObj interface{}
  158. }
// Run creates the underlying controller, starts the cache mutation detector and the
// notification processor, and then runs the controller with stopCh. When the controller's Run
// returns, the deferred calls execute in reverse order: the informer is marked stopped, the
// processor stop channel is closed, and Run waits for the processor goroutines to finish.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	// Create the controller and flip started under startedLock so that HasSynced,
	// AddIndexers and AddEventHandlerWithResyncPeriod observe a consistent state.
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	// Deferred last, so it runs first: refuse new listeners before the processor is told to stop.
	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}
  192. func (s *sharedIndexInformer) HasSynced() bool {
  193. s.startedLock.Lock()
  194. defer s.startedLock.Unlock()
  195. if s.controller == nil {
  196. return false
  197. }
  198. return s.controller.HasSynced()
  199. }
  200. func (s *sharedIndexInformer) LastSyncResourceVersion() string {
  201. s.startedLock.Lock()
  202. defer s.startedLock.Unlock()
  203. if s.controller == nil {
  204. return ""
  205. }
  206. return s.controller.LastSyncResourceVersion()
  207. }
// GetStore returns the informer's indexer as a Store.
func (s *sharedIndexInformer) GetStore() Store {
	return s.indexer
}
// GetIndexer returns the informer's indexer; it is the same object GetStore returns.
func (s *sharedIndexInformer) GetIndexer() Indexer {
	return s.indexer
}
  214. func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
  215. s.startedLock.Lock()
  216. defer s.startedLock.Unlock()
  217. if s.started {
  218. return fmt.Errorf("informer has already started")
  219. }
  220. return s.indexer.AddIndexers(indexers)
  221. }
// GetController returns a dummyController wrapping this informer. Its Run method is a no-op.
func (s *sharedIndexInformer) GetController() Controller {
	return &dummyController{informer: s}
}
// AddEventHandler registers handler using the informer's defaultEventHandlerResyncPeriod.
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
  228. func determineResyncPeriod(desired, check time.Duration) time.Duration {
  229. if desired == 0 {
  230. return desired
  231. }
  232. if check == 0 {
  233. glog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
  234. return 0
  235. }
  236. if desired < check {
  237. glog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
  238. return check
  239. }
  240. return desired
  241. }
// minimumResyncPeriod is the smallest positive resync period a handler may request; smaller
// positive values passed to AddEventHandlerWithResyncPeriod are raised to it.
const minimumResyncPeriod = 1 * time.Second
  243. func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
  244. s.startedLock.Lock()
  245. defer s.startedLock.Unlock()
  246. if s.stopped {
  247. glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
  248. return
  249. }
  250. if resyncPeriod > 0 {
  251. if resyncPeriod < minimumResyncPeriod {
  252. glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
  253. resyncPeriod = minimumResyncPeriod
  254. }
  255. if resyncPeriod < s.resyncCheckPeriod {
  256. if s.started {
  257. glog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
  258. resyncPeriod = s.resyncCheckPeriod
  259. } else {
  260. // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
  261. // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
  262. // accordingly
  263. s.resyncCheckPeriod = resyncPeriod
  264. s.processor.resyncCheckPeriodChanged(resyncPeriod)
  265. }
  266. }
  267. }
  268. listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
  269. if !s.started {
  270. s.processor.addListener(listener)
  271. return
  272. }
  273. // in order to safely join, we have to
  274. // 1. stop sending add/update/delete notifications
  275. // 2. do a list against the store
  276. // 3. send synthetic "Add" events to the new handler
  277. // 4. unblock
  278. s.blockDeltas.Lock()
  279. defer s.blockDeltas.Unlock()
  280. s.processor.addListener(listener)
  281. for _, item := range s.indexer.List() {
  282. listener.add(addNotification{newObj: item})
  283. }
  284. }
  285. func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
  286. s.blockDeltas.Lock()
  287. defer s.blockDeltas.Unlock()
  288. // from oldest to newest
  289. for _, d := range obj.(Deltas) {
  290. switch d.Type {
  291. case Sync, Added, Updated:
  292. isSync := d.Type == Sync
  293. s.cacheMutationDetector.AddObject(d.Object)
  294. if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
  295. if err := s.indexer.Update(d.Object); err != nil {
  296. return err
  297. }
  298. s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
  299. } else {
  300. if err := s.indexer.Add(d.Object); err != nil {
  301. return err
  302. }
  303. s.processor.distribute(addNotification{newObj: d.Object}, isSync)
  304. }
  305. case Deleted:
  306. if err := s.indexer.Delete(d.Object); err != nil {
  307. return err
  308. }
  309. s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
  310. }
  311. }
  312. return nil
  313. }
// sharedProcessor holds the registered listeners and fans notifications out to them.
type sharedProcessor struct {
	// listenersStarted is set by run once the goroutines of the already registered listeners
	// have been started; addListener starts goroutines itself for listeners added afterwards.
	listenersStarted bool
	// listenersLock is held while listeners, syncingListeners and listenersStarted are accessed.
	listenersLock sync.RWMutex
	// listeners holds every registered listener.
	listeners []*processorListener
	// syncingListeners holds the listeners that receive sync notifications; it is rebuilt by
	// shouldResync.
	syncingListeners []*processorListener
	clock            clock.Clock
	// wg tracks the run and pop goroutines started for the listeners.
	wg wait.Group
}
  322. func (p *sharedProcessor) addListener(listener *processorListener) {
  323. p.listenersLock.Lock()
  324. defer p.listenersLock.Unlock()
  325. p.addListenerLocked(listener)
  326. if p.listenersStarted {
  327. p.wg.Start(listener.run)
  328. p.wg.Start(listener.pop)
  329. }
  330. }
// addListenerLocked appends listener to both listeners and syncingListeners. It takes no lock
// itself; addListener calls it while holding listenersLock.
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}
  335. func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  336. p.listenersLock.RLock()
  337. defer p.listenersLock.RUnlock()
  338. if sync {
  339. for _, listener := range p.syncingListeners {
  340. listener.add(obj)
  341. }
  342. } else {
  343. for _, listener := range p.listeners {
  344. listener.add(obj)
  345. }
  346. }
  347. }
// run starts the run and pop goroutines of every registered listener, marks the listeners as
// started, and then blocks until stopCh is closed. On stop it closes each listener's addCh and
// waits for all listener goroutines to finish.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	// The closure scopes the deferred unlock so the lock is released before blocking on stopCh.
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		// NOTE(review): this write happens under the read lock; the only reader visible in this
		// file (addListener) holds the write lock — confirm no other reader exists.
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
  366. // shouldResync queries every listener to determine if any of them need a resync, based on each
  367. // listener's resyncPeriod.
  368. func (p *sharedProcessor) shouldResync() bool {
  369. p.listenersLock.Lock()
  370. defer p.listenersLock.Unlock()
  371. p.syncingListeners = []*processorListener{}
  372. resyncNeeded := false
  373. now := p.clock.Now()
  374. for _, listener := range p.listeners {
  375. // need to loop through all the listeners to see if they need to resync so we can prepare any
  376. // listeners that are going to be resyncing.
  377. if listener.shouldResync(now) {
  378. resyncNeeded = true
  379. p.syncingListeners = append(p.syncingListeners, listener)
  380. listener.determineNextResync(now)
  381. }
  382. }
  383. return resyncNeeded
  384. }
  385. func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
  386. p.listenersLock.RLock()
  387. defer p.listenersLock.RUnlock()
  388. for _, listener := range p.listeners {
  389. resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
  390. listener.setResyncPeriod(resyncPeriod)
  391. }
  392. }
// processorListener delivers notifications to a single ResourceEventHandler. Notifications
// enter through addCh, are buffered by pop, and are handed to run through nextCh.
type processorListener struct {
	// nextCh carries notifications from pop to run; pop closes it when it exits.
	nextCh chan interface{}
	// addCh receives notifications from add; closing it tells pop to stop.
	addCh chan interface{}

	// handler receives the OnAdd/OnUpdate/OnDelete callbacks.
	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
	requestedResyncPeriod time.Duration
	// resyncPeriod is the resync period actually in effect for the listener. This
	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
	// informer's overall resync check period.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}
  414. func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
  415. ret := &processorListener{
  416. nextCh: make(chan interface{}),
  417. addCh: make(chan interface{}),
  418. handler: handler,
  419. pendingNotifications: *buffer.NewRingGrowing(bufferSize),
  420. requestedResyncPeriod: requestedResyncPeriod,
  421. resyncPeriod: resyncPeriod,
  422. }
  423. ret.determineNextResync(now)
  424. return ret
  425. }
// add sends notification on addCh. addCh is created unbuffered, so the send blocks until pop
// receives it.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}
// pop moves notifications from addCh to nextCh, buffering them in pendingNotifications while
// the receiver on nextCh is not ready. It returns when addCh is closed and closes nextCh on the
// way out.
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	// nextCh is nil while there is nothing to send; a send on a nil channel never proceeds, so
	// the first select case is disabled until a notification is staged.
	var nextCh chan<- interface{}
	// notification is the item currently staged for sending on nextCh.
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				// addCh was closed: stop. Anything still staged or buffered is not sent.
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}
// run delivers the notifications received on nextCh to the handler, dispatching on the
// notification's concrete type, until nextCh is closed.
func (p *processorListener) run() {
	// The delivery loop below blocks until p.nextCh is closed. It is wrapped in wait.Until with
	// a one-minute period, and stopCh is closed only after the loop finishes without error.
	// NOTE(review): the previous comment here said that after a handler panic the offending item
	// is skipped and delivery resumes "after a short delay (one second)", but the period passed
	// to wait.Until below is one minute — confirm which delay is intended.
	stopCh := make(chan struct{})
	wait.Until(func() {
		// this gives us a few quick retries before a long pause and then a few more quick retries
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
				}
			}
			// the only way to get here is if the p.nextCh is empty and closed
			return true, nil
		})

		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}
  487. // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
  488. // this always returns false.
  489. func (p *processorListener) shouldResync(now time.Time) bool {
  490. p.resyncLock.Lock()
  491. defer p.resyncLock.Unlock()
  492. if p.resyncPeriod == 0 {
  493. return false
  494. }
  495. return now.After(p.nextResync) || now.Equal(p.nextResync)
  496. }
// determineNextResync schedules the listener's next resync at now plus its current
// resyncPeriod, under resyncLock.
func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.nextResync = now.Add(p.resyncPeriod)
}
// setResyncPeriod replaces the listener's effective resync period under resyncLock. It does
// not touch nextResync.
func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.resyncPeriod = resyncPeriod
}