123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "errors"
- "sync"
- "k8s.io/apimachinery/pkg/util/sets"
- )
// PopProcessFunc is the callback handed to the Pop() method of the Queue
// interface. It receives the element that was popped from the queue and is
// expected to process it, reporting the outcome as an error.
type PopProcessFunc func(interface{}) error
// ErrRequeue is an error value a PopProcessFunc may return to request that
// the item it was given is safely put back on the queue. Pop returns the
// wrapped Err (which may be nil) to its caller instead of the ErrRequeue.
type ErrRequeue struct {
	// Err is the error that the Pop function returns.
	Err error
}

// FIFOClosedError is returned by Pop when the queue is empty and has been
// closed.
var FIFOClosedError = errors.New("DeltaFIFO: manipulating with closed queue")

// Error implements the error interface. It reports the message of the wrapped
// error when one is set, and a fixed explanatory message otherwise.
func (e ErrRequeue) Error() string {
	if e.Err != nil {
		return e.Err.Error()
	}
	return "the popped item should be requeued without returning an error"
}
- // Queue is exactly like a Store, but has a Pop() method too.
- type Queue interface {
- Store
- // Pop blocks until it has something to process.
- // It returns the object that was process and the result of processing.
- // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
- // should be requeued before releasing the lock on the queue.
- Pop(PopProcessFunc) (interface{}, error)
- // AddIfNotPresent adds a value previously
- // returned by Pop back into the queue as long
- // as nothing else (presumably more recent)
- // has since been added.
- AddIfNotPresent(interface{}) error
- // Return true if the first batch of items has been popped
- HasSynced() bool
- // Close queue
- Close()
- }
- // Helper function for popping from Queue.
- // WARNING: Do NOT use this function in non-test code to avoid races
- // unless you really really really really know what you are doing.
- func Pop(queue Queue) interface{} {
- var result interface{}
- queue.Pop(func(obj interface{}) error {
- result = obj
- return nil
- })
- return result
- }
- // FIFO receives adds and updates from a Reflector, and puts them in a queue for
- // FIFO order processing. If multiple adds/updates of a single item happen while
- // an item is in the queue before it has been processed, it will only be
- // processed once, and when it is processed, the most recent version will be
- // processed. This can't be done with a channel.
- //
- // FIFO solves this use case:
- // * You want to process every object (exactly) once.
- // * You want to process the most recent version of the object when you process it.
- // * You do not want to process deleted objects, they should be removed from the queue.
- // * You do not want to periodically reprocess objects.
- // Compare with DeltaFIFO for other use cases.
- type FIFO struct {
- lock sync.RWMutex
- cond sync.Cond
- // We depend on the property that items in the set are in the queue and vice versa.
- items map[string]interface{}
- queue []string
- // populated is true if the first batch of items inserted by Replace() has been populated
- // or Delete/Add/Update was called first.
- populated bool
- // initialPopulationCount is the number of items inserted by the first call of Replace()
- initialPopulationCount int
- // keyFunc is used to make the key used for queued item insertion and retrieval, and
- // should be deterministic.
- keyFunc KeyFunc
- // Indication the queue is closed.
- // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
- // Currently, not used to gate any of CRED operations.
- closed bool
- closedLock sync.Mutex
- }
- var (
- _ = Queue(&FIFO{}) // FIFO is a Queue
- )
- // Close the queue.
- func (f *FIFO) Close() {
- f.closedLock.Lock()
- defer f.closedLock.Unlock()
- f.closed = true
- f.cond.Broadcast()
- }
- // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
- // or an Update called first but the first batch of items inserted by Replace() has been popped
- func (f *FIFO) HasSynced() bool {
- f.lock.Lock()
- defer f.lock.Unlock()
- return f.populated && f.initialPopulationCount == 0
- }
- // Add inserts an item, and puts it in the queue. The item is only enqueued
- // if it doesn't already exist in the set.
- func (f *FIFO) Add(obj interface{}) error {
- id, err := f.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- if _, exists := f.items[id]; !exists {
- f.queue = append(f.queue, id)
- }
- f.items[id] = obj
- f.cond.Broadcast()
- return nil
- }
- // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
- // present in the set, it is neither enqueued nor added to the set.
- //
- // This is useful in a single producer/consumer scenario so that the consumer can
- // safely retry items without contending with the producer and potentially enqueueing
- // stale items.
- func (f *FIFO) AddIfNotPresent(obj interface{}) error {
- id, err := f.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.addIfNotPresent(id, obj)
- return nil
- }
- // addIfNotPresent assumes the fifo lock is already held and adds the the provided
- // item to the queue under id if it does not already exist.
- func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
- f.populated = true
- if _, exists := f.items[id]; exists {
- return
- }
- f.queue = append(f.queue, id)
- f.items[id] = obj
- f.cond.Broadcast()
- }
- // Update is the same as Add in this implementation.
- func (f *FIFO) Update(obj interface{}) error {
- return f.Add(obj)
- }
- // Delete removes an item. It doesn't add it to the queue, because
- // this implementation assumes the consumer only cares about the objects,
- // not the order in which they were created/added.
- func (f *FIFO) Delete(obj interface{}) error {
- id, err := f.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- delete(f.items, id)
- return err
- }
- // List returns a list of all the items.
- func (f *FIFO) List() []interface{} {
- f.lock.RLock()
- defer f.lock.RUnlock()
- list := make([]interface{}, 0, len(f.items))
- for _, item := range f.items {
- list = append(list, item)
- }
- return list
- }
- // ListKeys returns a list of all the keys of the objects currently
- // in the FIFO.
- func (f *FIFO) ListKeys() []string {
- f.lock.RLock()
- defer f.lock.RUnlock()
- list := make([]string, 0, len(f.items))
- for key := range f.items {
- list = append(list, key)
- }
- return list
- }
- // Get returns the requested item, or sets exists=false.
- func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
- key, err := f.keyFunc(obj)
- if err != nil {
- return nil, false, KeyError{obj, err}
- }
- return f.GetByKey(key)
- }
- // GetByKey returns the requested item, or sets exists=false.
- func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
- f.lock.RLock()
- defer f.lock.RUnlock()
- item, exists = f.items[key]
- return item, exists, nil
- }
- // Checks if the queue is closed
- func (f *FIFO) IsClosed() bool {
- f.closedLock.Lock()
- defer f.closedLock.Unlock()
- if f.closed {
- return true
- }
- return false
- }
- // Pop waits until an item is ready and processes it. If multiple items are
- // ready, they are returned in the order in which they were added/updated.
- // The item is removed from the queue (and the store) before it is processed,
- // so if you don't successfully process it, it should be added back with
- // AddIfNotPresent(). process function is called under lock, so it is safe
- // update data structures in it that need to be in sync with the queue.
- func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
- f.lock.Lock()
- defer f.lock.Unlock()
- for {
- for len(f.queue) == 0 {
- // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
- // When Close() is called, the f.closed is set and the condition is broadcasted.
- // Which causes this loop to continue and return from the Pop().
- if f.IsClosed() {
- return nil, FIFOClosedError
- }
- f.cond.Wait()
- }
- id := f.queue[0]
- f.queue = f.queue[1:]
- if f.initialPopulationCount > 0 {
- f.initialPopulationCount--
- }
- item, ok := f.items[id]
- if !ok {
- // Item may have been deleted subsequently.
- continue
- }
- delete(f.items, id)
- err := process(item)
- if e, ok := err.(ErrRequeue); ok {
- f.addIfNotPresent(id, item)
- err = e.Err
- }
- return item, err
- }
- }
- // Replace will delete the contents of 'f', using instead the given map.
- // 'f' takes ownership of the map, you should not reference the map again
- // after calling this function. f's queue is reset, too; upon return, it
- // will contain the items in the map, in no particular order.
- func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
- items := map[string]interface{}{}
- for _, item := range list {
- key, err := f.keyFunc(item)
- if err != nil {
- return KeyError{item, err}
- }
- items[key] = item
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- if !f.populated {
- f.populated = true
- f.initialPopulationCount = len(items)
- }
- f.items = items
- f.queue = f.queue[:0]
- for id := range items {
- f.queue = append(f.queue, id)
- }
- if len(f.queue) > 0 {
- f.cond.Broadcast()
- }
- return nil
- }
- // Resync will touch all objects to put them into the processing queue
- func (f *FIFO) Resync() error {
- f.lock.Lock()
- defer f.lock.Unlock()
- inQueue := sets.NewString()
- for _, id := range f.queue {
- inQueue.Insert(id)
- }
- for id := range f.items {
- if !inQueue.Has(id) {
- f.queue = append(f.queue, id)
- }
- }
- if len(f.queue) > 0 {
- f.cond.Broadcast()
- }
- return nil
- }
- // NewFIFO returns a Store which can be used to queue up items to
- // process.
- func NewFIFO(keyFunc KeyFunc) *FIFO {
- f := &FIFO{
- items: map[string]interface{}{},
- queue: []string{},
- keyFunc: keyFunc,
- }
- f.cond.L = &f.lock
- return f
- }
|