delta_fifo.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "k8s.io/apimachinery/pkg/util/sets"
  19. "github.com/golang/glog"
  20. )
  21. // NewDeltaFIFO returns a Store which can be used process changes to items.
  22. //
  23. // keyFunc is used to figure out what key an object should have. (It's
  24. // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
  25. //
  26. // 'keyLister' is expected to return a list of keys that the consumer of
  27. // this queue "knows about". It is used to decide which items are missing
  28. // when Replace() is called; 'Deleted' deltas are produced for these items.
  29. // It may be nil if you don't need to detect all deletions.
  30. // TODO: consider merging keyLister with this object, tracking a list of
  31. // "known" keys when Pop() is called. Have to think about how that
  32. // affects error retrying.
  33. // NOTE: It is possible to misuse this and cause a race when using an
  34. // external known object source.
  35. // Whether there is a potential race depends on how the comsumer
  36. // modifies knownObjects. In Pop(), process function is called under
  37. // lock, so it is safe to update data structures in it that need to be
  38. // in sync with the queue (e.g. knownObjects).
  39. //
  40. // Example:
  41. // In case of sharedIndexInformer being a consumer
  42. // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
  43. // src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
  44. // there is no race as knownObjects (s.indexer) is modified safely
  45. // under DeltaFIFO's lock. The only exceptions are GetStore() and
  46. // GetIndexer() methods, which expose ways to modify the underlying
  47. // storage. Currently these two methods are used for creating Lister
  48. // and internal tests.
  49. //
  50. // Also see the comment on DeltaFIFO.
  51. func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
  52. f := &DeltaFIFO{
  53. items: map[string]Deltas{},
  54. queue: []string{},
  55. keyFunc: keyFunc,
  56. knownObjects: knownObjects,
  57. }
  58. f.cond.L = &f.lock
  59. return f
  60. }
  61. // DeltaFIFO is like FIFO, but allows you to process deletes.
  62. //
  63. // DeltaFIFO is a producer-consumer queue, where a Reflector is
  64. // intended to be the producer, and the consumer is whatever calls
  65. // the Pop() method.
  66. //
  67. // DeltaFIFO solves this use case:
  68. // * You want to process every object change (delta) at most once.
  69. // * When you process an object, you want to see everything
  70. // that's happened to it since you last processed it.
  71. // * You want to process the deletion of objects.
  72. // * You might want to periodically reprocess objects.
  73. //
  74. // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
  75. // interface{} to satisfy the Store/Queue interfaces, but it
  76. // will always return an object of type Deltas.
  77. //
  78. // A note on threading: If you call Pop() in parallel from multiple
  79. // threads, you could end up with multiple threads processing slightly
  80. // different versions of the same object.
  81. //
  82. // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
  83. // to list keys that are "known", for the purpose of figuring out which
  84. // items have been deleted when Replace() or Delete() are called. The deleted
  85. // object will be included in the DeleteFinalStateUnknown markers. These objects
  86. // could be stale.
  87. type DeltaFIFO struct {
  88. // lock/cond protects access to 'items' and 'queue'.
  89. lock sync.RWMutex
  90. cond sync.Cond
  91. // We depend on the property that items in the set are in
  92. // the queue and vice versa, and that all Deltas in this
  93. // map have at least one Delta.
  94. items map[string]Deltas
  95. queue []string
  96. // populated is true if the first batch of items inserted by Replace() has been populated
  97. // or Delete/Add/Update was called first.
  98. populated bool
  99. // initialPopulationCount is the number of items inserted by the first call of Replace()
  100. initialPopulationCount int
  101. // keyFunc is used to make the key used for queued item
  102. // insertion and retrieval, and should be deterministic.
  103. keyFunc KeyFunc
  104. // knownObjects list keys that are "known", for the
  105. // purpose of figuring out which items have been deleted
  106. // when Replace() or Delete() is called.
  107. knownObjects KeyListerGetter
  108. // Indication the queue is closed.
  109. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  110. // Currently, not used to gate any of CRED operations.
  111. closed bool
  112. closedLock sync.Mutex
  113. }
  114. var (
  115. _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
  116. )
  117. var (
  118. // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
  119. // object with zero length is encountered (should be impossible,
  120. // but included for completeness).
  121. ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
  122. )
  123. // Close the queue.
  124. func (f *DeltaFIFO) Close() {
  125. f.closedLock.Lock()
  126. defer f.closedLock.Unlock()
  127. f.closed = true
  128. f.cond.Broadcast()
  129. }
  130. // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
  131. // DeletedFinalStateUnknown objects.
  132. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
  133. if d, ok := obj.(Deltas); ok {
  134. if len(d) == 0 {
  135. return "", KeyError{obj, ErrZeroLengthDeltasObject}
  136. }
  137. obj = d.Newest().Object
  138. }
  139. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  140. return d.Key, nil
  141. }
  142. return f.keyFunc(obj)
  143. }
  144. // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
  145. // or an Update called first but the first batch of items inserted by Replace() has been popped
  146. func (f *DeltaFIFO) HasSynced() bool {
  147. f.lock.Lock()
  148. defer f.lock.Unlock()
  149. return f.populated && f.initialPopulationCount == 0
  150. }
  151. // Add inserts an item, and puts it in the queue. The item is only enqueued
  152. // if it doesn't already exist in the set.
  153. func (f *DeltaFIFO) Add(obj interface{}) error {
  154. f.lock.Lock()
  155. defer f.lock.Unlock()
  156. f.populated = true
  157. return f.queueActionLocked(Added, obj)
  158. }
  159. // Update is just like Add, but makes an Updated Delta.
  160. func (f *DeltaFIFO) Update(obj interface{}) error {
  161. f.lock.Lock()
  162. defer f.lock.Unlock()
  163. f.populated = true
  164. return f.queueActionLocked(Updated, obj)
  165. }
  166. // Delete is just like Add, but makes an Deleted Delta. If the item does not
  167. // already exist, it will be ignored. (It may have already been deleted by a
  168. // Replace (re-list), for example.
  169. func (f *DeltaFIFO) Delete(obj interface{}) error {
  170. id, err := f.KeyOf(obj)
  171. if err != nil {
  172. return KeyError{obj, err}
  173. }
  174. f.lock.Lock()
  175. defer f.lock.Unlock()
  176. f.populated = true
  177. if f.knownObjects == nil {
  178. if _, exists := f.items[id]; !exists {
  179. // Presumably, this was deleted when a relist happened.
  180. // Don't provide a second report of the same deletion.
  181. return nil
  182. }
  183. } else {
  184. // We only want to skip the "deletion" action if the object doesn't
  185. // exist in knownObjects and it doesn't have corresponding item in items.
  186. // Note that even if there is a "deletion" action in items, we can ignore it,
  187. // because it will be deduped automatically in "queueActionLocked"
  188. _, exists, err := f.knownObjects.GetByKey(id)
  189. _, itemsExist := f.items[id]
  190. if err == nil && !exists && !itemsExist {
  191. // Presumably, this was deleted when a relist happened.
  192. // Don't provide a second report of the same deletion.
  193. return nil
  194. }
  195. }
  196. return f.queueActionLocked(Deleted, obj)
  197. }
  198. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  199. // present in the set, it is neither enqueued nor added to the set.
  200. //
  201. // This is useful in a single producer/consumer scenario so that the consumer can
  202. // safely retry items without contending with the producer and potentially enqueueing
  203. // stale items.
  204. //
  205. // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
  206. // different from the Add/Update/Delete functions.
  207. func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
  208. deltas, ok := obj.(Deltas)
  209. if !ok {
  210. return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
  211. }
  212. id, err := f.KeyOf(deltas.Newest().Object)
  213. if err != nil {
  214. return KeyError{obj, err}
  215. }
  216. f.lock.Lock()
  217. defer f.lock.Unlock()
  218. f.addIfNotPresent(id, deltas)
  219. return nil
  220. }
  221. // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
  222. // already holds the fifo lock.
  223. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
  224. f.populated = true
  225. if _, exists := f.items[id]; exists {
  226. return
  227. }
  228. f.queue = append(f.queue, id)
  229. f.items[id] = deltas
  230. f.cond.Broadcast()
  231. }
  232. // re-listing and watching can deliver the same update multiple times in any
  233. // order. This will combine the most recent two deltas if they are the same.
  234. func dedupDeltas(deltas Deltas) Deltas {
  235. n := len(deltas)
  236. if n < 2 {
  237. return deltas
  238. }
  239. a := &deltas[n-1]
  240. b := &deltas[n-2]
  241. if out := isDup(a, b); out != nil {
  242. d := append(Deltas{}, deltas[:n-2]...)
  243. return append(d, *out)
  244. }
  245. return deltas
  246. }
  247. // If a & b represent the same event, returns the delta that ought to be kept.
  248. // Otherwise, returns nil.
  249. // TODO: is there anything other than deletions that need deduping?
  250. func isDup(a, b *Delta) *Delta {
  251. if out := isDeletionDup(a, b); out != nil {
  252. return out
  253. }
  254. // TODO: Detect other duplicate situations? Are there any?
  255. return nil
  256. }
  257. // keep the one with the most information if both are deletions.
  258. func isDeletionDup(a, b *Delta) *Delta {
  259. if b.Type != Deleted || a.Type != Deleted {
  260. return nil
  261. }
  262. // Do more sophisticated checks, or is this sufficient?
  263. if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
  264. return a
  265. }
  266. return b
  267. }
  268. // willObjectBeDeletedLocked returns true only if the last delta for the
  269. // given object is Delete. Caller must lock first.
  270. func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
  271. deltas := f.items[id]
  272. return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
  273. }
  274. // queueActionLocked appends to the delta list for the object.
  275. // Caller must lock first.
  276. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  277. id, err := f.KeyOf(obj)
  278. if err != nil {
  279. return KeyError{obj, err}
  280. }
  281. // If object is supposed to be deleted (last event is Deleted),
  282. // then we should ignore Sync events, because it would result in
  283. // recreation of this object.
  284. if actionType == Sync && f.willObjectBeDeletedLocked(id) {
  285. return nil
  286. }
  287. newDeltas := append(f.items[id], Delta{actionType, obj})
  288. newDeltas = dedupDeltas(newDeltas)
  289. _, exists := f.items[id]
  290. if len(newDeltas) > 0 {
  291. if !exists {
  292. f.queue = append(f.queue, id)
  293. }
  294. f.items[id] = newDeltas
  295. f.cond.Broadcast()
  296. } else if exists {
  297. // We need to remove this from our map (extra items
  298. // in the queue are ignored if they are not in the
  299. // map).
  300. delete(f.items, id)
  301. }
  302. return nil
  303. }
  304. // List returns a list of all the items; it returns the object
  305. // from the most recent Delta.
  306. // You should treat the items returned inside the deltas as immutable.
  307. func (f *DeltaFIFO) List() []interface{} {
  308. f.lock.RLock()
  309. defer f.lock.RUnlock()
  310. return f.listLocked()
  311. }
  312. func (f *DeltaFIFO) listLocked() []interface{} {
  313. list := make([]interface{}, 0, len(f.items))
  314. for _, item := range f.items {
  315. // Copy item's slice so operations on this slice
  316. // won't interfere with the object we return.
  317. item = copyDeltas(item)
  318. list = append(list, item.Newest().Object)
  319. }
  320. return list
  321. }
  322. // ListKeys returns a list of all the keys of the objects currently
  323. // in the FIFO.
  324. func (f *DeltaFIFO) ListKeys() []string {
  325. f.lock.RLock()
  326. defer f.lock.RUnlock()
  327. list := make([]string, 0, len(f.items))
  328. for key := range f.items {
  329. list = append(list, key)
  330. }
  331. return list
  332. }
  333. // Get returns the complete list of deltas for the requested item,
  334. // or sets exists=false.
  335. // You should treat the items returned inside the deltas as immutable.
  336. func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  337. key, err := f.KeyOf(obj)
  338. if err != nil {
  339. return nil, false, KeyError{obj, err}
  340. }
  341. return f.GetByKey(key)
  342. }
  343. // GetByKey returns the complete list of deltas for the requested item,
  344. // setting exists=false if that list is empty.
  345. // You should treat the items returned inside the deltas as immutable.
  346. func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  347. f.lock.RLock()
  348. defer f.lock.RUnlock()
  349. d, exists := f.items[key]
  350. if exists {
  351. // Copy item's slice so operations on this slice
  352. // won't interfere with the object we return.
  353. d = copyDeltas(d)
  354. }
  355. return d, exists, nil
  356. }
  357. // Checks if the queue is closed
  358. func (f *DeltaFIFO) IsClosed() bool {
  359. f.closedLock.Lock()
  360. defer f.closedLock.Unlock()
  361. if f.closed {
  362. return true
  363. }
  364. return false
  365. }
  366. // Pop blocks until an item is added to the queue, and then returns it. If
  367. // multiple items are ready, they are returned in the order in which they were
  368. // added/updated. The item is removed from the queue (and the store) before it
  369. // is returned, so if you don't successfully process it, you need to add it back
  370. // with AddIfNotPresent().
  371. // process function is called under lock, so it is safe update data structures
  372. // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
  373. // may return an instance of ErrRequeue with a nested error to indicate the current
  374. // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
  375. //
  376. // Pop returns a 'Deltas', which has a complete list of all the things
  377. // that happened to the object (deltas) while it was sitting in the queue.
  378. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  379. f.lock.Lock()
  380. defer f.lock.Unlock()
  381. for {
  382. for len(f.queue) == 0 {
  383. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  384. // When Close() is called, the f.closed is set and the condition is broadcasted.
  385. // Which causes this loop to continue and return from the Pop().
  386. if f.IsClosed() {
  387. return nil, FIFOClosedError
  388. }
  389. f.cond.Wait()
  390. }
  391. id := f.queue[0]
  392. f.queue = f.queue[1:]
  393. item, ok := f.items[id]
  394. if f.initialPopulationCount > 0 {
  395. f.initialPopulationCount--
  396. }
  397. if !ok {
  398. // Item may have been deleted subsequently.
  399. continue
  400. }
  401. delete(f.items, id)
  402. err := process(item)
  403. if e, ok := err.(ErrRequeue); ok {
  404. f.addIfNotPresent(id, item)
  405. err = e.Err
  406. }
  407. // Don't need to copyDeltas here, because we're transferring
  408. // ownership to the caller.
  409. return item, err
  410. }
  411. }
  412. // Replace will delete the contents of 'f', using instead the given map.
  413. // 'f' takes ownership of the map, you should not reference the map again
  414. // after calling this function. f's queue is reset, too; upon return, it
  415. // will contain the items in the map, in no particular order.
  416. func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
  417. f.lock.Lock()
  418. defer f.lock.Unlock()
  419. keys := make(sets.String, len(list))
  420. for _, item := range list {
  421. key, err := f.KeyOf(item)
  422. if err != nil {
  423. return KeyError{item, err}
  424. }
  425. keys.Insert(key)
  426. if err := f.queueActionLocked(Sync, item); err != nil {
  427. return fmt.Errorf("couldn't enqueue object: %v", err)
  428. }
  429. }
  430. if f.knownObjects == nil {
  431. // Do deletion detection against our own list.
  432. for k, oldItem := range f.items {
  433. if keys.Has(k) {
  434. continue
  435. }
  436. var deletedObj interface{}
  437. if n := oldItem.Newest(); n != nil {
  438. deletedObj = n.Object
  439. }
  440. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  441. return err
  442. }
  443. }
  444. if !f.populated {
  445. f.populated = true
  446. f.initialPopulationCount = len(list)
  447. }
  448. return nil
  449. }
  450. // Detect deletions not already in the queue.
  451. knownKeys := f.knownObjects.ListKeys()
  452. queuedDeletions := 0
  453. for _, k := range knownKeys {
  454. if keys.Has(k) {
  455. continue
  456. }
  457. deletedObj, exists, err := f.knownObjects.GetByKey(k)
  458. if err != nil {
  459. deletedObj = nil
  460. glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
  461. } else if !exists {
  462. deletedObj = nil
  463. glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
  464. }
  465. queuedDeletions++
  466. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  467. return err
  468. }
  469. }
  470. if !f.populated {
  471. f.populated = true
  472. f.initialPopulationCount = len(list) + queuedDeletions
  473. }
  474. return nil
  475. }
  476. // Resync will send a sync event for each item
  477. func (f *DeltaFIFO) Resync() error {
  478. f.lock.Lock()
  479. defer f.lock.Unlock()
  480. if f.knownObjects == nil {
  481. return nil
  482. }
  483. keys := f.knownObjects.ListKeys()
  484. for _, k := range keys {
  485. if err := f.syncKeyLocked(k); err != nil {
  486. return err
  487. }
  488. }
  489. return nil
  490. }
  491. func (f *DeltaFIFO) syncKey(key string) error {
  492. f.lock.Lock()
  493. defer f.lock.Unlock()
  494. return f.syncKeyLocked(key)
  495. }
  496. func (f *DeltaFIFO) syncKeyLocked(key string) error {
  497. obj, exists, err := f.knownObjects.GetByKey(key)
  498. if err != nil {
  499. glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
  500. return nil
  501. } else if !exists {
  502. glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
  503. return nil
  504. }
  505. // If we are doing Resync() and there is already an event queued for that object,
  506. // we ignore the Resync for it. This is to avoid the race, in which the resync
  507. // comes with the previous value of object (since queueing an event for the object
  508. // doesn't trigger changing the underlying store <knownObjects>.
  509. id, err := f.KeyOf(obj)
  510. if err != nil {
  511. return KeyError{obj, err}
  512. }
  513. if len(f.items[id]) > 0 {
  514. return nil
  515. }
  516. if err := f.queueActionLocked(Sync, obj); err != nil {
  517. return fmt.Errorf("couldn't queue object: %v", err)
  518. }
  519. return nil
  520. }
  521. // A KeyListerGetter is anything that knows how to list its keys and look up by key.
  522. type KeyListerGetter interface {
  523. KeyLister
  524. KeyGetter
  525. }
  526. // A KeyLister is anything that knows how to list its keys.
  527. type KeyLister interface {
  528. ListKeys() []string
  529. }
  530. // A KeyGetter is anything that knows how to get the value stored under a given key.
  531. type KeyGetter interface {
  532. GetByKey(key string) (interface{}, bool, error)
  533. }
  534. // DeltaType is the type of a change (addition, deletion, etc)
  535. type DeltaType string
  536. const (
  537. Added DeltaType = "Added"
  538. Updated DeltaType = "Updated"
  539. Deleted DeltaType = "Deleted"
  540. // The other types are obvious. You'll get Sync deltas when:
  541. // * A watch expires/errors out and a new list/watch cycle is started.
  542. // * You've turned on periodic syncs.
  543. // (Anything that trigger's DeltaFIFO's Replace() method.)
  544. Sync DeltaType = "Sync"
  545. )
  546. // Delta is the type stored by a DeltaFIFO. It tells you what change
  547. // happened, and the object's state after* that change.
  548. //
  549. // [*] Unless the change is a deletion, and then you'll get the final
  550. // state of the object before it was deleted.
  551. type Delta struct {
  552. Type DeltaType
  553. Object interface{}
  554. }
  555. // Deltas is a list of one or more 'Delta's to an individual object.
  556. // The oldest delta is at index 0, the newest delta is the last one.
  557. type Deltas []Delta
  558. // Oldest is a convenience function that returns the oldest delta, or
  559. // nil if there are no deltas.
  560. func (d Deltas) Oldest() *Delta {
  561. if len(d) > 0 {
  562. return &d[0]
  563. }
  564. return nil
  565. }
  566. // Newest is a convenience function that returns the newest delta, or
  567. // nil if there are no deltas.
  568. func (d Deltas) Newest() *Delta {
  569. if n := len(d); n > 0 {
  570. return &d[n-1]
  571. }
  572. return nil
  573. }
  574. // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
  575. // the objects in the slice. This allows Get/List to return an object that we
  576. // know won't be clobbered by a subsequent modifications.
  577. func copyDeltas(d Deltas) Deltas {
  578. d2 := make(Deltas, len(d))
  579. copy(d2, d)
  580. return d2
  581. }
  582. // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
  583. // an object was deleted but the watch deletion event was missed. In this
  584. // case we don't know the final "resting" state of the object, so there's
  585. // a chance the included `Obj` is stale.
  586. type DeletedFinalStateUnknown struct {
  587. Key string
  588. Obj interface{}
  589. }