cache.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package container
  14. import (
  15. "sync"
  16. "time"
  17. "k8s.io/kubernetes/pkg/types"
  18. )
  19. // Cache stores the PodStatus for the pods. It represents *all* the visible
  20. // pods/containers in the container runtime. All cache entries are at least as
  21. // new or newer than the global timestamp (set by UpdateTime()), while
  22. // individual entries may be slightly newer than the global timestamp. If a pod
  23. // has no states known by the runtime, Cache returns an empty PodStatus object
  24. // with ID populated.
  25. //
  26. // Cache provides two methods to retrive the PodStatus: the non-blocking Get()
  27. // and the blocking GetNewerThan() method. The component responsible for
  28. // populating the cache is expected to call Delete() to explicitly free the
  29. // cache entries.
  30. type Cache interface {
  31. Get(types.UID) (*PodStatus, error)
  32. Set(types.UID, *PodStatus, error, time.Time)
  33. // GetNewerThan is a blocking call that only returns the status
  34. // when it is newer than the given time.
  35. GetNewerThan(types.UID, time.Time) (*PodStatus, error)
  36. Delete(types.UID)
  37. UpdateTime(time.Time)
  38. }
  // data is a single cache entry: the outcome of the last inspection of one
  // pod together with the time at which it was recorded.
  type data struct {
  	// Status of the pod.
  	status *PodStatus
  	// Error got when trying to inspect the pod.
  	err error
  	// Time when the data was last modified. Left at the zero value for the
  	// default entries built by makeDefaultData.
  	modified time.Time
  }
  // subRecord describes one pending subscription created by subscribe().
  type subRecord struct {
  	// Time threshold supplied by the subscriber; notify() keeps the record
  	// pending while the notification timestamp is before this time.
  	time time.Time
  	// Channel on which notify() delivers the data; it is closed right after
  	// the data has been sent.
  	ch chan *data
  }
  // cache implements Cache.
  type cache struct {
  	// Lock which guards all internal data structures.
  	lock sync.RWMutex
  	// Map that stores the pod statuses, keyed by pod UID.
  	pods map[types.UID]*data
  	// A global timestamp representing how fresh the cached data is. All
  	// cache content is at least as new as this timestamp. Note that the
  	// timestamp is nil after initialization and only becomes non-nil once
  	// UpdateTime() has been called, i.e. when the cache is ready to serve
  	// the cached statuses.
  	timestamp *time.Time
  	// Map that stores, per pod UID, the records of the subscribers that are
  	// still waiting to be notified.
  	subscribers map[types.UID][]*subRecord
  }
  65. // NewCache creates a pod cache.
  66. func NewCache() Cache {
  67. return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}}
  68. }
  69. // Get returns the PodStatus for the pod; callers are expected not to
  70. // modify the objects returned.
  71. func (c *cache) Get(id types.UID) (*PodStatus, error) {
  72. c.lock.RLock()
  73. defer c.lock.RUnlock()
  74. d := c.get(id)
  75. return d.status, d.err
  76. }
  77. func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
  78. ch := c.subscribe(id, minTime)
  79. d := <-ch
  80. return d.status, d.err
  81. }
  82. // Set sets the PodStatus for the pod.
  83. func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
  84. c.lock.Lock()
  85. defer c.lock.Unlock()
  86. defer c.notify(id, timestamp)
  87. c.pods[id] = &data{status: status, err: err, modified: timestamp}
  88. }
  89. // Delete removes the entry of the pod.
  90. func (c *cache) Delete(id types.UID) {
  91. c.lock.Lock()
  92. defer c.lock.Unlock()
  93. delete(c.pods, id)
  94. }
  95. // UpdateTime modifies the global timestamp of the cache and notify
  96. // subscribers if needed.
  97. func (c *cache) UpdateTime(timestamp time.Time) {
  98. c.lock.Lock()
  99. defer c.lock.Unlock()
  100. c.timestamp = &timestamp
  101. // Notify all the subscribers if the condition is met.
  102. for id := range c.subscribers {
  103. c.notify(id, *c.timestamp)
  104. }
  105. }
  106. func makeDefaultData(id types.UID) *data {
  107. return &data{status: &PodStatus{ID: id}, err: nil}
  108. }
  109. func (c *cache) get(id types.UID) *data {
  110. d, ok := c.pods[id]
  111. if !ok {
  112. // Cache should store *all* pod/container information known by the
  113. // container runtime. A cache miss indicates that there are no states
  114. // regarding the pod last time we queried the container runtime.
  115. // What this *really* means is that there are no visible pod/containers
  116. // associated with this pod. Simply return an default (mostly empty)
  117. // PodStatus to reflect this.
  118. return makeDefaultData(id)
  119. }
  120. return d
  121. }
  122. // getIfNewerThan returns the data it is newer than the given time.
  123. // Otherwise, it returns nil. The caller should acquire the lock.
  124. func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
  125. d, ok := c.pods[id]
  126. globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
  127. if !ok && globalTimestampIsNewer {
  128. // Status is not cached, but the global timestamp is newer than
  129. // minTime, return the default status.
  130. return makeDefaultData(id)
  131. }
  132. if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
  133. // Status is cached, return status if either of the following is true.
  134. // * status was modified after minTime
  135. // * the global timestamp of the cache is newer than minTime.
  136. return d
  137. }
  138. // The pod status is not ready.
  139. return nil
  140. }
  141. // notify sends notifications for pod with the given id, if the requirements
  142. // are met. Note that the caller should acquire the lock.
  143. func (c *cache) notify(id types.UID, timestamp time.Time) {
  144. list, ok := c.subscribers[id]
  145. if !ok {
  146. // No one to notify.
  147. return
  148. }
  149. newList := []*subRecord{}
  150. for i, r := range list {
  151. if timestamp.Before(r.time) {
  152. // Doesn't meet the time requirement; keep the record.
  153. newList = append(newList, list[i])
  154. continue
  155. }
  156. r.ch <- c.get(id)
  157. close(r.ch)
  158. }
  159. if len(newList) == 0 {
  160. delete(c.subscribers, id)
  161. } else {
  162. c.subscribers[id] = newList
  163. }
  164. }
  165. func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
  166. ch := make(chan *data, 1)
  167. c.lock.Lock()
  168. defer c.lock.Unlock()
  169. d := c.getIfNewerThan(id, timestamp)
  170. if d != nil {
  171. // If the cache entry is ready, send the data and return immediately.
  172. ch <- d
  173. return ch
  174. }
  175. // Add the subscription record.
  176. c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
  177. return ch
  178. }