nodes_util.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package framework
  14. import (
  15. "fmt"
  16. "path"
  17. "strings"
  18. "time"
  19. "k8s.io/kubernetes/pkg/api"
  20. client "k8s.io/kubernetes/pkg/client/unversioned"
  21. "k8s.io/kubernetes/pkg/fields"
  22. "k8s.io/kubernetes/pkg/util/wait"
  23. )
  24. // The following upgrade functions are passed into the framework below and used
  25. // to do the actual upgrades.
  26. var MasterUpgrade = func(v string) error {
  27. switch TestContext.Provider {
  28. case "gce":
  29. return masterUpgradeGCE(v)
  30. case "gke":
  31. return masterUpgradeGKE(v)
  32. default:
  33. return fmt.Errorf("MasterUpgrade() is not implemented for provider %s", TestContext.Provider)
  34. }
  35. }
  36. func masterUpgradeGCE(rawV string) error {
  37. v := "v" + rawV
  38. _, _, err := RunCmd(path.Join(TestContext.RepoRoot, "cluster/gce/upgrade.sh"), "-M", v)
  39. return err
  40. }
  41. func masterUpgradeGKE(v string) error {
  42. Logf("Upgrading master to %q", v)
  43. _, _, err := RunCmd("gcloud", "container",
  44. "clusters",
  45. fmt.Sprintf("--project=%s", TestContext.CloudConfig.ProjectID),
  46. fmt.Sprintf("--zone=%s", TestContext.CloudConfig.Zone),
  47. "upgrade",
  48. TestContext.CloudConfig.Cluster,
  49. "--master",
  50. fmt.Sprintf("--cluster-version=%s", v),
  51. "--quiet")
  52. return err
  53. }
  54. var NodeUpgrade = func(f *Framework, v string) error {
  55. // Perform the upgrade.
  56. var err error
  57. switch TestContext.Provider {
  58. case "gce":
  59. err = nodeUpgradeGCE(v)
  60. case "gke":
  61. err = nodeUpgradeGKE(v)
  62. default:
  63. err = fmt.Errorf("NodeUpgrade() is not implemented for provider %s", TestContext.Provider)
  64. }
  65. if err != nil {
  66. return err
  67. }
  68. // Wait for it to complete and validate nodes are healthy.
  69. //
  70. // TODO(ihmccreery) We shouldn't have to wait for nodes to be ready in
  71. // GKE; the operation shouldn't return until they all are.
  72. Logf("Waiting up to %v for all nodes to be ready after the upgrade", RestartNodeReadyAgainTimeout)
  73. if _, err := CheckNodesReady(f.Client, RestartNodeReadyAgainTimeout, TestContext.CloudConfig.NumNodes); err != nil {
  74. return err
  75. }
  76. return nil
  77. }
  78. func nodeUpgradeGCE(rawV string) error {
  79. // TODO(ihmccreery) This code path should be identical to how a user
  80. // would trigger a node update; right now it's very different.
  81. v := "v" + rawV
  82. Logf("Getting the node template before the upgrade")
  83. tmplBefore, err := MigTemplate()
  84. if err != nil {
  85. return fmt.Errorf("error getting the node template before the upgrade: %v", err)
  86. }
  87. Logf("Preparing node upgrade by creating new instance template for %q", v)
  88. stdout, _, err := RunCmd(path.Join(TestContext.RepoRoot, "cluster/gce/upgrade.sh"), "-P", v)
  89. if err != nil {
  90. cleanupNodeUpgradeGCE(tmplBefore)
  91. return fmt.Errorf("error preparing node upgrade: %v", err)
  92. }
  93. tmpl := strings.TrimSpace(stdout)
  94. Logf("Performing a node upgrade to %q; waiting at most %v per node", tmpl, RestartPerNodeTimeout)
  95. if err := MigRollingUpdate(tmpl, RestartPerNodeTimeout); err != nil {
  96. cleanupNodeUpgradeGCE(tmplBefore)
  97. return fmt.Errorf("error doing node upgrade via a MigRollingUpdate to %s: %v", tmpl, err)
  98. }
  99. return nil
  100. }
  101. // MigRollingUpdate starts a MIG rolling update, upgrading the nodes to a new
  102. // instance template named tmpl, and waits up to nt times the number of nodes
  103. // for it to complete.
  104. func MigRollingUpdate(tmpl string, nt time.Duration) error {
  105. Logf(fmt.Sprintf("starting the MIG rolling update to %s", tmpl))
  106. id, err := migRollingUpdateStart(tmpl, nt)
  107. if err != nil {
  108. return fmt.Errorf("couldn't start the MIG rolling update: %v", err)
  109. }
  110. Logf(fmt.Sprintf("polling the MIG rolling update (%s) until it completes", id))
  111. if err := migRollingUpdatePoll(id, nt); err != nil {
  112. return fmt.Errorf("err waiting until update completed: %v", err)
  113. }
  114. return nil
  115. }
  116. // migRollingUpdateStart (GCE/GKE-only) starts a MIG rolling update using templ
  117. // as the new template, waiting up to nt per node, and returns the ID of that
  118. // update.
  119. func migRollingUpdateStart(templ string, nt time.Duration) (string, error) {
  120. var errLast error
  121. var id string
  122. prefix, suffix := "Started [", "]."
  123. if err := wait.Poll(Poll, SingleCallTimeout, func() (bool, error) {
  124. // TODO(mikedanese): make this hit the compute API directly instead of
  125. // shelling out to gcloud.
  126. // NOTE(mikedanese): If you are changing this gcloud command, update
  127. // cluster/gce/upgrade.sh to match this EXACTLY.
  128. // A `rolling-updates start` call outputs what we want to stderr.
  129. _, output, err := retryCmd("gcloud", "alpha", "compute",
  130. "rolling-updates",
  131. fmt.Sprintf("--project=%s", TestContext.CloudConfig.ProjectID),
  132. fmt.Sprintf("--zone=%s", TestContext.CloudConfig.Zone),
  133. "start",
  134. // Required args.
  135. fmt.Sprintf("--group=%s", TestContext.CloudConfig.NodeInstanceGroup),
  136. fmt.Sprintf("--template=%s", templ),
  137. // Optional args to fine-tune behavior.
  138. fmt.Sprintf("--instance-startup-timeout=%ds", int(nt.Seconds())),
  139. // NOTE: We can speed up this process by increasing
  140. // --max-num-concurrent-instances.
  141. fmt.Sprintf("--max-num-concurrent-instances=%d", 1),
  142. fmt.Sprintf("--max-num-failed-instances=%d", 0),
  143. fmt.Sprintf("--min-instance-update-time=%ds", 0))
  144. if err != nil {
  145. errLast = fmt.Errorf("rolling-updates call failed with err: %v", err)
  146. return false, nil
  147. }
  148. // The 'start' call probably succeeded; parse the output and try to find
  149. // the line that looks like "Started [url/to/<id>]." and return <id>.
  150. for _, line := range strings.Split(output, "\n") {
  151. // As a sanity check, ensure the line starts with prefix and ends
  152. // with suffix.
  153. if strings.Index(line, prefix) != 0 || strings.Index(line, suffix) != len(line)-len(suffix) {
  154. continue
  155. }
  156. url := strings.Split(strings.TrimSuffix(strings.TrimPrefix(line, prefix), suffix), "/")
  157. id = url[len(url)-1]
  158. Logf("Started MIG rolling update; ID: %s", id)
  159. return true, nil
  160. }
  161. errLast = fmt.Errorf("couldn't find line like '%s ... %s' in output to MIG rolling-update start. Output: %s",
  162. prefix, suffix, output)
  163. return false, nil
  164. }); err != nil {
  165. return "", fmt.Errorf("migRollingUpdateStart() failed with last error: %v", errLast)
  166. }
  167. return id, nil
  168. }
  169. // migRollingUpdatePoll (CKE/GKE-only) polls the progress of the MIG rolling
  170. // update with ID id until it is complete. It returns an error if this takes
  171. // longer than nt times the number of nodes.
  172. func migRollingUpdatePoll(id string, nt time.Duration) error {
  173. // Two keys and a val.
  174. status, progress, done := "status", "statusMessage", "ROLLED_OUT"
  175. start, timeout := time.Now(), nt*time.Duration(TestContext.CloudConfig.NumNodes)
  176. var errLast error
  177. Logf("Waiting up to %v for MIG rolling update to complete.", timeout)
  178. if wait.Poll(RestartPoll, timeout, func() (bool, error) {
  179. // A `rolling-updates describe` call outputs what we want to stdout.
  180. output, _, err := retryCmd("gcloud", "alpha", "compute",
  181. "rolling-updates",
  182. fmt.Sprintf("--project=%s", TestContext.CloudConfig.ProjectID),
  183. fmt.Sprintf("--zone=%s", TestContext.CloudConfig.Zone),
  184. "describe",
  185. id)
  186. if err != nil {
  187. errLast = fmt.Errorf("Error calling rolling-updates describe %s: %v", id, err)
  188. Logf("%v", errLast)
  189. return false, nil
  190. }
  191. // The 'describe' call probably succeeded; parse the output and try to
  192. // find the line that looks like "status: <status>" and see whether it's
  193. // done.
  194. Logf("Waiting for MIG rolling update: %s (%v elapsed)",
  195. ParseKVLines(output, progress), time.Since(start))
  196. if st := ParseKVLines(output, status); st == done {
  197. return true, nil
  198. }
  199. return false, nil
  200. }) != nil {
  201. return fmt.Errorf("timeout waiting %v for MIG rolling update to complete. Last error: %v", timeout, errLast)
  202. }
  203. Logf("MIG rolling update complete after %v", time.Since(start))
  204. return nil
  205. }
  206. func cleanupNodeUpgradeGCE(tmplBefore string) {
  207. Logf("Cleaning up any unused node templates")
  208. tmplAfter, err := MigTemplate()
  209. if err != nil {
  210. Logf("Could not get node template post-upgrade; may have leaked template %s", tmplBefore)
  211. return
  212. }
  213. if tmplBefore == tmplAfter {
  214. // The node upgrade failed so there's no need to delete
  215. // anything.
  216. Logf("Node template %s is still in use; not cleaning up", tmplBefore)
  217. return
  218. }
  219. Logf("Deleting node template %s", tmplBefore)
  220. if _, _, err := retryCmd("gcloud", "compute", "instance-templates",
  221. fmt.Sprintf("--project=%s", TestContext.CloudConfig.ProjectID),
  222. "delete",
  223. tmplBefore); err != nil {
  224. Logf("gcloud compute instance-templates delete %s call failed with err: %v", tmplBefore, err)
  225. Logf("May have leaked instance template %q", tmplBefore)
  226. }
  227. }
  228. func nodeUpgradeGKE(v string) error {
  229. Logf("Upgrading nodes to %q", v)
  230. _, _, err := RunCmd("gcloud", "container",
  231. "clusters",
  232. fmt.Sprintf("--project=%s", TestContext.CloudConfig.ProjectID),
  233. fmt.Sprintf("--zone=%s", TestContext.CloudConfig.Zone),
  234. "upgrade",
  235. TestContext.CloudConfig.Cluster,
  236. fmt.Sprintf("--cluster-version=%s", v),
  237. "--quiet")
  238. return err
  239. }
  240. // CheckNodesReady waits up to nt for expect nodes accessed by c to be ready,
  241. // returning an error if this doesn't happen in time. It returns the names of
  242. // nodes it finds.
  243. func CheckNodesReady(c *client.Client, nt time.Duration, expect int) ([]string, error) {
  244. // First, keep getting all of the nodes until we get the number we expect.
  245. var nodeList *api.NodeList
  246. var errLast error
  247. start := time.Now()
  248. found := wait.Poll(Poll, nt, func() (bool, error) {
  249. // A rolling-update (GCE/GKE implementation of restart) can complete before the apiserver
  250. // knows about all of the nodes. Thus, we retry the list nodes call
  251. // until we get the expected number of nodes.
  252. nodeList, errLast = c.Nodes().List(api.ListOptions{
  253. FieldSelector: fields.Set{"spec.unschedulable": "false"}.AsSelector()})
  254. if errLast != nil {
  255. return false, nil
  256. }
  257. if len(nodeList.Items) != expect {
  258. errLast = fmt.Errorf("expected to find %d nodes but found only %d (%v elapsed)",
  259. expect, len(nodeList.Items), time.Since(start))
  260. Logf("%v", errLast)
  261. return false, nil
  262. }
  263. return true, nil
  264. }) == nil
  265. nodeNames := make([]string, len(nodeList.Items))
  266. for i, n := range nodeList.Items {
  267. nodeNames[i] = n.ObjectMeta.Name
  268. }
  269. if !found {
  270. return nodeNames, fmt.Errorf("couldn't find %d nodes within %v; last error: %v",
  271. expect, nt, errLast)
  272. }
  273. Logf("Successfully found %d nodes", expect)
  274. // Next, ensure in parallel that all the nodes are ready. We subtract the
  275. // time we spent waiting above.
  276. timeout := nt - time.Since(start)
  277. result := make(chan bool, len(nodeList.Items))
  278. for _, n := range nodeNames {
  279. n := n
  280. go func() { result <- WaitForNodeToBeReady(c, n, timeout) }()
  281. }
  282. failed := false
  283. // TODO(mbforbes): Change to `for range` syntax once we support only Go
  284. // >= 1.4.
  285. for i := range nodeList.Items {
  286. _ = i
  287. if !<-result {
  288. failed = true
  289. }
  290. }
  291. if failed {
  292. return nodeNames, fmt.Errorf("at least one node failed to be ready")
  293. }
  294. return nodeNames, nil
  295. }
  296. // MigTemplate (GCE-only) returns the name of the MIG template that the
  297. // nodes of the cluster use.
  298. func MigTemplate() (string, error) {
  299. var errLast error
  300. var templ string
  301. key := "instanceTemplate"
  302. if wait.Poll(Poll, SingleCallTimeout, func() (bool, error) {
  303. // TODO(mikedanese): make this hit the compute API directly instead of
  304. // shelling out to gcloud.
  305. // An `instance-groups managed describe` call outputs what we want to stdout.
  306. output, _, err := retryCmd("gcloud", "compute", "instance-groups", "managed",
  307. fmt.Sprintf("--project=%s", TestContext.CloudConfig.ProjectID),
  308. "describe",
  309. fmt.Sprintf("--zone=%s", TestContext.CloudConfig.Zone),
  310. TestContext.CloudConfig.NodeInstanceGroup)
  311. if err != nil {
  312. errLast = fmt.Errorf("gcloud compute instance-groups managed describe call failed with err: %v", err)
  313. return false, nil
  314. }
  315. // The 'describe' call probably succeeded; parse the output and try to
  316. // find the line that looks like "instanceTemplate: url/to/<templ>" and
  317. // return <templ>.
  318. if val := ParseKVLines(output, key); len(val) > 0 {
  319. url := strings.Split(val, "/")
  320. templ = url[len(url)-1]
  321. Logf("MIG group %s using template: %s", TestContext.CloudConfig.NodeInstanceGroup, templ)
  322. return true, nil
  323. }
  324. errLast = fmt.Errorf("couldn't find %s in output to get MIG template. Output: %s", key, output)
  325. return false, nil
  326. }) != nil {
  327. return "", fmt.Errorf("MigTemplate() failed with last error: %v", errLast)
  328. }
  329. return templ, nil
  330. }