generic.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pleg
  14. import (
  15. "fmt"
  16. "sync/atomic"
  17. "time"
  18. "github.com/golang/glog"
  19. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  20. "k8s.io/kubernetes/pkg/kubelet/metrics"
  21. "k8s.io/kubernetes/pkg/types"
  22. "k8s.io/kubernetes/pkg/util/clock"
  23. "k8s.io/kubernetes/pkg/util/sets"
  24. "k8s.io/kubernetes/pkg/util/wait"
  25. )
// GenericPLEG is an extremely simple generic PLEG that relies solely on
// periodic listing to discover container changes. It should be used
// as a temporary replacement for container runtimes that do not support a
// proper event generator yet.
//
// Note that GenericPLEG assumes that a container would not be created,
// terminated, and garbage collected within one relist period. If such an
// incident happens, GenericPLEG would miss all events regarding this
// container. In the case of relisting failure, the window may become longer.
// Note that this assumption is not unique -- many kubelet internal components
// rely on terminated containers as tombstones for bookkeeping purposes. The
// garbage collector is implemented to work with such situations. However, to
// guarantee that kubelet can handle missing container events, it is
// recommended to set the relist period short and have an auxiliary, longer
// periodic sync in kubelet as the safety net.
type GenericPLEG struct {
	// The period for relisting.
	relistPeriod time.Duration
	// The container runtime.
	runtime kubecontainer.Runtime
	// The channel from which the subscriber listens for events. It is
	// created with the capacity passed to NewGenericPLEG.
	eventChannel chan *PodLifecycleEvent
	// The internal cache for pod/container information, keyed by pod UID.
	podRecords podRecords
	// Time of the last relisting. Holds a time.Time that is stored at the
	// beginning of each relist; nothing is stored until the first relist.
	relistTime atomic.Value
	// Cache for storing the runtime states required for syncing pods.
	// A nil cache means that caching is disabled.
	cache kubecontainer.Cache
	// For testability.
	clock clock.Clock
	// Pods that failed to have their status retrieved during a relist. These pods will be
	// retried during the next relisting.
	podsToReinspect map[types.UID]*kubecontainer.Pod
}
// plegContainerState has a one-to-one mapping to the
// kubecontainer.ContainerState except for the non-existent state. This state
// is introduced here to complete the state transition scenarios.
type plegContainerState string

const (
	// The container is running.
	plegContainerRunning plegContainerState = "running"
	// The container has exited.
	plegContainerExited plegContainerState = "exited"
	// The container is in a state reported as unknown by the runtime.
	plegContainerUnknown plegContainerState = "unknown"
	// The container is not present in the pod (or the pod itself is absent).
	// This state has no kubecontainer.ContainerState counterpart.
	plegContainerNonExistent plegContainerState = "non-existent"
)
  70. func convertState(state kubecontainer.ContainerState) plegContainerState {
  71. switch state {
  72. case kubecontainer.ContainerStateRunning:
  73. return plegContainerRunning
  74. case kubecontainer.ContainerStateExited:
  75. return plegContainerExited
  76. case kubecontainer.ContainerStateUnknown:
  77. return plegContainerUnknown
  78. default:
  79. panic(fmt.Sprintf("unrecognized container state: %v", state))
  80. }
  81. }
// podRecord keeps two snapshots of a single pod: the one that events were
// last generated against (old) and the one seen in the most recent relist
// (current). Either may be nil when the pod was absent at that point.
type podRecord struct {
	old     *kubecontainer.Pod
	current *kubecontainer.Pod
}

// podRecords maps a pod UID to its podRecord.
type podRecords map[types.UID]*podRecord
  87. func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
  88. relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
  89. return &GenericPLEG{
  90. relistPeriod: relistPeriod,
  91. runtime: runtime,
  92. eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
  93. podRecords: make(podRecords),
  94. cache: cache,
  95. clock: clock,
  96. }
  97. }
