pod_workers.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kubelet
  14. import (
  15. "fmt"
  16. "sync"
  17. "time"
  18. "github.com/golang/glog"
  19. "k8s.io/kubernetes/pkg/api"
  20. "k8s.io/kubernetes/pkg/client/record"
  21. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  22. "k8s.io/kubernetes/pkg/kubelet/events"
  23. "k8s.io/kubernetes/pkg/kubelet/eviction"
  24. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  25. "k8s.io/kubernetes/pkg/kubelet/util/queue"
  26. "k8s.io/kubernetes/pkg/types"
  27. "k8s.io/kubernetes/pkg/util/runtime"
  28. "k8s.io/kubernetes/pkg/util/wait"
  29. )
  30. // OnCompleteFunc is a function that is invoked when an operation completes.
  31. // If err is non-nil, the operation did not complete successfully.
  32. type OnCompleteFunc func(err error)
  33. // PodStatusFunc is a function that is invoked to generate a pod status.
  34. type PodStatusFunc func(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus
  35. // KillPodOptions are options when performing a pod update whose update type is kill.
  36. type KillPodOptions struct {
  37. // PodStatusFunc is the function to invoke to set pod status in response to a kill request.
  38. PodStatusFunc PodStatusFunc
  39. // PodTerminationGracePeriodSecondsOverride is optional override to use if a pod is being killed as part of kill operation.
  40. PodTerminationGracePeriodSecondsOverride *int64
  41. }
  42. // UpdatePodOptions is an options struct to pass to a UpdatePod operation.
  43. type UpdatePodOptions struct {
  44. // pod to update
  45. Pod *api.Pod
  46. // the mirror pod for the pod to update, if it is a static pod
  47. MirrorPod *api.Pod
  48. // the type of update (create, update, sync, kill)
  49. UpdateType kubetypes.SyncPodType
  50. // optional callback function when operation completes
  51. // this callback is not guaranteed to be completed since a pod worker may
  52. // drop update requests if it was fulfilling a previous request. this is
  53. // only guaranteed to be invoked in response to a kill pod request which is
  54. // always delivered.
  55. OnCompleteFunc OnCompleteFunc
  56. // if update type is kill, use the specified options to kill the pod.
  57. KillPodOptions *KillPodOptions
  58. }
  59. // PodWorkers is an abstract interface for testability.
  60. type PodWorkers interface {
  61. UpdatePod(options *UpdatePodOptions)
  62. ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
  63. ForgetWorker(uid types.UID)
  64. }
  65. // syncPodOptions provides the arguments to a SyncPod operation.
  66. type syncPodOptions struct {
  67. // the mirror pod for the pod to sync, if it is a static pod
  68. mirrorPod *api.Pod
  69. // pod to sync
  70. pod *api.Pod
  71. // the type of update (create, update, sync)
  72. updateType kubetypes.SyncPodType
  73. // the current status
  74. podStatus *kubecontainer.PodStatus
  75. // if update type is kill, use the specified options to kill the pod.
  76. killPodOptions *KillPodOptions
  77. }
  78. // the function to invoke to perform a sync.
  79. type syncPodFnType func(options syncPodOptions) error
  // Jitter factors handed to wait.Jitter when a pod is requeued after a sync.
  const (
	// workerResyncIntervalJitterFactor jitters resyncInterval after a
	// successful sync.
	workerResyncIntervalJitterFactor = 0.5
	// workerBackOffPeriodJitterFactor jitters backOffPeriod after a failed
	// sync.
	workerBackOffPeriodJitterFactor = 0.5
  )
  86. type podWorkers struct {
  87. // Protects all per worker fields.
  88. podLock sync.Mutex
  89. // Tracks all running per-pod goroutines - per-pod goroutine will be
  90. // processing updates received through its corresponding channel.
  91. podUpdates map[types.UID]chan UpdatePodOptions
  92. // Track the current state of per-pod goroutines.
  93. // Currently all update request for a given pod coming when another
  94. // update of this pod is being processed are ignored.
  95. isWorking map[types.UID]bool
  96. // Tracks the last undelivered work item for this pod - a work item is
  97. // undelivered if it comes in while the worker is working.
  98. lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions
  99. workQueue queue.WorkQueue
  100. // This function is run to sync the desired stated of pod.
  101. // NOTE: This function has to be thread-safe - it can be called for
  102. // different pods at the same time.
  103. syncPodFn syncPodFnType
  104. // The EventRecorder to use
  105. recorder record.EventRecorder
  106. // backOffPeriod is the duration to back off when there is a sync error.
  107. backOffPeriod time.Duration
  108. // resyncInterval is the duration to wait until the next sync.
  109. resyncInterval time.Duration
  110. // podCache stores kubecontainer.PodStatus for all pods.
  111. podCache kubecontainer.Cache
  112. }
  113. func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
  114. resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
  115. return &podWorkers{
  116. podUpdates: map[types.UID]chan UpdatePodOptions{},
  117. isWorking: map[types.UID]bool{},
  118. lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
  119. syncPodFn: syncPodFn,
  120. recorder: recorder,
  121. workQueue: workQueue,
  122. resyncInterval: resyncInterval,
  123. backOffPeriod: backOffPeriod,
  124. podCache: podCache,
  125. }
  126. }
  127. func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
  128. var lastSyncTime time.Time
  129. for update := range podUpdates {
  130. err := func() error {
  131. podUID := update.Pod.UID
  132. // This is a blocking call that would return only if the cache
  133. // has an entry for the pod that is newer than minRuntimeCache
  134. // Time. This ensures the worker doesn't start syncing until
  135. // after the cache is at least newer than the finished time of
  136. // the previous sync.
  137. status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
  138. if err != nil {
  139. return err
  140. }
  141. err = p.syncPodFn(syncPodOptions{
  142. mirrorPod: update.MirrorPod,
  143. pod: update.Pod,
  144. podStatus: status,
  145. killPodOptions: update.KillPodOptions,
  146. updateType: update.UpdateType,
  147. })
  148. lastSyncTime = time.Now()
  149. if err != nil {
  150. return err
  151. }
  152. return nil
  153. }()
  154. // notify the call-back function if the operation succeeded or not
  155. if update.OnCompleteFunc != nil {
  156. update.OnCompleteFunc(err)
  157. }
  158. if err != nil {
  159. glog.Errorf("Error syncing pod %s, skipping: %v", update.Pod.UID, err)
  160. p.recorder.Eventf(update.Pod, api.EventTypeWarning, events.FailedSync, "Error syncing pod, skipping: %v", err)
  161. }
  162. p.wrapUp(update.Pod.UID, err)
  163. }
  164. }
  165. // Apply the new setting to the specified pod.
  166. // If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
  167. // Update requests are ignored if a kill pod request is pending.
  168. func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
  169. pod := options.Pod
  170. uid := pod.UID
  171. var podUpdates chan UpdatePodOptions
  172. var exists bool
  173. p.podLock.Lock()
  174. defer p.podLock.Unlock()
  175. if podUpdates, exists = p.podUpdates[uid]; !exists {
  176. // We need to have a buffer here, because checkForUpdates() method that
  177. // puts an update into channel is called from the same goroutine where
  178. // the channel is consumed. However, it is guaranteed that in such case
  179. // the channel is empty, so buffer of size 1 is enough.
  180. podUpdates = make(chan UpdatePodOptions, 1)
  181. p.podUpdates[uid] = podUpdates
  182. // Creating a new pod worker either means this is a new pod, or that the
  183. // kubelet just restarted. In either case the kubelet is willing to believe
  184. // the status of the pod for the first pod worker sync. See corresponding
  185. // comment in syncPod.
  186. go func() {
  187. defer runtime.HandleCrash()
  188. p.managePodLoop(podUpdates)
  189. }()
  190. }
  191. if !p.isWorking[pod.UID] {
  192. p.isWorking[pod.UID] = true
  193. podUpdates <- *options
  194. } else {
  195. // if a request to kill a pod is pending, we do not let anything overwrite that request.
  196. update, found := p.lastUndeliveredWorkUpdate[pod.UID]
  197. if !found || update.UpdateType != kubetypes.SyncPodKill {
  198. p.lastUndeliveredWorkUpdate[pod.UID] = *options
  199. }
  200. }
  201. }
  202. func (p *podWorkers) removeWorker(uid types.UID) {
  203. if ch, ok := p.podUpdates[uid]; ok {
  204. close(ch)
  205. delete(p.podUpdates, uid)
  206. // If there is an undelivered work update for this pod we need to remove it
  207. // since per-pod goroutine won't be able to put it to the already closed
  208. // channel when it finish processing the current work update.
  209. if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached {
  210. delete(p.lastUndeliveredWorkUpdate, uid)
  211. }
  212. }
  213. }
  214. func (p *podWorkers) ForgetWorker(uid types.UID) {
  215. p.podLock.Lock()
  216. defer p.podLock.Unlock()
  217. p.removeWorker(uid)
  218. }
  219. func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
  220. p.podLock.Lock()
  221. defer p.podLock.Unlock()
  222. for key := range p.podUpdates {
  223. if _, exists := desiredPods[key]; !exists {
  224. p.removeWorker(key)
  225. }
  226. }
  227. }
  228. func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
  229. // Requeue the last update if the last sync returned error.
  230. switch {
  231. case syncErr == nil:
  232. // No error; requeue at the regular resync interval.
  233. p.workQueue.Enqueue(uid, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
  234. default:
  235. // Error occurred during the sync; back off and then retry.
  236. p.workQueue.Enqueue(uid, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
  237. }
  238. p.checkForUpdates(uid)
  239. }
  240. func (p *podWorkers) checkForUpdates(uid types.UID) {
  241. p.podLock.Lock()
  242. defer p.podLock.Unlock()
  243. if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
  244. p.podUpdates[uid] <- workUpdate
  245. delete(p.lastUndeliveredWorkUpdate, uid)
  246. } else {
  247. p.isWorking[uid] = false
  248. }
  249. }
  250. // killPodNow returns a KillPodFunc that can be used to kill a pod.
  251. // It is intended to be injected into other modules that need to kill a pod.
  252. func killPodNow(podWorkers PodWorkers) eviction.KillPodFunc {
  253. return func(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error {
  254. // determine the grace period to use when killing the pod
  255. gracePeriod := int64(0)
  256. if gracePeriodOverride != nil {
  257. gracePeriod = *gracePeriodOverride
  258. } else if pod.Spec.TerminationGracePeriodSeconds != nil {
  259. gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
  260. }
  261. // we timeout and return an error if we don't get a callback within a reasonable time.
  262. // the default timeout is relative to the grace period (we settle on 2s to wait for kubelet->runtime traffic to complete in sigkill)
  263. timeout := int64(gracePeriod + (gracePeriod / 2))
  264. minTimeout := int64(2)
  265. if timeout < minTimeout {
  266. timeout = minTimeout
  267. }
  268. timeoutDuration := time.Duration(timeout) * time.Second
  269. // open a channel we block against until we get a result
  270. type response struct {
  271. err error
  272. }
  273. ch := make(chan response)
  274. podWorkers.UpdatePod(&UpdatePodOptions{
  275. Pod: pod,
  276. UpdateType: kubetypes.SyncPodKill,
  277. OnCompleteFunc: func(err error) {
  278. ch <- response{err: err}
  279. },
  280. KillPodOptions: &KillPodOptions{
  281. PodStatusFunc: func(p *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
  282. return status
  283. },
  284. PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
  285. },
  286. })
  287. // wait for either a response, or a timeout
  288. select {
  289. case r := <-ch:
  290. return r.err
  291. case <-time.After(timeoutDuration):
  292. return fmt.Errorf("timeout waiting to kill pod")
  293. }
  294. }
  295. }