controller_base.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package persistentvolume
  14. import (
  15. "fmt"
  16. "strconv"
  17. "time"
  18. "k8s.io/kubernetes/pkg/api"
  19. "k8s.io/kubernetes/pkg/api/errors"
  20. "k8s.io/kubernetes/pkg/api/meta"
  21. "k8s.io/kubernetes/pkg/apis/extensions"
  22. "k8s.io/kubernetes/pkg/client/cache"
  23. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  24. unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
  25. "k8s.io/kubernetes/pkg/client/record"
  26. "k8s.io/kubernetes/pkg/cloudprovider"
  27. "k8s.io/kubernetes/pkg/controller/framework"
  28. "k8s.io/kubernetes/pkg/conversion"
  29. "k8s.io/kubernetes/pkg/runtime"
  30. "k8s.io/kubernetes/pkg/util/goroutinemap"
  31. vol "k8s.io/kubernetes/pkg/volume"
  32. "k8s.io/kubernetes/pkg/watch"
  33. "github.com/golang/glog"
  34. )
  35. // This file contains the controller base functionality, i.e. framework to
  36. // process PV/PVC added/updated/deleted events. The real binding, provisioning,
  37. // recycling and deleting is done in controller.go
  38. // NewPersistentVolumeController creates a new PersistentVolumeController
  39. func NewPersistentVolumeController(
  40. kubeClient clientset.Interface,
  41. syncPeriod time.Duration,
  42. alphaProvisioner vol.ProvisionableVolumePlugin,
  43. volumePlugins []vol.VolumePlugin,
  44. cloud cloudprovider.Interface,
  45. clusterName string,
  46. volumeSource, claimSource, classSource cache.ListerWatcher,
  47. eventRecorder record.EventRecorder,
  48. enableDynamicProvisioning bool,
  49. ) *PersistentVolumeController {
  50. if eventRecorder == nil {
  51. broadcaster := record.NewBroadcaster()
  52. broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
  53. eventRecorder = broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"})
  54. }
  55. controller := &PersistentVolumeController{
  56. volumes: newPersistentVolumeOrderedIndex(),
  57. claims: cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc),
  58. kubeClient: kubeClient,
  59. eventRecorder: eventRecorder,
  60. runningOperations: goroutinemap.NewGoRoutineMap(false /* exponentialBackOffOnError */),
  61. cloud: cloud,
  62. enableDynamicProvisioning: enableDynamicProvisioning,
  63. clusterName: clusterName,
  64. createProvisionedPVRetryCount: createProvisionedPVRetryCount,
  65. createProvisionedPVInterval: createProvisionedPVInterval,
  66. alphaProvisioner: alphaProvisioner,
  67. }
  68. controller.volumePluginMgr.InitPlugins(volumePlugins, controller)
  69. if controller.alphaProvisioner != nil {
  70. if err := controller.alphaProvisioner.Init(controller); err != nil {
  71. glog.Errorf("PersistentVolumeController: error initializing alpha provisioner plugin: %v", err)
  72. }
  73. }
  74. if volumeSource == nil {
  75. volumeSource = &cache.ListWatch{
  76. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  77. return kubeClient.Core().PersistentVolumes().List(options)
  78. },
  79. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  80. return kubeClient.Core().PersistentVolumes().Watch(options)
  81. },
  82. }
  83. }
  84. controller.volumeSource = volumeSource
  85. if claimSource == nil {
  86. claimSource = &cache.ListWatch{
  87. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  88. return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
  89. },
  90. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  91. return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
  92. },
  93. }
  94. }
  95. controller.claimSource = claimSource
  96. if classSource == nil {
  97. classSource = &cache.ListWatch{
  98. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  99. return kubeClient.Extensions().StorageClasses().List(options)
  100. },
  101. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  102. return kubeClient.Extensions().StorageClasses().Watch(options)
  103. },
  104. }
  105. }
  106. controller.classSource = classSource
  107. _, controller.volumeController = framework.NewIndexerInformer(
  108. volumeSource,
  109. &api.PersistentVolume{},
  110. syncPeriod,
  111. framework.ResourceEventHandlerFuncs{
  112. AddFunc: controller.addVolume,
  113. UpdateFunc: controller.updateVolume,
  114. DeleteFunc: controller.deleteVolume,
  115. },
  116. cache.Indexers{"accessmodes": accessModesIndexFunc},
  117. )
  118. _, controller.claimController = framework.NewInformer(
  119. claimSource,
  120. &api.PersistentVolumeClaim{},
  121. syncPeriod,
  122. framework.ResourceEventHandlerFuncs{
  123. AddFunc: controller.addClaim,
  124. UpdateFunc: controller.updateClaim,
  125. DeleteFunc: controller.deleteClaim,
  126. },
  127. )
  128. // This is just a cache of StorageClass instances, no special actions are
  129. // needed when a class is created/deleted/updated.
  130. controller.classes = cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)
  131. controller.classReflector = cache.NewReflector(
  132. classSource,
  133. &extensions.StorageClass{},
  134. controller.classes,
  135. syncPeriod,
  136. )
  137. return controller
  138. }
  139. // initializeCaches fills all controller caches with initial data from etcd in
  140. // order to have the caches already filled when first addClaim/addVolume to
  141. // perform initial synchronization of the controller.
  142. func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) {
  143. volumeListObj, err := volumeSource.List(api.ListOptions{})
  144. if err != nil {
  145. glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
  146. return
  147. }
  148. volumeList, ok := volumeListObj.(*api.PersistentVolumeList)
  149. if !ok {
  150. glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", volumeListObj)
  151. return
  152. }
  153. for _, volume := range volumeList.Items {
  154. // Ignore template volumes from kubernetes 1.2
  155. deleted := ctrl.upgradeVolumeFrom1_2(&volume)
  156. if !deleted {
  157. clone, err := conversion.NewCloner().DeepCopy(&volume)
  158. if err != nil {
  159. glog.Errorf("error cloning volume %q: %v", volume.Name, err)
  160. continue
  161. }
  162. volumeClone := clone.(*api.PersistentVolume)
  163. ctrl.storeVolumeUpdate(volumeClone)
  164. }
  165. }
  166. claimListObj, err := claimSource.List(api.ListOptions{})
  167. if err != nil {
  168. glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
  169. return
  170. }
  171. claimList, ok := claimListObj.(*api.PersistentVolumeClaimList)
  172. if !ok {
  173. glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %#v", claimListObj)
  174. return
  175. }
  176. for _, claim := range claimList.Items {
  177. clone, err := conversion.NewCloner().DeepCopy(&claim)
  178. if err != nil {
  179. glog.Errorf("error cloning claim %q: %v", claimToClaimKey(&claim), err)
  180. continue
  181. }
  182. claimClone := clone.(*api.PersistentVolumeClaim)
  183. ctrl.storeClaimUpdate(claimClone)
  184. }
  185. glog.V(4).Infof("controller initialized")
  186. }
  187. func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume *api.PersistentVolume) (bool, error) {
  188. return storeObjectUpdate(ctrl.volumes.store, volume, "volume")
  189. }
  190. func (ctrl *PersistentVolumeController) storeClaimUpdate(claim *api.PersistentVolumeClaim) (bool, error) {
  191. return storeObjectUpdate(ctrl.claims, claim, "claim")
  192. }
  193. // addVolume is callback from framework.Controller watching PersistentVolume
  194. // events.
  195. func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
  196. pv, ok := obj.(*api.PersistentVolume)
  197. if !ok {
  198. glog.Errorf("expected PersistentVolume but handler received %#v", obj)
  199. return
  200. }
  201. if ctrl.upgradeVolumeFrom1_2(pv) {
  202. // volume deleted
  203. return
  204. }
  205. // Store the new volume version in the cache and do not process it if this
  206. // is an old version.
  207. new, err := ctrl.storeVolumeUpdate(pv)
  208. if err != nil {
  209. glog.Errorf("%v", err)
  210. }
  211. if !new {
  212. return
  213. }
  214. if err := ctrl.syncVolume(pv); err != nil {
  215. if errors.IsConflict(err) {
  216. // Version conflict error happens quite often and the controller
  217. // recovers from it easily.
  218. glog.V(3).Infof("PersistentVolumeController could not add volume %q: %+v", pv.Name, err)
  219. } else {
  220. glog.Errorf("PersistentVolumeController could not add volume %q: %+v", pv.Name, err)
  221. }
  222. }
  223. }
  224. // updateVolume is callback from framework.Controller watching PersistentVolume
  225. // events.
  226. func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) {
  227. newVolume, ok := newObj.(*api.PersistentVolume)
  228. if !ok {
  229. glog.Errorf("Expected PersistentVolume but handler received %#v", newObj)
  230. return
  231. }
  232. if ctrl.upgradeVolumeFrom1_2(newVolume) {
  233. // volume deleted
  234. return
  235. }
  236. // Store the new volume version in the cache and do not process it if this
  237. // is an old version.
  238. new, err := ctrl.storeVolumeUpdate(newVolume)
  239. if err != nil {
  240. glog.Errorf("%v", err)
  241. }
  242. if !new {
  243. return
  244. }
  245. if err := ctrl.syncVolume(newVolume); err != nil {
  246. if errors.IsConflict(err) {
  247. // Version conflict error happens quite often and the controller
  248. // recovers from it easily.
  249. glog.V(3).Infof("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err)
  250. } else {
  251. glog.Errorf("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err)
  252. }
  253. }
  254. }
  255. // deleteVolume is callback from framework.Controller watching PersistentVolume
  256. // events.
  257. func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
  258. _ = ctrl.volumes.store.Delete(obj)
  259. var volume *api.PersistentVolume
  260. var ok bool
  261. volume, ok = obj.(*api.PersistentVolume)
  262. if !ok {
  263. if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
  264. volume, ok = unknown.Obj.(*api.PersistentVolume)
  265. if !ok {
  266. glog.Errorf("Expected PersistentVolume but deleteVolume received %#v", unknown.Obj)
  267. return
  268. }
  269. } else {
  270. glog.Errorf("Expected PersistentVolume but deleteVolume received %+v", obj)
  271. return
  272. }
  273. }
  274. if volume == nil || volume.Spec.ClaimRef == nil {
  275. return
  276. }
  277. glog.V(4).Infof("volume %q deleted", volume.Name)
  278. if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists {
  279. if claim, ok := claimObj.(*api.PersistentVolumeClaim); ok && claim != nil {
  280. // sync the claim when its volume is deleted. Explicitly syncing the
  281. // claim here in response to volume deletion prevents the claim from
  282. // waiting until the next sync period for its Lost status.
  283. err := ctrl.syncClaim(claim)
  284. if err != nil {
  285. if errors.IsConflict(err) {
  286. // Version conflict error happens quite often and the
  287. // controller recovers from it easily.
  288. glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteVolume handler: %+v", claimToClaimKey(claim), err)
  289. } else {
  290. glog.Errorf("PersistentVolumeController could not update volume %q from deleteVolume handler: %+v", claimToClaimKey(claim), err)
  291. }
  292. }
  293. } else {
  294. glog.Errorf("Cannot convert object from claim cache to claim %q!?: %#v", claimrefToClaimKey(volume.Spec.ClaimRef), claimObj)
  295. }
  296. }
  297. }
  298. // addClaim is callback from framework.Controller watching PersistentVolumeClaim
  299. // events.
  300. func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
  301. // Store the new claim version in the cache and do not process it if this is
  302. // an old version.
  303. claim, ok := obj.(*api.PersistentVolumeClaim)
  304. if !ok {
  305. glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
  306. return
  307. }
  308. new, err := ctrl.storeClaimUpdate(claim)
  309. if err != nil {
  310. glog.Errorf("%v", err)
  311. }
  312. if !new {
  313. return
  314. }
  315. if err := ctrl.syncClaim(claim); err != nil {
  316. if errors.IsConflict(err) {
  317. // Version conflict error happens quite often and the controller
  318. // recovers from it easily.
  319. glog.V(3).Infof("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
  320. } else {
  321. glog.Errorf("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
  322. }
  323. }
  324. }
  325. // updateClaim is callback from framework.Controller watching PersistentVolumeClaim
  326. // events.
  327. func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) {
  328. // Store the new claim version in the cache and do not process it if this is
  329. // an old version.
  330. newClaim, ok := newObj.(*api.PersistentVolumeClaim)
  331. if !ok {
  332. glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
  333. return
  334. }
  335. new, err := ctrl.storeClaimUpdate(newClaim)
  336. if err != nil {
  337. glog.Errorf("%v", err)
  338. }
  339. if !new {
  340. return
  341. }
  342. if err := ctrl.syncClaim(newClaim); err != nil {
  343. if errors.IsConflict(err) {
  344. // Version conflict error happens quite often and the controller
  345. // recovers from it easily.
  346. glog.V(3).Infof("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
  347. } else {
  348. glog.Errorf("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
  349. }
  350. }
  351. }
  352. // deleteClaim is callback from framework.Controller watching PersistentVolumeClaim
  353. // events.
  354. func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
  355. _ = ctrl.claims.Delete(obj)
  356. var volume *api.PersistentVolume
  357. var claim *api.PersistentVolumeClaim
  358. var ok bool
  359. claim, ok = obj.(*api.PersistentVolumeClaim)
  360. if !ok {
  361. if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
  362. claim, ok = unknown.Obj.(*api.PersistentVolumeClaim)
  363. if !ok {
  364. glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %#v", unknown.Obj)
  365. return
  366. }
  367. } else {
  368. glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %#v", obj)
  369. return
  370. }
  371. }
  372. if !ok || claim == nil {
  373. return
  374. }
  375. glog.V(4).Infof("claim %q deleted", claimToClaimKey(claim))
  376. if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists {
  377. if volume, ok = pvObj.(*api.PersistentVolume); ok {
  378. // sync the volume when its claim is deleted. Explicitly sync'ing the
  379. // volume here in response to claim deletion prevents the volume from
  380. // waiting until the next sync period for its Release.
  381. if volume != nil {
  382. err := ctrl.syncVolume(volume)
  383. if err != nil {
  384. if errors.IsConflict(err) {
  385. // Version conflict error happens quite often and the
  386. // controller recovers from it easily.
  387. glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
  388. } else {
  389. glog.Errorf("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
  390. }
  391. }
  392. }
  393. } else {
  394. glog.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, pvObj)
  395. }
  396. }
  397. }
  398. // Run starts all of this controller's control loops
  399. func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
  400. glog.V(4).Infof("starting PersistentVolumeController")
  401. ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
  402. go ctrl.volumeController.Run(stopCh)
  403. go ctrl.claimController.Run(stopCh)
  404. go ctrl.classReflector.RunUntil(stopCh)
  405. }
