federated-ingress.go 16 KB


  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package e2e
  14. import (
  15. "fmt"
  16. "net/http"
  17. "os"
  18. "reflect"
  19. "strconv"
  20. "time"
  21. "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
  22. "k8s.io/kubernetes/pkg/api"
  23. "k8s.io/kubernetes/pkg/api/errors"
  24. "k8s.io/kubernetes/pkg/api/v1"
  25. "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
  26. "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
  27. "k8s.io/kubernetes/pkg/util/intstr"
  28. "k8s.io/kubernetes/pkg/util/wait"
  29. "k8s.io/kubernetes/test/e2e/framework"
  30. . "github.com/onsi/ginkgo"
  31. . "github.com/onsi/gomega"
  32. )
  33. const (
  34. MaxRetriesOnFederatedApiserver = 3
  35. FederatedIngressTimeout = 60 * time.Second
  36. FederatedIngressName = "federated-ingress"
  37. FederatedIngressServiceName = "federated-ingress-service"
  38. FederatedIngressServicePodName = "federated-ingress-service-test-pod"
  39. )
  40. var _ = framework.KubeDescribe("Federated ingresses [Feature:Federation]", func() {
  41. f := framework.NewDefaultFederatedFramework("federated-ingress")
  42. // Create/delete ingress api objects
  43. // Validate federation apiserver, does not rely on underlying clusters or federation ingress controller.
  44. Describe("Ingress objects", func() {
  45. AfterEach(func() {
  46. nsName := f.FederationNamespace.Name
  47. // Delete registered ingresses.
  48. ingressList, err := f.FederationClientset_1_4.Extensions().Ingresses(nsName).List(api.ListOptions{})
  49. Expect(err).NotTo(HaveOccurred())
  50. for _, ingress := range ingressList.Items {
  51. err := f.FederationClientset_1_4.Extensions().Ingresses(nsName).Delete(ingress.Name, &api.DeleteOptions{})
  52. Expect(err).NotTo(HaveOccurred())
  53. }
  54. })
  55. It("should be created and deleted successfully", func() {
  56. framework.SkipUnlessFederated(f.Client)
  57. nsName := f.FederationNamespace.Name
  58. ingress := createIngressOrFail(f.FederationClientset_1_4, nsName)
  59. By(fmt.Sprintf("Creation of ingress %q in namespace %q succeeded. Deleting ingress.", ingress.Name, nsName))
  60. // Cleanup
  61. err := f.FederationClientset_1_4.Extensions().Ingresses(nsName).Delete(ingress.Name, &api.DeleteOptions{})
  62. framework.ExpectNoError(err, "Error deleting ingress %q in namespace %q", ingress.Name, ingress.Namespace)
  63. By(fmt.Sprintf("Deletion of ingress %q in namespace %q succeeded.", ingress.Name, nsName))
  64. })
  65. })
  66. // e2e cases for federation ingress controller
  67. var _ = Describe("Federated Ingresses", func() {
  68. var (
  69. clusters map[string]*cluster // All clusters, keyed by cluster name
  70. primaryClusterName, federationName, ns string
  71. jig *federationTestJig
  72. )
  73. // register clusters in federation apiserver
  74. BeforeEach(func() {
  75. framework.SkipUnlessFederated(f.Client)
  76. if federationName = os.Getenv("FEDERATION_NAME"); federationName == "" {
  77. federationName = DefaultFederationName
  78. }
  79. jig = newFederationTestJig(f.FederationClientset_1_4)
  80. clusters = map[string]*cluster{}
  81. primaryClusterName = registerClusters(clusters, UserAgentName, federationName, f)
  82. ns = f.Namespace.Name
  83. })
  84. AfterEach(func() {
  85. unregisterClusters(clusters, f)
  86. })
  87. It("should create and update matching ingresses in underlying clusters", func() {
  88. ingress := createIngressOrFail(f.FederationClientset_1_4, f.Namespace.Name)
  89. defer func() { // Cleanup
  90. By(fmt.Sprintf("Deleting ingress %q in namespace %q", ingress.Name, f.Namespace.Name))
  91. err := f.FederationClientset_1_4.Ingresses(f.Namespace.Name).Delete(ingress.Name, &api.DeleteOptions{})
  92. framework.ExpectNoError(err, "Error deleting ingress %q in namespace %q", ingress.Name, f.Namespace.Name)
  93. }()
  94. // wait for ingress shards being created
  95. waitForIngressShardsOrFail(f.Namespace.Name, ingress, clusters)
  96. ingress = updateIngressOrFail(f.FederationClientset_1_4, f.Namespace.Name)
  97. waitForIngressShardsUpdatedOrFail(f.Namespace.Name, ingress, clusters)
  98. })
  99. var _ = Describe("Ingress connectivity and DNS", func() {
  100. var (
  101. service *v1.Service
  102. )
  103. BeforeEach(func() {
  104. framework.SkipUnlessFederated(f.Client)
  105. // create backend pod
  106. createBackendPodsOrFail(clusters, f.Namespace.Name, FederatedIngressServicePodName)
  107. // create backend service
  108. service = createServiceOrFail(f.FederationClientset_1_4, f.Namespace.Name, FederatedIngressServiceName)
  109. // create ingress object
  110. jig.ing = createIngressOrFail(f.FederationClientset_1_4, f.Namespace.Name)
  111. // wait for services objects sync
  112. waitForServiceShardsOrFail(f.Namespace.Name, service, clusters)
  113. // wait for ingress objects sync
  114. waitForIngressShardsOrFail(f.Namespace.Name, jig.ing, clusters)
  115. })
  116. AfterEach(func() {
  117. deleteBackendPodsOrFail(clusters, f.Namespace.Name)
  118. if service != nil {
  119. deleteServiceOrFail(f.FederationClientset_1_4, f.Namespace.Name, service.Name)
  120. service = nil
  121. } else {
  122. By("No service to delete. Service is nil")
  123. }
  124. if jig.ing != nil {
  125. deleteIngressOrFail(f.FederationClientset_1_4, f.Namespace.Name, jig.ing.Name)
  126. jig.ing = nil
  127. } else {
  128. By("No ingress to delete. Ingress is nil")
  129. }
  130. })
  131. PIt("should be able to discover a federated ingress service", func() {
  132. // we are about the ingress name
  133. svcDNSNames := []string{
  134. fmt.Sprintf("%s.%s", FederatedIngressServiceName, f.Namespace.Name),
  135. fmt.Sprintf("%s.%s.svc.cluster.local.", FederatedIngressServiceName, f.Namespace.Name),
  136. // TODO these two entries are not set yet
  137. //fmt.Sprintf("%s.%s.%s", FederatedIngressServiceName, f.Namespace.Name, federationName),
  138. //fmt.Sprintf("%s.%s.%s.svc.cluster.local.", FederatedIngressServiceName, f.Namespace.Name, federationName),
  139. }
  140. // check dns records in underlying cluster
  141. for i, DNSName := range svcDNSNames {
  142. discoverService(f, DNSName, true, "federated-ingress-e2e-discovery-pod-"+strconv.Itoa(i))
  143. }
  144. // TODO check dns record in global dns server
  145. // check the traffic on federation ingress
  146. jig.waitForFederatedIngress()
  147. })
  148. })
  149. })
  150. })
  151. /*
  152. equivalent returns true if the two ingress spec are equivalent.
  153. */
  154. func equivalentIngress(federatedIngress, clusterIngress v1beta1.Ingress) bool {
  155. return reflect.DeepEqual(clusterIngress.Spec, federatedIngress.Spec)
  156. }
  157. /*
  158. waitForIngressOrFail waits until a ingress is either present or absent in the cluster specified by clientset.
  159. If the condition is not met within timout, it fails the calling test.
  160. */
  161. func waitForIngressOrFail(clientset *release_1_3.Clientset, namespace string, ingress *v1beta1.Ingress, present bool, timeout time.Duration) {
  162. By(fmt.Sprintf("Fetching a federated ingress shard of ingress %q in namespace %q from cluster", ingress.Name, namespace))
  163. var clusterIngress *v1beta1.Ingress
  164. err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
  165. clusterIngress, err := clientset.Ingresses(namespace).Get(ingress.Name)
  166. if (!present) && errors.IsNotFound(err) { // We want it gone, and it's gone.
  167. By(fmt.Sprintf("Success: shard of federated ingress %q in namespace %q in cluster is absent", ingress.Name, namespace))
  168. return true, nil // Success
  169. }
  170. if present && err == nil { // We want it present, and the Get succeeded, so we're all good.
  171. By(fmt.Sprintf("Success: shard of federated ingress %q in namespace %q in cluster is present", ingress.Name, namespace))
  172. return true, nil // Success
  173. }
  174. By(fmt.Sprintf("Ingress %q in namespace %q in cluster. Found: %v, waiting for Found: %v, trying again in %s (err=%v)", ingress.Name, namespace, clusterIngress != nil && err == nil, present, framework.Poll, err))
  175. return false, nil
  176. })
  177. framework.ExpectNoError(err, "Failed to verify ingress %q in namespace %q in cluster: Present=%v", ingress.Name, namespace, present)
  178. if present && clusterIngress != nil {
  179. Expect(equivalentIngress(*clusterIngress, *ingress))
  180. }
  181. }
  182. /*
  183. waitForIngressShardsOrFail waits for the ingress to appear in all clusters
  184. */
  185. func waitForIngressShardsOrFail(namespace string, ingress *v1beta1.Ingress, clusters map[string]*cluster) {
  186. framework.Logf("Waiting for ingress %q in %d clusters", ingress.Name, len(clusters))
  187. for _, c := range clusters {
  188. waitForIngressOrFail(c.Clientset, namespace, ingress, true, FederatedIngressTimeout)
  189. }
  190. }
  191. /*
  192. waitForIngressShardsUpdatedOrFail waits for the ingress to be updated in all clusters
  193. */
  194. func waitForIngressShardsUpdatedOrFail(namespace string, ingress *v1beta1.Ingress, clusters map[string]*cluster) {
  195. framework.Logf("Waiting for ingress %q in %d clusters", ingress.Name, len(clusters))
  196. for _, c := range clusters {
  197. waitForIngressUpdateOrFail(c.Clientset, namespace, ingress, FederatedIngressTimeout)
  198. }
  199. }
  200. /*
  201. waitForIngressUpdateOrFail waits until a ingress is updated in the specified cluster with same spec of federated ingress.
  202. If the condition is not met within timeout, it fails the calling test.
  203. */
  204. func waitForIngressUpdateOrFail(clientset *release_1_3.Clientset, namespace string, ingress *v1beta1.Ingress, timeout time.Duration) {
  205. By(fmt.Sprintf("Fetching a federated ingress shard of ingress %q in namespace %q from cluster", ingress.Name, namespace))
  206. err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
  207. clusterIngress, err := clientset.Ingresses(namespace).Get(ingress.Name)
  208. if err == nil { // We want it present, and the Get succeeded, so we're all good.
  209. if equivalentIngress(*clusterIngress, *ingress) {
  210. By(fmt.Sprintf("Success: shard of federated ingress %q in namespace %q in cluster is updated", ingress.Name, namespace))
  211. return true, nil
  212. }
  213. By(fmt.Sprintf("Ingress %q in namespace %q in cluster, waiting for service being updated, trying again in %s (err=%v)", ingress.Name, namespace, framework.Poll, err))
  214. return false, nil
  215. }
  216. By(fmt.Sprintf("Ingress %q in namespace %q in cluster, waiting for service being updated, trying again in %s (err=%v)", ingress.Name, namespace, framework.Poll, err))
  217. return false, nil
  218. })
  219. framework.ExpectNoError(err, "Failed to verify ingress %q in namespace %q in cluster", ingress.Name, namespace)
  220. }
  221. /*
  222. waitForIngressShardsGoneOrFail waits for the ingress to disappear in all clusters
  223. */
  224. func waitForIngressShardsGoneOrFail(namespace string, ingress *v1beta1.Ingress, clusters map[string]*cluster) {
  225. framework.Logf("Waiting for ingress %q in %d clusters", ingress.Name, len(clusters))
  226. for _, c := range clusters {
  227. waitForIngressOrFail(c.Clientset, namespace, ingress, false, FederatedIngressTimeout)
  228. }
  229. }
  230. func deleteIngressOrFail(clientset *federation_release_1_4.Clientset, namespace string, ingressName string) {
  231. if clientset == nil || len(namespace) == 0 || len(ingressName) == 0 {
  232. Fail(fmt.Sprintf("Internal error: invalid parameters passed to deleteIngressOrFail: clientset: %v, namespace: %v, ingress: %v", clientset, namespace, ingressName))
  233. }
  234. err := clientset.Ingresses(namespace).Delete(ingressName, api.NewDeleteOptions(0))
  235. framework.ExpectNoError(err, "Error deleting ingress %q from namespace %q", ingressName, namespace)
  236. }
  237. func createIngressOrFail(clientset *federation_release_1_4.Clientset, namespace string) *v1beta1.Ingress {
  238. if clientset == nil || len(namespace) == 0 {
  239. Fail(fmt.Sprintf("Internal error: invalid parameters passed to createIngressOrFail: clientset: %v, namespace: %v", clientset, namespace))
  240. }
  241. By(fmt.Sprintf("Creating federated ingress %q in namespace %q", FederatedIngressName, namespace))
  242. ingress := &v1beta1.Ingress{
  243. ObjectMeta: v1.ObjectMeta{
  244. Name: FederatedIngressName,
  245. },
  246. Spec: v1beta1.IngressSpec{
  247. Backend: &v1beta1.IngressBackend{
  248. ServiceName: "testingress-service",
  249. ServicePort: intstr.FromInt(80),
  250. },
  251. },
  252. }
  253. _, err := clientset.Extensions().Ingresses(namespace).Create(ingress)
  254. framework.ExpectNoError(err, "Creating ingress %q in namespace %q", ingress.Name, namespace)
  255. By(fmt.Sprintf("Successfully created federated ingress %q in namespace %q", FederatedIngressName, namespace))
  256. return ingress
  257. }
  258. func updateIngressOrFail(clientset *federation_release_1_4.Clientset, namespace string) (newIng *v1beta1.Ingress) {
  259. var err error
  260. if clientset == nil || len(namespace) == 0 {
  261. Fail(fmt.Sprintf("Internal error: invalid parameters passed to createIngressOrFail: clientset: %v, namespace: %v", clientset, namespace))
  262. }
  263. ingress := &v1beta1.Ingress{
  264. ObjectMeta: v1.ObjectMeta{
  265. Name: FederatedIngressName,
  266. },
  267. Spec: v1beta1.IngressSpec{
  268. Backend: &v1beta1.IngressBackend{
  269. ServiceName: "updated-testingress-service",
  270. ServicePort: intstr.FromInt(80),
  271. },
  272. },
  273. }
  274. for MaxRetriesOnFederatedApiserver := 0; MaxRetriesOnFederatedApiserver < 3; MaxRetriesOnFederatedApiserver++ {
  275. _, err = clientset.Extensions().Ingresses(namespace).Get(FederatedIngressName)
  276. if err != nil {
  277. framework.Failf("failed to get ingress %q: %v", FederatedIngressName, err)
  278. }
  279. newIng, err = clientset.Extensions().Ingresses(namespace).Update(ingress)
  280. if err == nil {
  281. describeIng(namespace)
  282. return
  283. }
  284. if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
  285. framework.Failf("failed to update ingress %q: %v", FederatedIngressName, err)
  286. }
  287. }
  288. framework.Failf("too many retries updating ingress %q", FederatedIngressName)
  289. return newIng
  290. }
  291. func (j *federationTestJig) waitForFederatedIngress() {
  292. // Wait for the loadbalancer IP.
  293. address, err := WaitForFederatedIngressAddress(j.client, j.ing.Namespace, j.ing.Name, lbPollTimeout)
  294. if err != nil {
  295. framework.Failf("Ingress failed to acquire an IP address within %v", lbPollTimeout)
  296. }
  297. j.address = address
  298. framework.Logf("Found address %v for ingress %v", j.address, j.ing.Name)
  299. timeoutClient := &http.Client{Timeout: reqTimeout}
  300. // Check that all rules respond to a simple GET.
  301. for _, rules := range j.ing.Spec.Rules {
  302. proto := "http"
  303. for _, p := range rules.IngressRuleValue.HTTP.Paths {
  304. route := fmt.Sprintf("%v://%v%v", proto, address, p.Path)
  305. framework.Logf("Testing route %v host %v with simple GET", route, rules.Host)
  306. ExpectNoError(pollURL(route, rules.Host, lbPollTimeout, timeoutClient, false))
  307. }
  308. }
  309. }
  310. type federationTestJig struct {
  311. // TODO add TLS check later
  312. rootCAs map[string][]byte
  313. address string
  314. ing *v1beta1.Ingress
  315. client *federation_release_1_4.Clientset
  316. }
  317. func newFederationTestJig(c *federation_release_1_4.Clientset) *federationTestJig {
  318. return &federationTestJig{client: c, rootCAs: map[string][]byte{}}
  319. }
  320. // WaitForFederatedIngressAddress waits for the Ingress to acquire an address.
  321. func WaitForFederatedIngressAddress(c *federation_release_1_4.Clientset, ns, ingName string, timeout time.Duration) (string, error) {
  322. var address string
  323. err := wait.PollImmediate(10*time.Second, timeout, func() (bool, error) {
  324. ipOrNameList, err := getFederatedIngressAddress(c, ns, ingName)
  325. if err != nil || len(ipOrNameList) == 0 {
  326. framework.Logf("Waiting for Ingress %v to acquire IP, error %v", ingName, err)
  327. return false, nil
  328. }
  329. address = ipOrNameList[0]
  330. return true, nil
  331. })
  332. return address, err
  333. }
  334. // getFederatedIngressAddress returns the ips/hostnames associated with the Ingress.
  335. func getFederatedIngressAddress(client *federation_release_1_4.Clientset, ns, name string) ([]string, error) {
  336. ing, err := client.Extensions().Ingresses(ns).Get(name)
  337. if err != nil {
  338. return nil, err
  339. }
  340. addresses := []string{}
  341. for _, a := range ing.Status.LoadBalancer.Ingress {
  342. if a.IP != "" {
  343. addresses = append(addresses, a.IP)
  344. }
  345. if a.Hostname != "" {
  346. addresses = append(addresses, a.Hostname)
  347. }
  348. }
  349. return addresses, nil
  350. }