// Watch returns the channel from which the subscriber can receive
// PodLifecycleEvent events. Every call returns the same channel, so all
// callers share a single stream of events.
// TODO: support multiple subscribers.
func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
	return g.eventChannel
}
// Start spawns a goroutine to relist periodically, using relistPeriod as
// the period. The goroutine is given wait.NeverStop as its stop channel, so
// this type offers no way to stop it once started.
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
  108. func (g *GenericPLEG) Healthy() (bool, error) {
  109. relistTime := g.getRelistTime()
  110. // TODO: Evaluate if we can reduce this threshold.
  111. // The threshold needs to be greater than the relisting period + the
  112. // relisting time, which can vary significantly. Set a conservative
  113. // threshold so that we don't cause kubelet to be restarted unnecessarily.
  114. threshold := 2 * time.Minute
  115. if g.clock.Since(relistTime) > threshold {
  116. return false, fmt.Errorf("pleg was last seen active at %v", relistTime)
  117. }
  118. return true, nil
  119. }
  120. func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
  121. if newState == oldState {
  122. return nil
  123. }
  124. glog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
  125. switch newState {
  126. case plegContainerRunning:
  127. return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
  128. case plegContainerExited:
  129. return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
  130. case plegContainerUnknown:
  131. return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
  132. case plegContainerNonExistent:
  133. switch oldState {
  134. case plegContainerExited:
  135. // We already reported that the container died before.
  136. return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
  137. default:
  138. return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
  139. }
  140. default:
  141. panic(fmt.Sprintf("unrecognized container state: %v", newState))
  142. }
  143. }
  144. func (g *GenericPLEG) getRelistTime() time.Time {
  145. val := g.relistTime.Load()
  146. if val == nil {
  147. return time.Time{}
  148. }
  149. return val.(time.Time)
  150. }
// updateRelisTime atomically records timestamp as the time of the latest
// relist, making it visible to getRelistTime.
func (g *GenericPLEG) updateRelisTime(timestamp time.Time) {
	g.relistTime.Store(timestamp)
}
  154. // relist queries the container runtime for list of pods/containers, compare
  155. // with the internal pods/containers, and generats events accordingly.
  156. func (g *GenericPLEG) relist() {
  157. glog.V(5).Infof("GenericPLEG: Relisting")
  158. if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
  159. metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
  160. }
  161. timestamp := g.clock.Now()
  162. // Update the relist time.
  163. g.updateRelisTime(timestamp)
  164. defer func() {
  165. metrics.PLEGRelistLatency.Observe(metrics.SinceInMicroseconds(timestamp))
  166. }()
  167. // Get all the pods.
  168. podList, err := g.runtime.GetPods(true)
  169. if err != nil {
  170. glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
  171. return
  172. }
  173. pods := kubecontainer.Pods(podList)
  174. g.podRecords.setCurrent(pods)
  175. // Compare the old and the current pods, and generate events.
  176. eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
  177. for pid := range g.podRecords {
  178. oldPod := g.podRecords.getOld(pid)
  179. pod := g.podRecords.getCurrent(pid)
  180. // Get all containers in the old and the new pod.
  181. allContainers := getContainersFromPods(oldPod, pod)
  182. for _, container := range allContainers {
  183. events := computeEvents(oldPod, pod, &container.ID)
  184. for _, e := range events {
  185. updateEvents(eventsByPodID, e)
  186. }
  187. }
  188. }
  189. var needsReinspection map[types.UID]*kubecontainer.Pod
  190. if g.cacheEnabled() {
  191. needsReinspection = make(map[types.UID]*kubecontainer.Pod)
  192. }
  193. // If there are events associated with a pod, we should update the
  194. // podCache.
  195. for pid, events := range eventsByPodID {
  196. pod := g.podRecords.getCurrent(pid)
  197. if g.cacheEnabled() {
  198. // updateCache() will inspect the pod and update the cache. If an
  199. // error occurs during the inspection, we want PLEG to retry again
  200. // in the next relist. To achieve this, we do not update the
  201. // associated podRecord of the pod, so that the change will be
  202. // detect again in the next relist.
  203. // TODO: If many pods changed during the same relist period,
  204. // inspecting the pod and getting the PodStatus to update the cache
  205. // serially may take a while. We should be aware of this and
  206. // parallelize if needed.
  207. if err := g.updateCache(pod, pid); err != nil {
  208. glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
  209. // make sure we try to reinspect the pod during the next relisting
  210. needsReinspection[pid] = pod
  211. continue
  212. } else if _, found := g.podsToReinspect[pid]; found {
  213. // this pod was in the list to reinspect and we did so because it had events, so remove it
  214. // from the list (we don't want the reinspection code below to inspect it a second time in
  215. // this relist execution)
  216. delete(g.podsToReinspect, pid)
  217. }
  218. }
  219. // Update the internal storage and send out the events.
  220. g.podRecords.update(pid)
  221. for i := range events {
  222. // Filter out events that are not reliable and no other components use yet.
  223. if events[i].Type == ContainerChanged {
  224. continue
  225. }
  226. g.eventChannel <- events[i]
  227. }
  228. }
  229. if g.cacheEnabled() {
  230. // reinspect any pods that failed inspection during the previous relist
  231. if len(g.podsToReinspect) > 0 {
  232. glog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
  233. for pid, pod := range g.podsToReinspect {
  234. if err := g.updateCache(pod, pid); err != nil {
  235. glog.Errorf("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
  236. needsReinspection[pid] = pod
  237. }
  238. }
  239. }
  240. // Update the cache timestamp. This needs to happen *after*
  241. // all pods have been properly updated in the cache.
  242. g.cache.UpdateTime(timestamp)
  243. }
  244. // make sure we retain the list of pods that need reinspecting the next time relist is called
  245. g.podsToReinspect = needsReinspection
  246. }
  247. func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
  248. cidSet := sets.NewString()
  249. var containers []*kubecontainer.Container
  250. for _, p := range pods {
  251. if p == nil {
  252. continue
  253. }
  254. for _, c := range p.Containers {
  255. cid := string(c.ID.ID)
  256. if cidSet.Has(cid) {
  257. continue
  258. }
  259. cidSet.Insert(cid)
  260. containers = append(containers, c)
  261. }
  262. }
  263. return containers
  264. }
  265. func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
  266. var pid types.UID
  267. if oldPod != nil {
  268. pid = oldPod.ID
  269. } else if newPod != nil {
  270. pid = newPod.ID
  271. }
  272. oldState := getContainerState(oldPod, cid)
  273. newState := getContainerState(newPod, cid)
  274. return generateEvents(pid, cid.ID, oldState, newState)
  275. }
