123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "fmt"
- "sync"
- "k8s.io/apimachinery/pkg/util/sets"
- )
- // ThreadSafeStore is an interface that allows concurrent access to a storage backend.
- // TL;DR caveats: you must not modify anything returned by Get or List as it will break
- // the indexing feature in addition to not being thread safe.
- //
- // The guarantees of thread safety provided by List/Get are only valid if the caller
- // treats returned items as read-only. For example, a pointer inserted in the store
- // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
- // on the same key and modify the pointer in a non-thread-safe way. Also note that
- // modifying objects stored by the indexers (if any) will *not* automatically lead
- // to a re-index. So it's not a good idea to directly modify the objects returned by
- // Get/List, in general.
- type ThreadSafeStore interface {
- Add(key string, obj interface{})
- Update(key string, obj interface{})
- Delete(key string)
- Get(key string) (item interface{}, exists bool)
- List() []interface{}
- ListKeys() []string
- Replace(map[string]interface{}, string)
- Index(indexName string, obj interface{}) ([]interface{}, error)
- IndexKeys(indexName, indexKey string) ([]string, error)
- ListIndexFuncValues(name string) []string
- ByIndex(indexName, indexKey string) ([]interface{}, error)
- GetIndexers() Indexers
- // AddIndexers adds more indexers to this store. If you call this after you already have data
- // in the store, the results are undefined.
- AddIndexers(newIndexers Indexers) error
- Resync() error
- }
- // threadSafeMap implements ThreadSafeStore
- type threadSafeMap struct {
- lock sync.RWMutex
- items map[string]interface{}
- // indexers maps a name to an IndexFunc
- indexers Indexers
- // indices maps a name to an Index
- indices Indices
- }
- func (c *threadSafeMap) Add(key string, obj interface{}) {
- c.lock.Lock()
- defer c.lock.Unlock()
- oldObject := c.items[key]
- c.items[key] = obj
- c.updateIndices(oldObject, obj, key)
- }
- func (c *threadSafeMap) Update(key string, obj interface{}) {
- c.lock.Lock()
- defer c.lock.Unlock()
- oldObject := c.items[key]
- c.items[key] = obj
- c.updateIndices(oldObject, obj, key)
- }
- func (c *threadSafeMap) Delete(key string) {
- c.lock.Lock()
- defer c.lock.Unlock()
- if obj, exists := c.items[key]; exists {
- c.deleteFromIndices(obj, key)
- delete(c.items, key)
- }
- }
- func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- item, exists = c.items[key]
- return item, exists
- }
- func (c *threadSafeMap) List() []interface{} {
- c.lock.RLock()
- defer c.lock.RUnlock()
- list := make([]interface{}, 0, len(c.items))
- for _, item := range c.items {
- list = append(list, item)
- }
- return list
- }
- // ListKeys returns a list of all the keys of the objects currently
- // in the threadSafeMap.
- func (c *threadSafeMap) ListKeys() []string {
- c.lock.RLock()
- defer c.lock.RUnlock()
- list := make([]string, 0, len(c.items))
- for key := range c.items {
- list = append(list, key)
- }
- return list
- }
- func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.items = items
- // rebuild any index
- c.indices = Indices{}
- for key, item := range c.items {
- c.updateIndices(nil, item, key)
- }
- }
- // Index returns a list of items that match on the index function
- // Index is thread-safe so long as you treat all items as immutable
- func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- indexFunc := c.indexers[indexName]
- if indexFunc == nil {
- return nil, fmt.Errorf("Index with name %s does not exist", indexName)
- }
- indexKeys, err := indexFunc(obj)
- if err != nil {
- return nil, err
- }
- index := c.indices[indexName]
- // need to de-dupe the return list. Since multiple keys are allowed, this can happen.
- returnKeySet := sets.String{}
- for _, indexKey := range indexKeys {
- set := index[indexKey]
- for _, key := range set.UnsortedList() {
- returnKeySet.Insert(key)
- }
- }
- list := make([]interface{}, 0, returnKeySet.Len())
- for absoluteKey := range returnKeySet {
- list = append(list, c.items[absoluteKey])
- }
- return list, nil
- }
- // ByIndex returns a list of items that match an exact value on the index function
- func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- indexFunc := c.indexers[indexName]
- if indexFunc == nil {
- return nil, fmt.Errorf("Index with name %s does not exist", indexName)
- }
- index := c.indices[indexName]
- set := index[indexKey]
- list := make([]interface{}, 0, set.Len())
- for _, key := range set.List() {
- list = append(list, c.items[key])
- }
- return list, nil
- }
- // IndexKeys returns a list of keys that match on the index function.
- // IndexKeys is thread-safe so long as you treat all items as immutable.
- func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
- c.lock.RLock()
- defer c.lock.RUnlock()
- indexFunc := c.indexers[indexName]
- if indexFunc == nil {
- return nil, fmt.Errorf("Index with name %s does not exist", indexName)
- }
- index := c.indices[indexName]
- set := index[indexKey]
- return set.List(), nil
- }
- func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
- c.lock.RLock()
- defer c.lock.RUnlock()
- index := c.indices[indexName]
- names := make([]string, 0, len(index))
- for key := range index {
- names = append(names, key)
- }
- return names
- }
- func (c *threadSafeMap) GetIndexers() Indexers {
- return c.indexers
- }
- func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
- c.lock.Lock()
- defer c.lock.Unlock()
- if len(c.items) > 0 {
- return fmt.Errorf("cannot add indexers to running index")
- }
- oldKeys := sets.StringKeySet(c.indexers)
- newKeys := sets.StringKeySet(newIndexers)
- if oldKeys.HasAny(newKeys.List()...) {
- return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
- }
- for k, v := range newIndexers {
- c.indexers[k] = v
- }
- return nil
- }
- // updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
- // updateIndices must be called from a function that already has a lock on the cache
- func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
- // if we got an old object, we need to remove it before we add it again
- if oldObj != nil {
- c.deleteFromIndices(oldObj, key)
- }
- for name, indexFunc := range c.indexers {
- indexValues, err := indexFunc(newObj)
- if err != nil {
- panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
- }
- index := c.indices[name]
- if index == nil {
- index = Index{}
- c.indices[name] = index
- }
- for _, indexValue := range indexValues {
- set := index[indexValue]
- if set == nil {
- set = sets.String{}
- index[indexValue] = set
- }
- set.Insert(key)
- }
- }
- }
- // deleteFromIndices removes the object from each of the managed indexes
- // it is intended to be called from a function that already has a lock on the cache
- func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
- for name, indexFunc := range c.indexers {
- indexValues, err := indexFunc(obj)
- if err != nil {
- panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
- }
- index := c.indices[name]
- if index == nil {
- continue
- }
- for _, indexValue := range indexValues {
- set := index[indexValue]
- if set != nil {
- set.Delete(key)
- }
- }
- }
- }
- func (c *threadSafeMap) Resync() error {
- // Nothing to do
- return nil
- }
- func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
- return &threadSafeMap{
- items: map[string]interface{}{},
- indexers: indexers,
- indices: indices,
- }
- }
|