watch.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package watch
  14. import (
  15. "fmt"
  16. "sync"
  17. "github.com/golang/glog"
  18. "k8s.io/apimachinery/pkg/runtime"
  19. )
  20. // Interface can be implemented by anything that knows how to watch and report changes.
  21. type Interface interface {
  22. // Stops watching. Will close the channel returned by ResultChan(). Releases
  23. // any resources used by the watch.
  24. Stop()
  25. // Returns a chan which will receive all the events. If an error occurs
  26. // or Stop() is called, this channel will be closed, in which case the
  27. // watch should be completely cleaned up.
  28. ResultChan() <-chan Event
  29. }
  30. // EventType defines the possible types of events.
  31. type EventType string
  32. const (
  33. Added EventType = "ADDED"
  34. Modified EventType = "MODIFIED"
  35. Deleted EventType = "DELETED"
  36. Error EventType = "ERROR"
  37. DefaultChanSize int32 = 100
  38. )
  39. // Event represents a single event to a watched resource.
  40. // +k8s:deepcopy-gen=true
  41. type Event struct {
  42. Type EventType
  43. // Object is:
  44. // * If Type is Added or Modified: the new state of the object.
  45. // * If Type is Deleted: the state of the object immediately before deletion.
  46. // * If Type is Error: *api.Status is recommended; other types may make sense
  47. // depending on context.
  48. Object runtime.Object
  49. }
  50. type emptyWatch chan Event
  51. // NewEmptyWatch returns a watch interface that returns no results and is closed.
  52. // May be used in certain error conditions where no information is available but
  53. // an error is not warranted.
  54. func NewEmptyWatch() Interface {
  55. ch := make(chan Event)
  56. close(ch)
  57. return emptyWatch(ch)
  58. }
  59. // Stop implements Interface
  60. func (w emptyWatch) Stop() {
  61. }
  62. // ResultChan implements Interface
  63. func (w emptyWatch) ResultChan() <-chan Event {
  64. return chan Event(w)
  65. }
  66. // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
  67. type FakeWatcher struct {
  68. result chan Event
  69. Stopped bool
  70. sync.Mutex
  71. }
  72. func NewFake() *FakeWatcher {
  73. return &FakeWatcher{
  74. result: make(chan Event),
  75. }
  76. }
  77. func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
  78. return &FakeWatcher{
  79. result: make(chan Event, size),
  80. }
  81. }
  82. // Stop implements Interface.Stop().
  83. func (f *FakeWatcher) Stop() {
  84. f.Lock()
  85. defer f.Unlock()
  86. if !f.Stopped {
  87. glog.V(4).Infof("Stopping fake watcher.")
  88. close(f.result)
  89. f.Stopped = true
  90. }
  91. }
  92. func (f *FakeWatcher) IsStopped() bool {
  93. f.Lock()
  94. defer f.Unlock()
  95. return f.Stopped
  96. }
  97. // Reset prepares the watcher to be reused.
  98. func (f *FakeWatcher) Reset() {
  99. f.Lock()
  100. defer f.Unlock()
  101. f.Stopped = false
  102. f.result = make(chan Event)
  103. }
  104. func (f *FakeWatcher) ResultChan() <-chan Event {
  105. return f.result
  106. }
  107. // Add sends an add event.
  108. func (f *FakeWatcher) Add(obj runtime.Object) {
  109. f.result <- Event{Added, obj}
  110. }
  111. // Modify sends a modify event.
  112. func (f *FakeWatcher) Modify(obj runtime.Object) {
  113. f.result <- Event{Modified, obj}
  114. }
  115. // Delete sends a delete event.
  116. func (f *FakeWatcher) Delete(lastValue runtime.Object) {
  117. f.result <- Event{Deleted, lastValue}
  118. }
  119. // Error sends an Error event.
  120. func (f *FakeWatcher) Error(errValue runtime.Object) {
  121. f.result <- Event{Error, errValue}
  122. }
  123. // Action sends an event of the requested type, for table-based testing.
  124. func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
  125. f.result <- Event{action, obj}
  126. }
  127. // RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
  128. type RaceFreeFakeWatcher struct {
  129. result chan Event
  130. Stopped bool
  131. sync.Mutex
  132. }
  133. func NewRaceFreeFake() *RaceFreeFakeWatcher {
  134. return &RaceFreeFakeWatcher{
  135. result: make(chan Event, DefaultChanSize),
  136. }
  137. }
  138. // Stop implements Interface.Stop().
  139. func (f *RaceFreeFakeWatcher) Stop() {
  140. f.Lock()
  141. defer f.Unlock()
  142. if !f.Stopped {
  143. glog.V(4).Infof("Stopping fake watcher.")
  144. close(f.result)
  145. f.Stopped = true
  146. }
  147. }
  148. func (f *RaceFreeFakeWatcher) IsStopped() bool {
  149. f.Lock()
  150. defer f.Unlock()
  151. return f.Stopped
  152. }
  153. // Reset prepares the watcher to be reused.
  154. func (f *RaceFreeFakeWatcher) Reset() {
  155. f.Lock()
  156. defer f.Unlock()
  157. f.Stopped = false
  158. f.result = make(chan Event, DefaultChanSize)
  159. }
  160. func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event {
  161. f.Lock()
  162. defer f.Unlock()
  163. return f.result
  164. }
  165. // Add sends an add event.
  166. func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) {
  167. f.Lock()
  168. defer f.Unlock()
  169. if !f.Stopped {
  170. select {
  171. case f.result <- Event{Added, obj}:
  172. return
  173. default:
  174. panic(fmt.Errorf("channel full"))
  175. }
  176. }
  177. }
  178. // Modify sends a modify event.
  179. func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) {
  180. f.Lock()
  181. defer f.Unlock()
  182. if !f.Stopped {
  183. select {
  184. case f.result <- Event{Modified, obj}:
  185. return
  186. default:
  187. panic(fmt.Errorf("channel full"))
  188. }
  189. }
  190. }
  191. // Delete sends a delete event.
  192. func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) {
  193. f.Lock()
  194. defer f.Unlock()
  195. if !f.Stopped {
  196. select {
  197. case f.result <- Event{Deleted, lastValue}:
  198. return
  199. default:
  200. panic(fmt.Errorf("channel full"))
  201. }
  202. }
  203. }
  204. // Error sends an Error event.
  205. func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) {
  206. f.Lock()
  207. defer f.Unlock()
  208. if !f.Stopped {
  209. select {
  210. case f.result <- Event{Error, errValue}:
  211. return
  212. default:
  213. panic(fmt.Errorf("channel full"))
  214. }
  215. }
  216. }
  217. // Action sends an event of the requested type, for table-based testing.
  218. func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
  219. f.Lock()
  220. defer f.Unlock()
  221. if !f.Stopped {
  222. select {
  223. case f.result <- Event{action, obj}:
  224. return
  225. default:
  226. panic(fmt.Errorf("channel full"))
  227. }
  228. }
  229. }