monitoring.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package e2e
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "fmt"
  18. "time"
  19. influxdb "github.com/influxdata/influxdb/client"
  20. "k8s.io/kubernetes/pkg/api"
  21. client "k8s.io/kubernetes/pkg/client/unversioned"
  22. "k8s.io/kubernetes/pkg/labels"
  23. "k8s.io/kubernetes/test/e2e/framework"
  24. . "github.com/onsi/ginkgo"
  25. )
  26. var _ = framework.KubeDescribe("Monitoring", func() {
  27. f := framework.NewDefaultFramework("monitoring")
  28. BeforeEach(func() {
  29. framework.SkipUnlessProviderIs("gce")
  30. })
  31. It("should verify monitoring pods and all cluster nodes are available on influxdb using heapster.", func() {
  32. testMonitoringUsingHeapsterInfluxdb(f.Client)
  33. })
  34. })
  // Settings for the heapster/InfluxDB monitoring test.
const (
	// InfluxDB service (in kube-system) and the database the test queries.
	influxdbService      = "monitoring-influxdb"
	influxdbDatabaseName = "k8s"

	// InfluxQL statements listing the pod_id and hostname tag values recorded
	// for the "cpu/usage" measurement.
	podlistQuery  = `show tag values from "cpu/usage" with key = pod_id`
	nodelistQuery = `show tag values from "cpu/usage" with key = hostname`

	// Delay between polls, and the overall time budget for data to show up.
	sleepBetweenAttempts = 5 * time.Second
	testTimeout          = 5 * time.Minute
)
  43. var (
  44. rcLabels = []string{"heapster", "influxGrafana"}
  45. expectedServices = map[string]bool{
  46. influxdbService: false,
  47. "monitoring-grafana": false,
  48. }
  49. )
  50. // Query sends a command to the server and returns the Response
  51. func Query(c *client.Client, query string) (*influxdb.Response, error) {
  52. result, err := c.Get().
  53. Prefix("proxy").
  54. Namespace("kube-system").
  55. Resource("services").
  56. Name(influxdbService+":api").
  57. Suffix("query").
  58. Param("q", query).
  59. Param("db", influxdbDatabaseName).
  60. Param("epoch", "s").
  61. Do().
  62. Raw()
  63. if err != nil {
  64. return nil, err
  65. }
  66. var response influxdb.Response
  67. dec := json.NewDecoder(bytes.NewReader(result))
  68. dec.UseNumber()
  69. err = dec.Decode(&response)
  70. if err != nil {
  71. return nil, err
  72. }
  73. return &response, nil
  74. }
  75. func verifyExpectedRcsExistAndGetExpectedPods(c *client.Client) ([]string, error) {
  76. expectedPods := []string{}
  77. // Iterate over the labels that identify the replication controllers that we
  78. // want to check. The rcLabels contains the value values for the k8s-app key
  79. // that identify the replication controllers that we want to check. Using a label
  80. // rather than an explicit name is preferred because the names will typically have
  81. // a version suffix e.g. heapster-monitoring-v1 and this will change after a rolling
  82. // update e.g. to heapster-monitoring-v2. By using a label query we can check for the
  83. // situation when a heapster-monitoring-v1 and heapster-monitoring-v2 replication controller
  84. // is running (which would be an error except during a rolling update).
  85. for _, rcLabel := range rcLabels {
  86. selector := labels.Set{"k8s-app": rcLabel}.AsSelector()
  87. options := api.ListOptions{LabelSelector: selector}
  88. deploymentList, err := c.Deployments(api.NamespaceSystem).List(options)
  89. if err != nil {
  90. return nil, err
  91. }
  92. rcList, err := c.ReplicationControllers(api.NamespaceSystem).List(options)
  93. if err != nil {
  94. return nil, err
  95. }
  96. psList, err := c.Apps().PetSets(api.NamespaceSystem).List(options)
  97. if err != nil {
  98. return nil, err
  99. }
  100. if (len(rcList.Items) + len(deploymentList.Items) + len(psList.Items)) != 1 {
  101. return nil, fmt.Errorf("expected to find one replica for RC or deployment with label %s but got %d",
  102. rcLabel, len(rcList.Items))
  103. }
  104. // Check all the replication controllers.
  105. for _, rc := range rcList.Items {
  106. selector := labels.Set(rc.Spec.Selector).AsSelector()
  107. options := api.ListOptions{LabelSelector: selector}
  108. podList, err := c.Pods(api.NamespaceSystem).List(options)
  109. if err != nil {
  110. return nil, err
  111. }
  112. for _, pod := range podList.Items {
  113. if pod.DeletionTimestamp != nil {
  114. continue
  115. }
  116. expectedPods = append(expectedPods, string(pod.UID))
  117. }
  118. }
  119. // Do the same for all deployments.
  120. for _, rc := range deploymentList.Items {
  121. selector := labels.Set(rc.Spec.Selector.MatchLabels).AsSelector()
  122. options := api.ListOptions{LabelSelector: selector}
  123. podList, err := c.Pods(api.NamespaceSystem).List(options)
  124. if err != nil {
  125. return nil, err
  126. }
  127. for _, pod := range podList.Items {
  128. if pod.DeletionTimestamp != nil {
  129. continue
  130. }
  131. expectedPods = append(expectedPods, string(pod.UID))
  132. }
  133. }
  134. // And for pet sets.
  135. for _, ps := range psList.Items {
  136. selector := labels.Set(ps.Spec.Selector.MatchLabels).AsSelector()
  137. options := api.ListOptions{LabelSelector: selector}
  138. podList, err := c.Pods(api.NamespaceSystem).List(options)
  139. if err != nil {
  140. return nil, err
  141. }
  142. for _, pod := range podList.Items {
  143. if pod.DeletionTimestamp != nil {
  144. continue
  145. }
  146. expectedPods = append(expectedPods, string(pod.UID))
  147. }
  148. }
  149. }
  150. return expectedPods, nil
  151. }
  152. func expectedServicesExist(c *client.Client) error {
  153. serviceList, err := c.Services(api.NamespaceSystem).List(api.ListOptions{})
  154. if err != nil {
  155. return err
  156. }
  157. for _, service := range serviceList.Items {
  158. if _, ok := expectedServices[service.Name]; ok {
  159. expectedServices[service.Name] = true
  160. }
  161. }
  162. for service, found := range expectedServices {
  163. if !found {
  164. return fmt.Errorf("Service %q not found", service)
  165. }
  166. }
  167. return nil
  168. }
  169. func getAllNodesInCluster(c *client.Client) ([]string, error) {
  170. // It should be OK to list unschedulable Nodes here.
  171. nodeList, err := c.Nodes().List(api.ListOptions{})
  172. if err != nil {
  173. return nil, err
  174. }
  175. result := []string{}
  176. for _, node := range nodeList.Items {
  177. result = append(result, node.Name)
  178. }
  179. return result, nil
  180. }
  181. func getInfluxdbData(c *client.Client, query string, tag string) (map[string]bool, error) {
  182. response, err := Query(c, query)
  183. if err != nil {
  184. return nil, err
  185. }
  186. if len(response.Results) != 1 {
  187. return nil, fmt.Errorf("expected only one result from Influxdb for query %q. Got %+v", query, response)
  188. }
  189. if len(response.Results[0].Series) != 1 {
  190. return nil, fmt.Errorf("expected exactly one series for query %q.", query)
  191. }
  192. if len(response.Results[0].Series[0].Columns) != 2 {
  193. framework.Failf("Expected two columns for query %q. Found %v", query, response.Results[0].Series[0].Columns)
  194. }
  195. result := map[string]bool{}
  196. for _, value := range response.Results[0].Series[0].Values {
  197. name := value[1].(string)
  198. result[name] = true
  199. }
  200. return result, nil
  201. }
  // expectedItemsExist reports whether every entry of expectedItems is present
