thread_safe_store.go 8.2 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "sync"
  17. "k8s.io/apimachinery/pkg/util/sets"
  18. )
  19. // ThreadSafeStore is an interface that allows concurrent access to a storage backend.
  20. // TL;DR caveats: you must not modify anything returned by Get or List as it will break
  21. // the indexing feature in addition to not being thread safe.
  22. //
  23. // The guarantees of thread safety provided by List/Get are only valid if the caller
  24. // treats returned items as read-only. For example, a pointer inserted in the store
  25. // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
  26. // on the same key and modify the pointer in a non-thread-safe way. Also note that
  27. // modifying objects stored by the indexers (if any) will *not* automatically lead
  28. // to a re-index. So it's not a good idea to directly modify the objects returned by
  29. // Get/List, in general.
  30. type ThreadSafeStore interface {
  31. Add(key string, obj interface{})
  32. Update(key string, obj interface{})
  33. Delete(key string)
  34. Get(key string) (item interface{}, exists bool)
  35. List() []interface{}
  36. ListKeys() []string
  37. Replace(map[string]interface{}, string)
  38. Index(indexName string, obj interface{}) ([]interface{}, error)
  39. IndexKeys(indexName, indexKey string) ([]string, error)
  40. ListIndexFuncValues(name string) []string
  41. ByIndex(indexName, indexKey string) ([]interface{}, error)
  42. GetIndexers() Indexers
  43. // AddIndexers adds more indexers to this store. If you call this after you already have data
  44. // in the store, the results are undefined.
  45. AddIndexers(newIndexers Indexers) error
  46. Resync() error
  47. }
  48. // threadSafeMap implements ThreadSafeStore
  49. type threadSafeMap struct {
  50. lock sync.RWMutex
  51. items map[string]interface{}
  52. // indexers maps a name to an IndexFunc
  53. indexers Indexers
  54. // indices maps a name to an Index
  55. indices Indices
  56. }
  57. func (c *threadSafeMap) Add(key string, obj interface{}) {
  58. c.lock.Lock()
  59. defer c.lock.Unlock()
  60. oldObject := c.items[key]
  61. c.items[key] = obj
  62. c.updateIndices(oldObject, obj, key)
  63. }
  64. func (c *threadSafeMap) Update(key string, obj interface{}) {
  65. c.lock.Lock()
  66. defer c.lock.Unlock()
  67. oldObject := c.items[key]
  68. c.items[key] = obj
  69. c.updateIndices(oldObject, obj, key)
  70. }
  71. func (c *threadSafeMap) Delete(key string) {
  72. c.lock.Lock()
  73. defer c.lock.Unlock()
  74. if obj, exists := c.items[key]; exists {
  75. c.deleteFromIndices(obj, key)
  76. delete(c.items, key)
  77. }
  78. }
  79. func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
  80. c.lock.RLock()
  81. defer c.lock.RUnlock()
  82. item, exists = c.items[key]
  83. return item, exists
  84. }
  85. func (c *threadSafeMap) List() []interface{} {
  86. c.lock.RLock()
  87. defer c.lock.RUnlock()
  88. list := make([]interface{}, 0, len(c.items))
  89. for _, item := range c.items {
  90. list = append(list, item)
  91. }
  92. return list
  93. }
  94. // ListKeys returns a list of all the keys of the objects currently
  95. // in the threadSafeMap.
  96. func (c *threadSafeMap) ListKeys() []string {
  97. c.lock.RLock()
  98. defer c.lock.RUnlock()
  99. list := make([]string, 0, len(c.items))
  100. for key := range c.items {
  101. list = append(list, key)
  102. }
  103. return list
  104. }
  105. func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
  106. c.lock.Lock()
  107. defer c.lock.Unlock()
  108. c.items = items
  109. // rebuild any index
  110. c.indices = Indices{}
  111. for key, item := range c.items {
  112. c.updateIndices(nil, item, key)
  113. }
  114. }
  115. // Index returns a list of items that match on the index function
  116. // Index is thread-safe so long as you treat all items as immutable
  117. func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
  118. c.lock.RLock()
  119. defer c.lock.RUnlock()
  120. indexFunc := c.indexers[indexName]
  121. if indexFunc == nil {
  122. return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  123. }
  124. indexKeys, err := indexFunc(obj)
  125. if err != nil {
  126. return nil, err
  127. }
  128. index := c.indices[indexName]
  129. // need to de-dupe the return list. Since multiple keys are allowed, this can happen.
  130. returnKeySet := sets.String{}
  131. for _, indexKey := range indexKeys {
  132. set := index[indexKey]
  133. for _, key := range set.UnsortedList() {
  134. returnKeySet.Insert(key)
  135. }
  136. }
  137. list := make([]interface{}, 0, returnKeySet.Len())
  138. for absoluteKey := range returnKeySet {
  139. list = append(list, c.items[absoluteKey])
  140. }
  141. return list, nil
  142. }
  143. // ByIndex returns a list of items that match an exact value on the index function
  144. func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
  145. c.lock.RLock()
  146. defer c.lock.RUnlock()
  147. indexFunc := c.indexers[indexName]
  148. if indexFunc == nil {
  149. return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  150. }
  151. index := c.indices[indexName]
  152. set := index[indexKey]
  153. list := make([]interface{}, 0, set.Len())
  154. for _, key := range set.List() {
  155. list = append(list, c.items[key])
  156. }
  157. return list, nil
  158. }
  159. // IndexKeys returns a list of keys that match on the index function.
  160. // IndexKeys is thread-safe so long as you treat all items as immutable.
  161. func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
  162. c.lock.RLock()
  163. defer c.lock.RUnlock()
  164. indexFunc := c.indexers[indexName]
  165. if indexFunc == nil {
  166. return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  167. }
  168. index := c.indices[indexName]
  169. set := index[indexKey]
  170. return set.List(), nil
  171. }
  172. func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
  173. c.lock.RLock()
  174. defer c.lock.RUnlock()
  175. index := c.indices[indexName]
  176. names := make([]string, 0, len(index))
  177. for key := range index {
  178. names = append(names, key)
  179. }
  180. return names
  181. }
  182. func (c *threadSafeMap) GetIndexers() Indexers {
  183. return c.indexers
  184. }
  185. func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
  186. c.lock.Lock()
  187. defer c.lock.Unlock()
  188. if len(c.items) > 0 {
  189. return fmt.Errorf("cannot add indexers to running index")
  190. }
  191. oldKeys := sets.StringKeySet(c.indexers)
  192. newKeys := sets.StringKeySet(newIndexers)
  193. if oldKeys.HasAny(newKeys.List()...) {
  194. return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
  195. }
  196. for k, v := range newIndexers {
  197. c.indexers[k] = v
  198. }
  199. return nil
  200. }
  201. // updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
  202. // updateIndices must be called from a function that already has a lock on the cache
  203. func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
  204. // if we got an old object, we need to remove it before we add it again
  205. if oldObj != nil {
  206. c.deleteFromIndices(oldObj, key)
  207. }
  208. for name, indexFunc := range c.indexers {
  209. indexValues, err := indexFunc(newObj)
  210. if err != nil {
  211. panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
  212. }
  213. index := c.indices[name]
  214. if index == nil {
  215. index = Index{}
  216. c.indices[name] = index
  217. }
  218. for _, indexValue := range indexValues {
  219. set := index[indexValue]
  220. if set == nil {
  221. set = sets.String{}
  222. index[indexValue] = set
  223. }
  224. set.Insert(key)
  225. }
  226. }
  227. }
  228. // deleteFromIndices removes the object from each of the managed indexes
  229. // it is intended to be called from a function that already has a lock on the cache
  230. func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
  231. for name, indexFunc := range c.indexers {
  232. indexValues, err := indexFunc(obj)
  233. if err != nil {
  234. panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
  235. }
  236. index := c.indices[name]
  237. if index == nil {
  238. continue
  239. }
  240. for _, indexValue := range indexValues {
  241. set := index[indexValue]
  242. if set != nil {
  243. set.Delete(key)
  244. }
  245. }
  246. }
  247. }
  248. func (c *threadSafeMap) Resync() error {
  249. // Nothing to do
  250. return nil
  251. }
  252. func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
  253. return &threadSafeMap{
  254. items: map[string]interface{}{},
  255. indexers: indexers,
  256. indices: indices,
  257. }
  258. }