scale.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kubectl
  14. import (
  15. "fmt"
  16. "strconv"
  17. "time"
  18. "k8s.io/kubernetes/pkg/api"
  19. "k8s.io/kubernetes/pkg/api/errors"
  20. "k8s.io/kubernetes/pkg/api/unversioned"
  21. "k8s.io/kubernetes/pkg/apis/apps"
  22. "k8s.io/kubernetes/pkg/apis/batch"
  23. "k8s.io/kubernetes/pkg/apis/extensions"
  24. client "k8s.io/kubernetes/pkg/client/unversioned"
  25. "k8s.io/kubernetes/pkg/fields"
  26. "k8s.io/kubernetes/pkg/util/wait"
  27. "k8s.io/kubernetes/pkg/watch"
  28. )
  29. // Scaler provides an interface for resources that can be scaled.
  30. type Scaler interface {
  31. // Scale scales the named resource after checking preconditions. It optionally
  32. // retries in the event of resource version mismatch (if retry is not nil),
  33. // and optionally waits until the status of the resource matches newSize (if wait is not nil)
  34. Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams) error
  35. // ScaleSimple does a simple one-shot attempt at scaling - not useful on its own, but
  36. // a necessary building block for Scale
  37. ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error)
  38. }
  39. func ScalerFor(kind unversioned.GroupKind, c client.Interface) (Scaler, error) {
  40. switch kind {
  41. case api.Kind("ReplicationController"):
  42. return &ReplicationControllerScaler{c}, nil
  43. case extensions.Kind("ReplicaSet"):
  44. return &ReplicaSetScaler{c.Extensions()}, nil
  45. case extensions.Kind("Job"), batch.Kind("Job"):
  46. return &JobScaler{c.Batch()}, nil // Either kind of job can be scaled with Batch interface.
  47. case apps.Kind("PetSet"):
  48. return &PetSetScaler{c.Apps()}, nil
  49. case extensions.Kind("Deployment"):
  50. return &DeploymentScaler{c.Extensions()}, nil
  51. }
  52. return nil, fmt.Errorf("no scaler has been implemented for %q", kind)
  53. }
  // ScalePrecondition describes the state a resource must be in for a scale
  // request to go ahead.
  //
  // A Size of -1 switches the size check off, and an empty ResourceVersion
  // switches the resource version check off. Any other value has to equal the
  // corresponding value on the resource for the precondition to hold.
  type ScalePrecondition struct {
  	// Size is the expected current size, or -1 to skip the check.
  	Size int
  	// ResourceVersion is the expected resource version, or "" to skip the check.
  	ResourceVersion string
  }
  // PreconditionError is the error produced when a resource does not match
  // one of the scale preconditions passed to kubectl.
  type PreconditionError struct {
  	// Precondition names the property that was checked.
  	Precondition string
  	// ExpectedValue is the value the precondition demanded.
  	ExpectedValue string
  	// ActualValue is the value found on the resource.
  	ActualValue string
  }

  // Error implements the error interface by rendering the checked property
  // together with its expected and actual values.
  func (e PreconditionError) Error() string {
  	property, want, got := e.Precondition, e.ExpectedValue, e.ActualValue
  	return fmt.Sprintf("Expected %s to be %s, was %s", property, want, got)
  }
  // ScaleErrorType identifies which step of a scale attempt went wrong.
  type ScaleErrorType int

  const (
  	// ScaleGetFailure marks a failure to fetch the resource.
  	ScaleGetFailure ScaleErrorType = iota
  	// ScaleUpdateFailure marks a failed update that was not a conflict.
  	ScaleUpdateFailure
  	// ScaleUpdateConflictFailure marks an update rejected as a conflict.
  	ScaleUpdateConflictFailure
  )

  // ScaleError is the error produced when reading or updating the resource
  // fails while trying to scale it.
  type ScaleError struct {
  	// FailureType says which step failed.
  	FailureType ScaleErrorType
  	// ResourceVersion is the resource version reported alongside the failure.
  	ResourceVersion string
  	// ActualError is the underlying error.
  	ActualError error
  }

  // Error implements the error interface, combining the underlying error with
  // the recorded resource version.
  func (e ScaleError) Error() string {
  	cause, version := e.ActualError, e.ResourceVersion
  	return fmt.Sprintf(
  		"Scaling the resource failed with: %v; Current resource version %s",
  		cause, version)
  }
  // RetryParams holds the retry parameters used by kubectl's scaler: how long
  // to pause between attempts and how long to keep trying overall.
  type RetryParams struct {
  	Interval time.Duration
  	Timeout  time.Duration
  }

  // NewRetryParams returns a pointer to a RetryParams populated with the given
  // interval and timeout.
  func NewRetryParams(interval, timeout time.Duration) *RetryParams {
  	params := RetryParams{Interval: interval, Timeout: timeout}
  	return &params
  }
  97. // ScaleCondition is a closure around Scale that facilitates retries via util.wait
  98. func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc {
  99. return func() (bool, error) {
  100. rv, err := r.ScaleSimple(namespace, name, precondition, count)
  101. if updatedResourceVersion != nil {
  102. *updatedResourceVersion = rv
  103. }
  104. switch e, _ := err.(ScaleError); err.(type) {
  105. case nil:
  106. return true, nil
  107. case ScaleError:
  108. // Retry only on update conflicts.
  109. if e.FailureType == ScaleUpdateConflictFailure {
  110. return false, nil
  111. }
  112. }
  113. return false, err
  114. }
  115. }
  116. // ValidatePetSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
  117. func (precondition *ScalePrecondition) ValidatePetSet(ps *apps.PetSet) error {
  118. if precondition.Size != -1 && int(ps.Spec.Replicas) != precondition.Size {
  119. return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(ps.Spec.Replicas))}
  120. }
  121. if len(precondition.ResourceVersion) != 0 && ps.ResourceVersion != precondition.ResourceVersion {
  122. return PreconditionError{"resource version", precondition.ResourceVersion, ps.ResourceVersion}
  123. }
  124. return nil
  125. }
  126. // ValidateReplicationController ensures that the preconditions match. Returns nil if they are valid, an error otherwise
  127. func (precondition *ScalePrecondition) ValidateReplicationController(controller *api.ReplicationController) error {
  128. if precondition.Size != -1 && int(controller.Spec.Replicas) != precondition.Size {
  129. return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(controller.Spec.Replicas))}
  130. }
  131. if len(precondition.ResourceVersion) != 0 && controller.ResourceVersion != precondition.ResourceVersion {
  132. return PreconditionError{"resource version", precondition.ResourceVersion, controller.ResourceVersion}
  133. }
  134. return nil
  135. }
  136. type ReplicationControllerScaler struct {
  137. c client.Interface
  138. }
  139. // ScaleSimple does a simple one-shot attempt at scaling. It returns the
  140. // resourceVersion of the replication controller if the update is successful.
  141. func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
  142. controller, err := scaler.c.ReplicationControllers(namespace).Get(name)
  143. if err != nil {
  144. return "", ScaleError{ScaleGetFailure, "Unknown", err}
  145. }
  146. if preconditions != nil {
  147. if err := preconditions.ValidateReplicationController(controller); err != nil {
  148. return "", err
  149. }
  150. }
  151. controller.Spec.Replicas = int32(newSize)
  152. updatedRC, err := scaler.c.ReplicationControllers(namespace).Update(controller)
  153. if err != nil {
  154. if errors.IsConflict(err) {
  155. return "", ScaleError{ScaleUpdateConflictFailure, controller.ResourceVersion, err}
  156. }
  157. return "", ScaleError{ScaleUpdateFailure, controller.ResourceVersion, err}
  158. }
  159. return updatedRC.ObjectMeta.ResourceVersion, nil
  160. }
  161. // Scale updates a ReplicationController to a new size, with optional precondition check (if preconditions is not nil),
  162. // optional retries (if retry is not nil), and then optionally waits for it's replica count to reach the new value
  163. // (if wait is not nil).
  164. func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
  165. if preconditions == nil {
  166. preconditions = &ScalePrecondition{-1, ""}
  167. }
  168. if retry == nil {
  169. // Make it try only once, immediately
  170. retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
  171. }
  172. var updatedResourceVersion string
  173. cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, &updatedResourceVersion)
  174. if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
  175. return err
  176. }
  177. if waitForReplicas != nil {
  178. checkRC := func(rc *api.ReplicationController) bool {
  179. if uint(rc.Spec.Replicas) != newSize {
  180. // the size is changed by other party. Don't need to wait for the new change to complete.
  181. return true
  182. }
  183. return rc.Status.ObservedGeneration >= rc.Generation && rc.Status.Replicas == rc.Spec.Replicas
  184. }
  185. // If number of replicas doesn't change, then the update may not event
  186. // be sent to underlying databse (we don't send no-op changes).
  187. // In such case, <updatedResourceVersion> will have value of the most
  188. // recent update (which may be far in the past) so we may get "too old
  189. // RV" error from watch or potentially no ReplicationController events
  190. // will be deliver, since it may already be in the expected state.
  191. // To protect from these two, we first issue Get() to ensure that we
  192. // are not already in the expected state.
  193. currentRC, err := scaler.c.ReplicationControllers(namespace).Get(name)
  194. if err != nil {
  195. return err
  196. }
  197. if !checkRC(currentRC) {
  198. watchOptions := api.ListOptions{
  199. FieldSelector: fields.OneTermEqualSelector("metadata.name", name),
  200. ResourceVersion: updatedResourceVersion,
  201. }
  202. watcher, err := scaler.c.ReplicationControllers(namespace).Watch(watchOptions)
  203. if err != nil {
  204. return err
  205. }
  206. _, err = watch.Until(waitForReplicas.Timeout, watcher, func(event watch.Event) (bool, error) {
  207. if event.Type != watch.Added && event.Type != watch.Modified {
  208. return false, nil
  209. }
  210. return checkRC(event.Object.(*api.ReplicationController)), nil
  211. })
  212. if err == wait.ErrWaitTimeout {
  213. return fmt.Errorf("timed out waiting for %q to be synced", name)
  214. }
  215. return err
  216. }
  217. }
  218. return nil
  219. }
  220. // ValidateReplicaSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise
  221. func (precondition *ScalePrecondition) ValidateReplicaSet(replicaSet *extensions.ReplicaSet) error {
  222. if precondition.Size != -1 && int(replicaSet.Spec.Replicas) != precondition.Size {
  223. return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(replicaSet.Spec.Replicas))}
  224. }
  225. if len(precondition.ResourceVersion) != 0 && replicaSet.ResourceVersion != precondition.ResourceVersion {
  226. return PreconditionError{"resource version", precondition.ResourceVersion, replicaSet.ResourceVersion}
  227. }
  228. return nil
  229. }
  230. type ReplicaSetScaler struct {
  231. c client.ExtensionsInterface
  232. }
  233. // ScaleSimple does a simple one-shot attempt at scaling. It returns the
  234. // resourceVersion of the replicaset if the update is successful.
  235. func (scaler *ReplicaSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
  236. rs, err := scaler.c.ReplicaSets(namespace).Get(name)
  237. if err != nil {
  238. return "", ScaleError{ScaleGetFailure, "Unknown", err}
  239. }
  240. if preconditions != nil {
  241. if err := preconditions.ValidateReplicaSet(rs); err != nil {
  242. return "", err
  243. }
  244. }
  245. rs.Spec.Replicas = int32(newSize)
  246. updatedRS, err := scaler.c.ReplicaSets(namespace).Update(rs)
  247. if err != nil {
  248. if errors.IsConflict(err) {
  249. return "", ScaleError{ScaleUpdateConflictFailure, rs.ResourceVersion, err}
  250. }
  251. return "", ScaleError{ScaleUpdateFailure, rs.ResourceVersion, err}
  252. }
  253. return updatedRS.ObjectMeta.ResourceVersion, nil
  254. }
  255. // Scale updates a ReplicaSet to a new size, with optional precondition check (if preconditions is
  256. // not nil), optional retries (if retry is not nil), and then optionally waits for it's replica
  257. // count to reach the new value (if wait is not nil).
  258. func (scaler *ReplicaSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
  259. if preconditions == nil {
  260. preconditions = &ScalePrecondition{-1, ""}
  261. }
  262. if retry == nil {
  263. // Make it try only once, immediately
  264. retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
  265. }
  266. cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
  267. if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil {
  268. return err
  269. }
  270. if waitForReplicas != nil {
  271. rs, err := scaler.c.ReplicaSets(namespace).Get(name)
  272. if err != nil {
  273. return err
  274. }
  275. err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.ReplicaSetHasDesiredReplicas(scaler.c, rs))
  276. if err == wait.ErrWaitTimeout {
  277. return fmt.Errorf("timed out waiting for %q to be synced", name)
  278. }
  279. return err
  280. }
  281. return nil
  282. }
  283. // ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
  284. func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error {
  285. if precondition.Size != -1 && job.Spec.Parallelism == nil {
  286. return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"}
  287. }
  288. if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size {
  289. return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))}
  290. }
  291. if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion {
  292. return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion}
  293. }
  294. return nil
  295. }
  296. type PetSetScaler struct {
  297. c client.AppsInterface
  298. }
  299. // ScaleSimple does a simple one-shot attempt at scaling. It returns the
  300. // resourceVersion of the petset if the update is successful.
  301. func (scaler *PetSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
  302. ps, err := scaler.c.PetSets(namespace).Get(name)
  303. if err != nil {
  304. return "", ScaleError{ScaleGetFailure, "Unknown", err}
  305. }
  306. if preconditions != nil {
  307. if err := preconditions.ValidatePetSet(ps); err != nil {
  308. return "", err
  309. }
  310. }
  311. ps.Spec.Replicas = int(newSize)
  312. updatedPetSet, err := scaler.c.PetSets(namespace).Update(ps)
  313. if err != nil {
  314. if errors.IsConflict(err) {
  315. return "", ScaleError{ScaleUpdateConflictFailure, ps.ResourceVersion, err}
  316. }
  317. return "", ScaleError{ScaleUpdateFailure, ps.ResourceVersion, err}
  318. }
  319. return updatedPetSet.ResourceVersion, nil
  320. }
  321. func (scaler *PetSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
  322. if preconditions == nil {
  323. preconditions = &ScalePrecondition{-1, ""}
  324. }
  325. if retry == nil {
  326. // Make it try only once, immediately
  327. retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
  328. }
  329. cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
  330. if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil {
  331. return err
  332. }
  333. if waitForReplicas != nil {
  334. job, err := scaler.c.PetSets(namespace).Get(name)
  335. if err != nil {
  336. return err
  337. }
  338. err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.PetSetHasDesiredPets(scaler.c, job))
  339. if err == wait.ErrWaitTimeout {
  340. return fmt.Errorf("timed out waiting for %q to be synced", name)
  341. }
  342. return err
  343. }
  344. return nil
  345. }
  346. type JobScaler struct {
  347. c client.BatchInterface
  348. }
  349. // ScaleSimple is responsible for updating job's parallelism. It returns the
  350. // resourceVersion of the job if the update is successful.
  351. func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
  352. job, err := scaler.c.Jobs(namespace).Get(name)
  353. if err != nil {
  354. return "", ScaleError{ScaleGetFailure, "Unknown", err}
  355. }
  356. if preconditions != nil {
  357. if err := preconditions.ValidateJob(job); err != nil {
  358. return "", err
  359. }
  360. }
  361. parallelism := int32(newSize)
  362. job.Spec.Parallelism = &parallelism
  363. udpatedJob, err := scaler.c.Jobs(namespace).Update(job)
  364. if err != nil {
  365. if errors.IsConflict(err) {
  366. return "", ScaleError{ScaleUpdateConflictFailure, job.ResourceVersion, err}
  367. }
  368. return "", ScaleError{ScaleUpdateFailure, job.ResourceVersion, err}
  369. }
  370. return udpatedJob.ObjectMeta.ResourceVersion, nil
  371. }
  372. // Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil),
  373. // optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired
  374. // number, which can be less than requested based on job's current progress.
  375. func (scaler *JobScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
  376. if preconditions == nil {
  377. preconditions = &ScalePrecondition{-1, ""}
  378. }
  379. if retry == nil {
  380. // Make it try only once, immediately
  381. retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
  382. }
  383. cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
  384. if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil {
  385. return err
  386. }
  387. if waitForReplicas != nil {
  388. job, err := scaler.c.Jobs(namespace).Get(name)
  389. if err != nil {
  390. return err
  391. }
  392. err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.JobHasDesiredParallelism(scaler.c, job))
  393. if err == wait.ErrWaitTimeout {
  394. return fmt.Errorf("timed out waiting for %q to be synced", name)
  395. }
  396. return err
  397. }
  398. return nil
  399. }
  400. // ValidateDeployment ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
  401. func (precondition *ScalePrecondition) ValidateDeployment(deployment *extensions.Deployment) error {
  402. if precondition.Size != -1 && int(deployment.Spec.Replicas) != precondition.Size {
  403. return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(deployment.Spec.Replicas))}
  404. }
  405. if len(precondition.ResourceVersion) != 0 && deployment.ResourceVersion != precondition.ResourceVersion {
  406. return PreconditionError{"resource version", precondition.ResourceVersion, deployment.ResourceVersion}
  407. }
  408. return nil
  409. }
  410. type DeploymentScaler struct {
  411. c client.ExtensionsInterface
  412. }
  413. // ScaleSimple is responsible for updating a deployment's desired replicas
  414. // count. It returns the resourceVersion of the deployment if the update is
  415. // successful.
  416. func (scaler *DeploymentScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
  417. deployment, err := scaler.c.Deployments(namespace).Get(name)
  418. if err != nil {
  419. return "", ScaleError{ScaleGetFailure, "Unknown", err}
  420. }
  421. if preconditions != nil {
  422. if err := preconditions.ValidateDeployment(deployment); err != nil {
  423. return "", err
  424. }
  425. }
  426. // TODO(madhusudancs): Fix this when Scale group issues are resolved (see issue #18528).
  427. // For now I'm falling back to regular Deployment update operation.
  428. deployment.Spec.Replicas = int32(newSize)
  429. updatedDeployment, err := scaler.c.Deployments(namespace).Update(deployment)
  430. if err != nil {
  431. if errors.IsConflict(err) {
  432. return "", ScaleError{ScaleUpdateConflictFailure, deployment.ResourceVersion, err}
  433. }
  434. return "", ScaleError{ScaleUpdateFailure, deployment.ResourceVersion, err}
  435. }
  436. return updatedDeployment.ObjectMeta.ResourceVersion, nil
  437. }
  438. // Scale updates a deployment to a new size, with optional precondition check (if preconditions is not nil),
  439. // optional retries (if retry is not nil), and then optionally waits for the status to reach desired count.
  440. func (scaler *DeploymentScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
  441. if preconditions == nil {
  442. preconditions = &ScalePrecondition{-1, ""}
  443. }
  444. if retry == nil {
  445. // Make it try only once, immediately
  446. retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
  447. }
  448. cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
  449. if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil {
  450. return err
  451. }
  452. if waitForReplicas != nil {
  453. deployment, err := scaler.c.Deployments(namespace).Get(name)
  454. if err != nil {
  455. return err
  456. }
  457. err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.DeploymentHasDesiredReplicas(scaler.c, deployment))
  458. if err == wait.ErrWaitTimeout {
  459. return fmt.Errorf("timed out waiting for %q to be synced", name)
  460. }
  461. return err
  462. }
  463. return nil
  464. }