// as a key in actualItems. Only key presence is checked, not the stored value.
// An empty expectedItems is trivially satisfied.
//
// There is deliberately no "len(actualItems) < len(expectedItems)" shortcut.
// It wrongly returned false when expectedItems contained duplicates, and it is
// redundant otherwise: any missing item is caught by the loop.
func expectedItemsExist(expectedItems []string, actualItems map[string]bool) bool {
	for _, item := range expectedItems {
		if _, found := actualItems[item]; !found {
			return false
		}
	}
	return true
}
  213. func validatePodsAndNodes(c *client.Client, expectedPods, expectedNodes []string) bool {
  214. pods, err := getInfluxdbData(c, podlistQuery, "pod_id")
  215. if err != nil {
  216. // We don't fail the test here because the influxdb service might still not be running.
  217. framework.Logf("failed to query list of pods from influxdb. Query: %q, Err: %v", podlistQuery, err)
  218. return false
  219. }
  220. nodes, err := getInfluxdbData(c, nodelistQuery, "hostname")
  221. if err != nil {
  222. framework.Logf("failed to query list of nodes from influxdb. Query: %q, Err: %v", nodelistQuery, err)
  223. return false
  224. }
  225. if !expectedItemsExist(expectedPods, pods) {
  226. framework.Logf("failed to find all expected Pods.\nExpected: %v\nActual: %v", expectedPods, pods)
  227. return false
  228. }
  229. if !expectedItemsExist(expectedNodes, nodes) {
  230. framework.Logf("failed to find all expected Nodes.\nExpected: %v\nActual: %v", expectedNodes, nodes)
  231. return false
  232. }
  233. return true
  234. }
  235. func testMonitoringUsingHeapsterInfluxdb(c *client.Client) {
  236. // Check if heapster pods and services are up.
  237. expectedPods, err := verifyExpectedRcsExistAndGetExpectedPods(c)
  238. framework.ExpectNoError(err)
  239. framework.ExpectNoError(expectedServicesExist(c))
  240. // TODO: Wait for all pods and services to be running.
  241. expectedNodes, err := getAllNodesInCluster(c)
  242. framework.ExpectNoError(err)
  243. startTime := time.Now()
  244. for {
  245. if validatePodsAndNodes(c, expectedPods, expectedNodes) {
  246. return
  247. }
  248. if time.Since(startTime) >= testTimeout {
  249. // temporary workaround to help debug issue #12765
  250. printDebugInfo(c)
  251. break
  252. }
  253. time.Sleep(sleepBetweenAttempts)
  254. }
  255. framework.Failf("monitoring using heapster and influxdb test failed")
  256. }
  257. func printDebugInfo(c *client.Client) {
  258. set := labels.Set{"k8s-app": "heapster"}
  259. options := api.ListOptions{LabelSelector: set.AsSelector()}
  260. podList, err := c.Pods(api.NamespaceSystem).List(options)
  261. if err != nil {
  262. framework.Logf("Error while listing pods %v", err)
  263. return
  264. }
  265. for _, pod := range podList.Items {
  266. framework.Logf("Kubectl output:\n%v",
  267. framework.RunKubectlOrDie("log", pod.Name, "--namespace=kube-system", "--container=heapster"))
  268. }
  269. }