mux.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package watch
  14. import (
  15. "sync"
  16. "k8s.io/apimachinery/pkg/runtime"
  17. "k8s.io/apimachinery/pkg/runtime/schema"
  18. )
  19. // FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch
  20. // channel is full.
  21. type FullChannelBehavior int
  22. const (
  23. WaitIfChannelFull FullChannelBehavior = iota
  24. DropIfChannelFull
  25. )
  26. // Buffer the incoming queue a little bit even though it should rarely ever accumulate
  27. // anything, just in case a few events are received in such a short window that
  28. // Broadcaster can't move them onto the watchers' queues fast enough.
  29. const incomingQueueLength = 25
  30. // Broadcaster distributes event notifications among any number of watchers. Every event
  31. // is delivered to every watcher.
  32. type Broadcaster struct {
  33. // TODO: see if this lock is needed now that new watchers go through
  34. // the incoming channel.
  35. lock sync.Mutex
  36. watchers map[int64]*broadcasterWatcher
  37. nextWatcher int64
  38. distributing sync.WaitGroup
  39. incoming chan Event
  40. // How large to make watcher's channel.
  41. watchQueueLength int
  42. // If one of the watch channels is full, don't wait for it to become empty.
  43. // Instead just deliver it to the watchers that do have space in their
  44. // channels and move on to the next event.
  45. // It's more fair to do this on a per-watcher basis than to do it on the
  46. // "incoming" channel, which would allow one slow watcher to prevent all
  47. // other watchers from getting new events.
  48. fullChannelBehavior FullChannelBehavior
  49. }
  50. // NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
  51. // It is guaranteed that events will be distributed in the order in which they occur,
  52. // but the order in which a single event is distributed among all of the watchers is unspecified.
  53. func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
  54. m := &Broadcaster{
  55. watchers: map[int64]*broadcasterWatcher{},
  56. incoming: make(chan Event, incomingQueueLength),
  57. watchQueueLength: queueLength,
  58. fullChannelBehavior: fullChannelBehavior,
  59. }
  60. m.distributing.Add(1)
  61. go m.loop()
  62. return m
  63. }
  64. const internalRunFunctionMarker = "internal-do-function"
  65. // a function type we can shoehorn into the queue.
  66. type functionFakeRuntimeObject func()
  67. func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind {
  68. return schema.EmptyObjectKind
  69. }
  70. func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
  71. if obj == nil {
  72. return nil
  73. }
  74. // funcs are immutable. Hence, just return the original func.
  75. return obj
  76. }
  77. // Execute f, blocking the incoming queue (and waiting for it to drain first).
  78. // The purpose of this terrible hack is so that watchers added after an event
  79. // won't ever see that event, and will always see any event after they are
  80. // added.
  81. func (b *Broadcaster) blockQueue(f func()) {
  82. var wg sync.WaitGroup
  83. wg.Add(1)
  84. b.incoming <- Event{
  85. Type: internalRunFunctionMarker,
  86. Object: functionFakeRuntimeObject(func() {
  87. defer wg.Done()
  88. f()
  89. }),
  90. }
  91. wg.Wait()
  92. }
  93. // Watch adds a new watcher to the list and returns an Interface for it.
  94. // Note: new watchers will only receive new events. They won't get an entire history
  95. // of previous events.
  96. func (m *Broadcaster) Watch() Interface {
  97. var w *broadcasterWatcher
  98. m.blockQueue(func() {
  99. m.lock.Lock()
  100. defer m.lock.Unlock()
  101. id := m.nextWatcher
  102. m.nextWatcher++
  103. w = &broadcasterWatcher{
  104. result: make(chan Event, m.watchQueueLength),
  105. stopped: make(chan struct{}),
  106. id: id,
  107. m: m,
  108. }
  109. m.watchers[id] = w
  110. })
  111. return w
  112. }
  113. // WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
  114. // queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
  115. // The returned watch will have a queue length that is at least large enough to accommodate
  116. // all of the items in queuedEvents.
  117. func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
  118. var w *broadcasterWatcher
  119. m.blockQueue(func() {
  120. m.lock.Lock()
  121. defer m.lock.Unlock()
  122. id := m.nextWatcher
  123. m.nextWatcher++
  124. length := m.watchQueueLength
  125. if n := len(queuedEvents) + 1; n > length {
  126. length = n
  127. }
  128. w = &broadcasterWatcher{
  129. result: make(chan Event, length),
  130. stopped: make(chan struct{}),
  131. id: id,
  132. m: m,
  133. }
  134. m.watchers[id] = w
  135. for _, e := range queuedEvents {
  136. w.result <- e
  137. }
  138. })
  139. return w
  140. }
  141. // stopWatching stops the given watcher and removes it from the list.
  142. func (m *Broadcaster) stopWatching(id int64) {
  143. m.lock.Lock()
  144. defer m.lock.Unlock()
  145. w, ok := m.watchers[id]
  146. if !ok {
  147. // No need to do anything, it's already been removed from the list.
  148. return
  149. }
  150. delete(m.watchers, id)
  151. close(w.result)
  152. }
  153. // closeAll disconnects all watchers (presumably in response to a Shutdown call).
  154. func (m *Broadcaster) closeAll() {
  155. m.lock.Lock()
  156. defer m.lock.Unlock()
  157. for _, w := range m.watchers {
  158. close(w.result)
  159. }
  160. // Delete everything from the map, since presence/absence in the map is used
  161. // by stopWatching to avoid double-closing the channel.
  162. m.watchers = map[int64]*broadcasterWatcher{}
  163. }
  164. // Action distributes the given event among all watchers.
  165. func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
  166. m.incoming <- Event{action, obj}
  167. }
  168. // Shutdown disconnects all watchers (but any queued events will still be distributed).
  169. // You must not call Action or Watch* after calling Shutdown. This call blocks
  170. // until all events have been distributed through the outbound channels. Note
  171. // that since they can be buffered, this means that the watchers might not
  172. // have received the data yet as it can remain sitting in the buffered
  173. // channel.
  174. func (m *Broadcaster) Shutdown() {
  175. close(m.incoming)
  176. m.distributing.Wait()
  177. }
  178. // loop receives from m.incoming and distributes to all watchers.
  179. func (m *Broadcaster) loop() {
  180. // Deliberately not catching crashes here. Yes, bring down the process if there's a
  181. // bug in watch.Broadcaster.
  182. for event := range m.incoming {
  183. if event.Type == internalRunFunctionMarker {
  184. event.Object.(functionFakeRuntimeObject)()
  185. continue
  186. }
  187. m.distribute(event)
  188. }
  189. m.closeAll()
  190. m.distributing.Done()
  191. }
  192. // distribute sends event to all watchers. Blocking.
  193. func (m *Broadcaster) distribute(event Event) {
  194. m.lock.Lock()
  195. defer m.lock.Unlock()
  196. if m.fullChannelBehavior == DropIfChannelFull {
  197. for _, w := range m.watchers {
  198. select {
  199. case w.result <- event:
  200. case <-w.stopped:
  201. default: // Don't block if the event can't be queued.
  202. }
  203. }
  204. } else {
  205. for _, w := range m.watchers {
  206. select {
  207. case w.result <- event:
  208. case <-w.stopped:
  209. }
  210. }
  211. }
  212. }
  213. // broadcasterWatcher handles a single watcher of a broadcaster
  214. type broadcasterWatcher struct {
  215. result chan Event
  216. stopped chan struct{}
  217. stop sync.Once
  218. id int64
  219. m *Broadcaster
  220. }
  221. // ResultChan returns a channel to use for waiting on events.
  222. func (mw *broadcasterWatcher) ResultChan() <-chan Event {
  223. return mw.result
  224. }
  225. // Stop stops watching and removes mw from its list.
  226. func (mw *broadcasterWatcher) Stop() {
  227. mw.stop.Do(func() {
  228. close(mw.stopped)
  229. mw.m.stopWatching(mw.id)
  230. })
  231. }