const (
	// This pair of constants was used by the provisioner in Kubernetes 1.2.
	// upgradeVolumeFrom1_2 looks up the first as an annotation key on a
	// volume and compares the annotation's value with the second to tell
	// whether the volume has already been fully provisioned.
	pvProvisioningRequiredAnnotationKey    = "volume.experimental.kubernetes.io/provisioning-required"
	pvProvisioningCompletedAnnotationValue = "volume.experimental.kubernetes.io/provisioning-completed"
)
  411. // upgradeVolumeFrom1_2 updates PV from Kubernetes 1.2 to 1.3 and newer. In 1.2,
  412. // we used template PersistentVolume instances for dynamic provisioning. In 1.3
  413. // and later, these template (and not provisioned) instances must be removed to
  414. // make the controller to provision a new PV.
  415. // It returns true if the volume was deleted.
  416. // TODO: remove this function when upgrade from 1.2 becomes unsupported.
  417. func (ctrl *PersistentVolumeController) upgradeVolumeFrom1_2(volume *api.PersistentVolume) bool {
  418. annValue, found := volume.Annotations[pvProvisioningRequiredAnnotationKey]
  419. if !found {
  420. // The volume is not template
  421. return false
  422. }
  423. if annValue == pvProvisioningCompletedAnnotationValue {
  424. // The volume is already fully provisioned. The new controller will
  425. // ignore this annotation and it will obey its ReclaimPolicy, which is
  426. // likely to delete the volume when appropriate claim is deleted.
  427. return false
  428. }
  429. glog.V(2).Infof("deleting unprovisioned template volume %q from Kubernetes 1.2.", volume.Name)
  430. err := ctrl.kubeClient.Core().PersistentVolumes().Delete(volume.Name, nil)
  431. if err != nil {
  432. glog.Errorf("cannot delete unprovisioned template volume %q: %v", volume.Name, err)
  433. }
  434. // Remove from local cache
  435. err = ctrl.volumes.store.Delete(volume)
  436. if err != nil {
  437. glog.Errorf("cannot remove volume %q from local cache: %v", volume.Name, err)
  438. }
  439. return true
  440. }
  441. // Stateless functions
  442. func hasAnnotation(obj api.ObjectMeta, ann string) bool {
  443. _, found := obj.Annotations[ann]
  444. return found
  445. }
  446. func setAnnotation(obj *api.ObjectMeta, ann string, value string) {
  447. if obj.Annotations == nil {
  448. obj.Annotations = make(map[string]string)
  449. }
  450. obj.Annotations[ann] = value
  451. }
  452. func getClaimStatusForLogging(claim *api.PersistentVolumeClaim) string {
  453. bound := hasAnnotation(claim.ObjectMeta, annBindCompleted)
  454. boundByController := hasAnnotation(claim.ObjectMeta, annBoundByController)
  455. return fmt.Sprintf("phase: %s, bound to: %q, bindCompleted: %v, boundByController: %v", claim.Status.Phase, claim.Spec.VolumeName, bound, boundByController)
  456. }
  457. func getVolumeStatusForLogging(volume *api.PersistentVolume) string {
  458. boundByController := hasAnnotation(volume.ObjectMeta, annBoundByController)
  459. claimName := ""
  460. if volume.Spec.ClaimRef != nil {
  461. claimName = fmt.Sprintf("%s/%s (uid: %s)", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, volume.Spec.ClaimRef.UID)
  462. }
  463. return fmt.Sprintf("phase: %s, bound to: %q, boundByController: %v", volume.Status.Phase, claimName, boundByController)
  464. }
  465. // isVolumeBoundToClaim returns true, if given volume is pre-bound or bound
  466. // to specific claim. Both claim.Name and claim.Namespace must be equal.
  467. // If claim.UID is present in volume.Spec.ClaimRef, it must be equal too.
  468. func isVolumeBoundToClaim(volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) bool {
  469. if volume.Spec.ClaimRef == nil {
  470. return false
  471. }
  472. if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
  473. return false
  474. }
  475. if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
  476. return false
  477. }
  478. return true
  479. }
  480. // storeObjectUpdate updates given cache with a new object version from Informer
  481. // callback (i.e. with events from etcd) or with an object modified by the
  482. // controller itself. Returns "true", if the cache was updated, false if the
  483. // object is an old version and should be ignored.
  484. func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
  485. objAccessor, err := meta.Accessor(obj)
  486. if err != nil {
  487. return false, fmt.Errorf("Error reading cache of %s: %v", className, err)
  488. }
  489. objName := objAccessor.GetNamespace() + "/" + objAccessor.GetName()
  490. oldObj, found, err := store.Get(obj)
  491. if err != nil {
  492. return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
  493. }
  494. if !found {
  495. // This is a new object
  496. glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
  497. if err = store.Add(obj); err != nil {
  498. return false, fmt.Errorf("Error adding %s %q to controller cache: %v", className, objName, err)
  499. }
  500. return true, nil
  501. }
  502. oldObjAccessor, err := meta.Accessor(oldObj)
  503. if err != nil {
  504. return false, err
  505. }
  506. objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
  507. if err != nil {
  508. return false, fmt.Errorf("Error parsing ResourceVersion %q of %s %q: %s", objAccessor.GetResourceVersion(), className, objName, err)
  509. }
  510. oldObjResourceVersion, err := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64)
  511. if err != nil {
  512. return false, fmt.Errorf("Error parsing old ResourceVersion %q of %s %q: %s", oldObjAccessor.GetResourceVersion(), className, objName, err)
  513. }
  514. // Throw away only older version, let the same version pass - we do want to
  515. // get periodic sync events.
  516. if oldObjResourceVersion > objResourceVersion {
  517. glog.V(4).Infof("storeObjectUpdate: ignoring %s %q version %s", className, objName, objAccessor.GetResourceVersion())
  518. return false, nil
  519. }
  520. glog.V(4).Infof("storeObjectUpdate updating %s %q with version %s", className, objName, objAccessor.GetResourceVersion())
  521. if err = store.Update(obj); err != nil {
  522. return false, fmt.Errorf("Error updating %s %q in controller cache: %v", className, objName, err)
  523. }
  524. return true, nil
  525. }
  526. // getVolumeClass returns value of annClass annotation or empty string in case
  527. // the annotation does not exist.
  528. // TODO: change to PersistentVolume.Spec.Class value when this attribute is
  529. // introduced.
  530. func getVolumeClass(volume *api.PersistentVolume) string {
  531. if class, found := volume.Annotations[annClass]; found {
  532. return class
  533. }
  534. // 'nil' is interpreted as "", i.e. the volume does not belong to any class.
  535. return ""
  536. }
  537. // getClaimClass returns name of class that is requested by given claim.
  538. // Request for `nil` class is interpreted as request for class "",
  539. // i.e. for a classless PV.
  540. func getClaimClass(claim *api.PersistentVolumeClaim) string {
  541. // TODO: change to PersistentVolumeClaim.Spec.Class value when this
  542. // attribute is introduced.
  543. if class, found := claim.Annotations[annClass]; found {
  544. return class
  545. }
  546. return ""
  547. }