mutation_cache.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "strconv"
  17. "sync"
  18. "time"
  19. "github.com/golang/glog"
  20. "k8s.io/apimachinery/pkg/api/meta"
  21. "k8s.io/apimachinery/pkg/runtime"
  22. utilcache "k8s.io/apimachinery/pkg/util/cache"
  23. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  24. "k8s.io/apimachinery/pkg/util/sets"
  25. )
  26. // MutationCache is able to take the result of update operations and stores them in an LRU
  27. // that can be used to provide a more current view of a requested object. It requires interpreting
  28. // resourceVersions for comparisons.
  29. // Implementations must be thread-safe.
  30. // TODO find a way to layer this into an informer/lister
  31. type MutationCache interface {
  32. GetByKey(key string) (interface{}, bool, error)
  33. ByIndex(indexName, indexKey string) ([]interface{}, error)
  34. Mutation(interface{})
  35. }
  36. type ResourceVersionComparator interface {
  37. CompareResourceVersion(lhs, rhs runtime.Object) int
  38. }
  39. // NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to
  40. // deal with objects that have a resource version that:
  41. //
  42. // - is an integer
  43. // - increases when updated
  44. // - is comparable across the same resource in a namespace
  45. //
  46. // Most backends will have these semantics. Indexer may be nil. ttl controls how long an item
  47. // remains in the mutation cache before it is removed.
  48. //
  49. // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
  50. // in the underlying store. This is only safe if your use of the cache can handle mutation entries
  51. // remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
  52. func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
  53. return &mutationCache{
  54. backingCache: backingCache,
  55. indexer: indexer,
  56. mutationCache: utilcache.NewLRUExpireCache(100),
  57. comparator: etcdObjectVersioner{},
  58. ttl: ttl,
  59. includeAdds: includeAdds,
  60. }
  61. }
  62. // mutationCache doesn't guarantee that it returns values added via Mutation since they can page out and
  63. // since you can't distinguish between, "didn't observe create" and "was deleted after create",
  64. // if the key is missing from the backing cache, we always return it as missing
  65. type mutationCache struct {
  66. lock sync.Mutex
  67. backingCache Store
  68. indexer Indexer
  69. mutationCache *utilcache.LRUExpireCache
  70. includeAdds bool
  71. ttl time.Duration
  72. comparator ResourceVersionComparator
  73. }
  74. // GetByKey is never guaranteed to return back the value set in Mutation. It could be paged out, it could
  75. // be older than another copy, the backingCache may be more recent or, you might have written twice into the same key.
  76. // You get a value that was valid at some snapshot of time and will always return the newer of backingCache and mutationCache.
  77. func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
  78. c.lock.Lock()
  79. defer c.lock.Unlock()
  80. obj, exists, err := c.backingCache.GetByKey(key)
  81. if err != nil {
  82. return nil, false, err
  83. }
  84. if !exists {
  85. if !c.includeAdds {
  86. // we can't distinguish between, "didn't observe create" and "was deleted after create", so
  87. // if the key is missing, we always return it as missing
  88. return nil, false, nil
  89. }
  90. obj, exists = c.mutationCache.Get(key)
  91. if !exists {
  92. return nil, false, nil
  93. }
  94. }
  95. objRuntime, ok := obj.(runtime.Object)
  96. if !ok {
  97. return obj, true, nil
  98. }
  99. return c.newerObject(key, objRuntime), true, nil
  100. }
  101. // ByIndex returns the newer objects that match the provided index and indexer key.
  102. // Will return an error if no indexer was provided.
  103. func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
  104. c.lock.Lock()
  105. defer c.lock.Unlock()
  106. if c.indexer == nil {
  107. return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
  108. }
  109. keys, err := c.indexer.IndexKeys(name, indexKey)
  110. if err != nil {
  111. return nil, err
  112. }
  113. var items []interface{}
  114. keySet := sets.NewString()
  115. for _, key := range keys {
  116. keySet.Insert(key)
  117. obj, exists, err := c.indexer.GetByKey(key)
  118. if err != nil {
  119. return nil, err
  120. }
  121. if !exists {
  122. continue
  123. }
  124. if objRuntime, ok := obj.(runtime.Object); ok {
  125. items = append(items, c.newerObject(key, objRuntime))
  126. } else {
  127. items = append(items, obj)
  128. }
  129. }
  130. if c.includeAdds {
  131. fn := c.indexer.GetIndexers()[name]
  132. // Keys() is returned oldest to newest, so full traversal does not alter the LRU behavior
  133. for _, key := range c.mutationCache.Keys() {
  134. updated, ok := c.mutationCache.Get(key)
  135. if !ok {
  136. continue
  137. }
  138. if keySet.Has(key.(string)) {
  139. continue
  140. }
  141. elements, err := fn(updated)
  142. if err != nil {
  143. glog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
  144. continue
  145. }
  146. for _, inIndex := range elements {
  147. if inIndex != indexKey {
  148. continue
  149. }
  150. items = append(items, updated)
  151. break
  152. }
  153. }
  154. }
  155. return items, nil
  156. }
  157. // newerObject checks the mutation cache for a newer object and returns one if found. If the
  158. // mutated object is older than the backing object, it is removed from the Must be
  159. // called while the lock is held.
  160. func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
  161. mutatedObj, exists := c.mutationCache.Get(key)
  162. if !exists {
  163. return backing
  164. }
  165. mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
  166. if !ok {
  167. return backing
  168. }
  169. if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
  170. c.mutationCache.Remove(key)
  171. return backing
  172. }
  173. return mutatedObjRuntime
  174. }
  175. // Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
  176. // copy. If you call Mutation twice with the same object on different threads, one will win, but its not defined
  177. // which one. This doesn't affect correctness, since the GetByKey guaranteed of "later of these two caches" is
  178. // preserved, but you may not get the version of the object you want. The object you get is only guaranteed to
  179. // "one that was valid at some point in time", not "the one that I want".
  180. func (c *mutationCache) Mutation(obj interface{}) {
  181. c.lock.Lock()
  182. defer c.lock.Unlock()
  183. key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
  184. if err != nil {
  185. // this is a "nice to have", so failures shouldn't do anything weird
  186. utilruntime.HandleError(err)
  187. return
  188. }
  189. if objRuntime, ok := obj.(runtime.Object); ok {
  190. if mutatedObj, exists := c.mutationCache.Get(key); exists {
  191. if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
  192. if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
  193. return
  194. }
  195. }
  196. }
  197. }
  198. c.mutationCache.Add(key, obj, c.ttl)
  199. }
  200. // etcdObjectVersioner implements versioning and extracting etcd node information
  201. // for objects that have an embedded ObjectMeta or ListMeta field.
  202. type etcdObjectVersioner struct{}
  203. // ObjectResourceVersion implements Versioner
  204. func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
  205. accessor, err := meta.Accessor(obj)
  206. if err != nil {
  207. return 0, err
  208. }
  209. version := accessor.GetResourceVersion()
  210. if len(version) == 0 {
  211. return 0, nil
  212. }
  213. return strconv.ParseUint(version, 10, 64)
  214. }
  215. // CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
  216. // but etcd resource versions are special, they're actually ints, so we can easily compare them.
  217. func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
  218. lhsVersion, err := a.ObjectResourceVersion(lhs)
  219. if err != nil {
  220. // coder error
  221. panic(err)
  222. }
  223. rhsVersion, err := a.ObjectResourceVersion(rhs)
  224. if err != nil {
  225. // coder error
  226. panic(err)
  227. }
  228. if lhsVersion == rhsVersion {
  229. return 0
  230. }
  231. if lhsVersion < rhsVersion {
  232. return -1
  233. }
  234. return 1
  235. }