// cacheEnabled reports whether a cache has been configured, i.e. whether
// the cache field is non-nil.
func (g *GenericPLEG) cacheEnabled() bool {
	return g.cache != nil
}
  279. func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
  280. if pod == nil {
  281. // The pod is missing in the current relist. This means that
  282. // the pod has no visible (active or inactive) containers.
  283. glog.V(4).Infof("PLEG: Delete status for pod %q", string(pid))
  284. g.cache.Delete(pid)
  285. return nil
  286. }
  287. timestamp := g.clock.Now()
  288. // TODO: Consider adding a new runtime method
  289. // GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
  290. // all containers again.
  291. status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
  292. glog.V(4).Infof("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err)
  293. g.cache.Set(pod.ID, status, err, timestamp)
  294. return err
  295. }
  296. func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
  297. if e == nil {
  298. return
  299. }
  300. eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
  301. }
  302. func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
  303. // Default to the non-existent state.
  304. state := plegContainerNonExistent
  305. if pod == nil {
  306. return state
  307. }
  308. container := pod.FindContainerByID(*cid)
  309. if container == nil {
  310. return state
  311. }
  312. return convertState(container.State)
  313. }
  314. func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod {
  315. r, ok := pr[id]
  316. if !ok {
  317. return nil
  318. }
  319. return r.old
  320. }
  321. func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod {
  322. r, ok := pr[id]
  323. if !ok {
  324. return nil
  325. }
  326. return r.current
  327. }
  328. func (pr podRecords) setCurrent(pods []*kubecontainer.Pod) {
  329. for i := range pr {
  330. pr[i].current = nil
  331. }
  332. for _, pod := range pods {
  333. if r, ok := pr[pod.ID]; ok {
  334. r.current = pod
  335. } else {
  336. pr[pod.ID] = &podRecord{current: pod}
  337. }
  338. }
  339. }
  340. func (pr podRecords) update(id types.UID) {
  341. r, ok := pr[id]
  342. if !ok {
  343. return
  344. }
  345. pr.updateInternal(id, r)
  346. }
  347. func (pr podRecords) updateInternal(id types.UID, r *podRecord) {
  348. if r.current == nil {
  349. // Pod no longer exists; delete the entry.
  350. delete(pr, id)
  351. return
  352. }
  353. r.old = r.current
  354. r.current = nil
  355. }