pet.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package petset
  14. import (
  15. "fmt"
  16. "strconv"
  17. "k8s.io/kubernetes/pkg/api"
  18. "k8s.io/kubernetes/pkg/api/errors"
  19. "k8s.io/kubernetes/pkg/apis/apps"
  20. "k8s.io/kubernetes/pkg/client/record"
  21. client "k8s.io/kubernetes/pkg/client/unversioned"
  22. "k8s.io/kubernetes/pkg/runtime"
  23. "github.com/golang/glog"
  24. )
  25. // petLifeCycleEvent is used to communicate high level actions the controller
  26. // needs to take on a given pet. It's recorded in the pcb. The recognized values
  27. // are listed below.
  28. type petLifeCycleEvent string
  29. const (
  30. syncPet petLifeCycleEvent = "sync"
  31. deletePet petLifeCycleEvent = "delete"
  32. // updateRetries is the number of Get/Update cycles we perform when an
  33. // update fails.
  34. updateRetries = 3
  35. // PetSetInitAnnotation is an annotation which when set, indicates that the
  36. // pet has finished initializing itself.
  37. // TODO: Replace this with init container status.
  38. PetSetInitAnnotation = "pod.alpha.kubernetes.io/initialized"
  39. )
  40. // pcb is the control block used to transmit all updates about a single pet.
  41. // It serves as the manifest for a single pet. Users must populate the pod
  42. // and parent fields to pass it around safely.
  43. type pcb struct {
  44. // pod is the desired pet pod.
  45. pod *api.Pod
  46. // pvcs is a list of desired persistent volume claims for the pet pod.
  47. pvcs []api.PersistentVolumeClaim
  48. // event is the lifecycle event associated with this update.
  49. event petLifeCycleEvent
  50. // id is the identity index of this pet.
  51. id string
  52. // parent is a pointer to the parent petset.
  53. parent *apps.PetSet
  54. }
  55. // pvcClient is a client for managing persistent volume claims.
  56. type pvcClient interface {
  57. // DeletePVCs deletes the pvcs in the given pcb.
  58. DeletePVCs(*pcb) error
  59. // SyncPVCs creates/updates pvcs in the given pcb.
  60. SyncPVCs(*pcb) error
  61. }
  62. // petSyncer syncs a single pet.
  63. type petSyncer struct {
  64. petClient
  65. // blockingPet is an unhealthy pet either from this iteration or a previous
  66. // iteration, either because it is not yet Running, or being Deleted, that
  67. // prevents other creates/deletions.
  68. blockingPet *pcb
  69. }
  70. // Sync syncs the given pet.
  71. func (p *petSyncer) Sync(pet *pcb) error {
  72. if pet == nil {
  73. return nil
  74. }
  75. realPet, exists, err := p.Get(pet)
  76. if err != nil {
  77. return err
  78. }
  79. // There is not constraint except quota on the number of pvcs created.
  80. // This is done per pet so we get a working cluster ASAP, even if user
  81. // runs out of quota.
  82. if err := p.SyncPVCs(pet); err != nil {
  83. return err
  84. }
  85. if exists {
  86. if !p.isHealthy(realPet.pod) {
  87. glog.Infof("PetSet %v waiting on unhealthy pet %v", pet.parent.Name, realPet.pod.Name)
  88. }
  89. return p.Update(realPet, pet)
  90. }
  91. if p.blockingPet != nil {
  92. glog.Infof("Create of %v in PetSet %v blocked by unhealthy pet %v", pet.pod.Name, pet.parent.Name, p.blockingPet.pod.Name)
  93. return nil
  94. }
  95. // This is counted as a create, even if it fails. We can't skip indices
  96. // because some pets might allocate a special role to earlier indices.
  97. // The returned error will force a requeue.
  98. // TODO: What's the desired behavior if pet-0 is deleted while pet-1 is
  99. // not yet healthy? currently pet-0 will wait till pet-1 is healthy,
  100. // this feels safer, but might lead to deadlock.
  101. p.blockingPet = pet
  102. if err := p.Create(pet); err != nil {
  103. return err
  104. }
  105. return nil
  106. }
  107. // Delete deletes the given pet, if no other pet in the petset is blocking a
  108. // scale event.
  109. func (p *petSyncer) Delete(pet *pcb) error {
  110. if pet == nil {
  111. return nil
  112. }
  113. realPet, exists, err := p.Get(pet)
  114. if err != nil {
  115. return err
  116. }
  117. if !exists {
  118. return nil
  119. }
  120. if p.blockingPet != nil {
  121. glog.Infof("Delete of %v in PetSet %v blocked by unhealthy pet %v", realPet.pod.Name, pet.parent.Name, p.blockingPet.pod.Name)
  122. return nil
  123. }
  124. // This is counted as a delete, even if it fails.
  125. // The returned error will force a requeue.
  126. p.blockingPet = realPet
  127. if !p.isDying(realPet.pod) {
  128. glog.Infof("PetSet %v deleting pet %v", pet.parent.Name, pet.pod.Name)
  129. return p.petClient.Delete(pet)
  130. }
  131. glog.Infof("PetSet %v waiting on pet %v to die in %v", pet.parent.Name, realPet.pod.Name, realPet.pod.DeletionTimestamp)
  132. return nil
  133. }
  134. // petClient is a client for managing pets.
  135. type petClient interface {
  136. pvcClient
  137. petHealthChecker
  138. Delete(*pcb) error
  139. Get(*pcb) (*pcb, bool, error)
  140. Create(*pcb) error
  141. Update(*pcb, *pcb) error
  142. }
  143. // apiServerPetClient is a petset aware Kubernetes client.
  144. type apiServerPetClient struct {
  145. c *client.Client
  146. recorder record.EventRecorder
  147. petHealthChecker
  148. }
  149. // Get gets the pet in the pcb from the apiserver.
  150. func (p *apiServerPetClient) Get(pet *pcb) (*pcb, bool, error) {
  151. found := true
  152. ns := pet.parent.Namespace
  153. pod, err := podClient(p.c, ns).Get(pet.pod.Name)
  154. if errors.IsNotFound(err) {
  155. found = false
  156. err = nil
  157. }
  158. if err != nil || !found {
  159. return nil, found, err
  160. }
  161. realPet := *pet
  162. realPet.pod = pod
  163. return &realPet, true, nil
  164. }
  165. // Delete deletes the pet in the pcb from the apiserver.
  166. func (p *apiServerPetClient) Delete(pet *pcb) error {
  167. err := podClient(p.c, pet.parent.Namespace).Delete(pet.pod.Name, nil)
  168. if errors.IsNotFound(err) {
  169. err = nil
  170. }
  171. p.event(pet.parent, "Delete", fmt.Sprintf("pet: %v", pet.pod.Name), err)
  172. return err
  173. }
  174. // Create creates the pet in the pcb.
  175. func (p *apiServerPetClient) Create(pet *pcb) error {
  176. _, err := podClient(p.c, pet.parent.Namespace).Create(pet.pod)
  177. p.event(pet.parent, "Create", fmt.Sprintf("pet: %v", pet.pod.Name), err)
  178. return err
  179. }
  180. // Update updates the pet in the 'pet' pcb to match the pet in the 'expectedPet' pcb.
  181. func (p *apiServerPetClient) Update(pet *pcb, expectedPet *pcb) (updateErr error) {
  182. var getErr error
  183. pc := podClient(p.c, pet.parent.Namespace)
  184. pod, needsUpdate, err := copyPetID(pet, expectedPet)
  185. if err != nil || !needsUpdate {
  186. return err
  187. }
  188. glog.Infof("Resetting pet %v to match PetSet %v spec", pod.Name, pet.parent.Name)
  189. for i, p := 0, &pod; ; i++ {
  190. _, updateErr = pc.Update(p)
  191. if updateErr == nil || i >= updateRetries {
  192. return updateErr
  193. }
  194. if p, getErr = pc.Get(pod.Name); getErr != nil {
  195. return getErr
  196. }
  197. }
  198. }
  199. // DeletePVCs should delete PVCs, when implemented.
  200. func (p *apiServerPetClient) DeletePVCs(pet *pcb) error {
  201. // TODO: Implement this when we delete pvcs.
  202. return nil
  203. }
  204. func (p *apiServerPetClient) getPVC(pvcName, pvcNamespace string) (*api.PersistentVolumeClaim, bool, error) {
  205. found := true
  206. pvc, err := claimClient(p.c, pvcNamespace).Get(pvcName)
  207. if errors.IsNotFound(err) {
  208. found = false
  209. }
  210. if !found {
  211. return nil, found, nil
  212. } else if err != nil {
  213. return nil, found, err
  214. }
  215. return pvc, true, nil
  216. }
  217. func (p *apiServerPetClient) createPVC(pvc *api.PersistentVolumeClaim) error {
  218. _, err := claimClient(p.c, pvc.Namespace).Create(pvc)
  219. return err
  220. }
  221. // SyncPVCs syncs pvcs in the given pcb.
  222. func (p *apiServerPetClient) SyncPVCs(pet *pcb) error {
  223. errMsg := ""
  224. // Create new claims.
  225. for i, pvc := range pet.pvcs {
  226. _, exists, err := p.getPVC(pvc.Name, pet.parent.Namespace)
  227. if !exists {
  228. var err error
  229. if err = p.createPVC(&pet.pvcs[i]); err != nil {
  230. errMsg += fmt.Sprintf("Failed to create %v: %v", pvc.Name, err)
  231. }
  232. p.event(pet.parent, "Create", fmt.Sprintf("pvc: %v", pvc.Name), err)
  233. } else if err != nil {
  234. errMsg += fmt.Sprintf("Error trying to get pvc %v, %v.", pvc.Name, err)
  235. }
  236. // TODO: Check resource requirements and accessmodes, update if necessary
  237. }
  238. if len(errMsg) != 0 {
  239. return fmt.Errorf("%v", errMsg)
  240. }
  241. return nil
  242. }
  243. // event formats an event for the given runtime object.
  244. func (p *apiServerPetClient) event(obj runtime.Object, reason, msg string, err error) {
  245. if err != nil {
  246. p.recorder.Eventf(obj, api.EventTypeWarning, fmt.Sprintf("Failed%v", reason), fmt.Sprintf("%v, error: %v", msg, err))
  247. } else {
  248. p.recorder.Eventf(obj, api.EventTypeNormal, fmt.Sprintf("Successful%v", reason), msg)
  249. }
  250. }
  251. // petHealthChecker is an interface to check pet health. It makes a boolean
  252. // decision based on the given pod.
  253. type petHealthChecker interface {
  254. isHealthy(*api.Pod) bool
  255. isDying(*api.Pod) bool
  256. }
  257. // defaultPetHealthChecks does basic health checking.
  258. // It doesn't update, probe or get the pod.
  259. type defaultPetHealthChecker struct{}
  260. // isHealthy returns true if the pod is running and has the
  261. // "pod.alpha.kubernetes.io/initialized" set to "true".
  262. func (d *defaultPetHealthChecker) isHealthy(pod *api.Pod) bool {
  263. if pod == nil || pod.Status.Phase != api.PodRunning {
  264. return false
  265. }
  266. initialized, ok := pod.Annotations[PetSetInitAnnotation]
  267. if !ok {
  268. glog.Infof("PetSet pod %v in %v, waiting on annotation %v", api.PodRunning, pod.Name, PetSetInitAnnotation)
  269. return false
  270. }
  271. b, err := strconv.ParseBool(initialized)
  272. if err != nil {
  273. return false
  274. }
  275. return b && api.IsPodReady(pod)
  276. }
  277. // isDying returns true if the pod has a non-nil deletion timestamp. Since the
  278. // timestamp can only decrease, once this method returns true for a given pet, it
  279. // will never return false.
  280. func (d *defaultPetHealthChecker) isDying(pod *api.Pod) bool {
  281. return pod != nil && pod.DeletionTimestamp != nil
  282. }