12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package userspace
- import (
- "fmt"
- "io/ioutil"
- "net"
- "net/http"
- "net/http/httptest"
- "net/url"
- "os"
- "strconv"
- "sync/atomic"
- "testing"
- "time"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/proxy"
- "k8s.io/kubernetes/pkg/types"
- ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
- "k8s.io/kubernetes/pkg/util/runtime"
- )
// udpIdleTimeoutForTest is the value the tests in this file pass as the final
// argument to createProxier.
const udpIdleTimeoutForTest = 250 * time.Millisecond
// joinHostPort combines host and a numeric port into a single network address
// using net.JoinHostPort.
func joinHostPort(host string, port int) string {
	portText := fmt.Sprint(port)
	return net.JoinHostPort(host, portText)
}
- func waitForClosedPortTCP(p *Proxier, proxyPort int) error {
- for i := 0; i < 50; i++ {
- conn, err := net.Dial("tcp", joinHostPort("", proxyPort))
- if err != nil {
- return nil
- }
- conn.Close()
- time.Sleep(1 * time.Millisecond)
- }
- return fmt.Errorf("port %d still open", proxyPort)
- }
- func waitForClosedPortUDP(p *Proxier, proxyPort int) error {
- for i := 0; i < 50; i++ {
- conn, err := net.Dial("udp", joinHostPort("", proxyPort))
- if err != nil {
- return nil
- }
- conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
- // To detect a closed UDP port write, then read.
- _, err = conn.Write([]byte("x"))
- if err != nil {
- if e, ok := err.(net.Error); ok && !e.Timeout() {
- return nil
- }
- }
- var buf [4]byte
- _, err = conn.Read(buf[0:])
- if err != nil {
- if e, ok := err.(net.Error); ok && !e.Timeout() {
- return nil
- }
- }
- conn.Close()
- time.Sleep(1 * time.Millisecond)
- }
- return fmt.Errorf("port %d still open", proxyPort)
- }
// Ports of the shared echo backends. TestMain assigns both before running the
// tests, which use them as endpoint ports.
var (
	// tcpServerPort is the port of the HTTP echo server started in TestMain.
	tcpServerPort int32
	// udpServerPort is the port of the UDP echo server started in TestMain.
	udpServerPort int32
)
- func TestMain(m *testing.M) {
- // Don't handle panics
- runtime.ReallyCrash = true
- // TCP setup.
- tcp := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(r.URL.Path[1:]))
- }))
- defer tcp.Close()
- u, err := url.Parse(tcp.URL)
- if err != nil {
- panic(fmt.Sprintf("failed to parse: %v", err))
- }
- _, port, err := net.SplitHostPort(u.Host)
- if err != nil {
- panic(fmt.Sprintf("failed to parse: %v", err))
- }
- tcpServerPortValue, err := strconv.Atoi(port)
- if err != nil {
- panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
- }
- tcpServerPort = int32(tcpServerPortValue)
- // UDP setup.
- udp, err := newUDPEchoServer()
- if err != nil {
- panic(fmt.Sprintf("failed to make a UDP server: %v", err))
- }
- _, port, err = net.SplitHostPort(udp.LocalAddr().String())
- if err != nil {
- panic(fmt.Sprintf("failed to parse: %v", err))
- }
- udpServerPortValue, err := strconv.Atoi(port)
- if err != nil {
- panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
- }
- udpServerPort = int32(udpServerPortValue)
- go udp.Loop()
- ret := m.Run()
- // it should be safe to call Close() multiple times.
- tcp.Close()
- os.Exit(ret)
- }
// testEchoTCP issues an HTTP GET for a fixed path against address:port and
// checks that the response body equals that path. A connection failure aborts
// the calling test; read errors and body mismatches are reported with
// t.Errorf.
//
// The host part of the URL is built with net.JoinHostPort so that IPv6
// literal addresses are bracketed correctly.
func testEchoTCP(t *testing.T, address string, port int) {
	path := "aaaaa"
	target := "http://" + net.JoinHostPort(address, strconv.Itoa(port)) + "/" + path
	res, err := http.Get(target)
	if err != nil {
		t.Fatalf("error connecting to server: %v", err)
	}
	defer res.Body.Close()
	data, err := ioutil.ReadAll(res.Body)
	if err != nil {
		t.Errorf("error reading data: %v %v", err, string(data))
	}
	if string(data) != path {
		t.Errorf("expected: %s, got %s", path, string(data))
	}
}
- func testEchoUDP(t *testing.T, address string, port int) {
- data := "abc123"
- conn, err := net.Dial("udp", joinHostPort(address, port))
- if err != nil {
- t.Fatalf("error connecting to server: %v", err)
- }
- if _, err := conn.Write([]byte(data)); err != nil {
- t.Fatalf("error sending to server: %v", err)
- }
- var resp [1024]byte
- n, err := conn.Read(resp[0:])
- if err != nil {
- t.Errorf("error receiving data: %v", err)
- }
- if string(resp[0:n]) != data {
- t.Errorf("expected: %s, got %s", data, string(resp[0:n]))
- }
- }
- func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
- var got int32
- for i := 0; i < 600; i++ {
- got = atomic.LoadInt32(&p.numProxyLoops)
- if got == want {
- return
- }
- time.Sleep(100 * time.Millisecond)
- }
- t.Errorf("expected %d ProxyLoops running, got %d", want, got)
- }
- func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time.Duration) {
- var got int
- now := time.Now()
- deadline := now.Add(timeout)
- for time.Now().Before(deadline) {
- s.activeClients.mu.Lock()
- got = len(s.activeClients.clients)
- s.activeClients.mu.Unlock()
- if got == want {
- return
- }
- time.Sleep(500 * time.Millisecond)
- }
- t.Errorf("expected %d ProxyClients live, got %d", want, got)
- }
- func TestTCPProxy(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- }
- func TestUDPProxy(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- }
- func TestUDPProxyTimeout(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- waitForNumProxyLoops(t, p, 1)
- testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
- // When connecting to a UDP service endpoint, there should be a Conn for proxy.
- waitForNumProxyClients(t, svcInfo, 1, time.Second)
- // If conn has no activity for serviceInfo.timeout since last Read/Write, it should be closed because of timeout.
- waitForNumProxyClients(t, svcInfo, 0, 2*time.Second)
- }
- func TestMultiPortProxy(t *testing.T) {
- lb := NewLoadBalancerRR()
- serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"}
- serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"}
- lb.OnEndpointsUpdate([]api.Endpoints{{
- ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}},
- }},
- }, {
- ObjectMeta: api.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}},
- }},
- }})
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- testEchoUDP(t, "127.0.0.1", svcInfoQ.proxyPort)
- waitForNumProxyLoops(t, p, 2)
- }
- func TestMultiPortOnServiceUpdate(t *testing.T) {
- lb := NewLoadBalancerRR()
- serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
- serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- p.OnServiceUpdate([]api.Service{{
- ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
- Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
- Name: "p",
- Port: 80,
- Protocol: "TCP",
- }, {
- Name: "q",
- Port: 81,
- Protocol: "UDP",
- }}},
- }})
- waitForNumProxyLoops(t, p, 2)
- svcInfo, exists := p.getServiceInfo(serviceP)
- if !exists {
- t.Fatalf("can't find serviceInfo for %s", serviceP)
- }
- if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 80 || svcInfo.protocol != "TCP" {
- t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo)
- }
- svcInfo, exists = p.getServiceInfo(serviceQ)
- if !exists {
- t.Fatalf("can't find serviceInfo for %s", serviceQ)
- }
- if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 81 || svcInfo.protocol != "UDP" {
- t.Errorf("unexpected serviceInfo for %s: %#v", serviceQ, svcInfo)
- }
- svcInfo, exists = p.getServiceInfo(serviceX)
- if exists {
- t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo)
- }
- }
- // Helper: Stops the proxy for the named service.
- func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
- info, found := proxier.getServiceInfo(service)
- if !found {
- return fmt.Errorf("unknown service: %s", service)
- }
- return proxier.stopProxy(service, info)
- }
- func TestTCPProxyStop(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- if !svcInfo.isAlive() {
- t.Fatalf("wrong value for isAlive(): expected true")
- }
- conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
- if err != nil {
- t.Fatalf("error connecting to proxy: %v", err)
- }
- conn.Close()
- waitForNumProxyLoops(t, p, 1)
- stopProxyByName(p, service)
- if svcInfo.isAlive() {
- t.Fatalf("wrong value for isAlive(): expected false")
- }
- // Wait for the port to really close.
- if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
- t.Fatalf(err.Error())
- }
- waitForNumProxyLoops(t, p, 0)
- }
- func TestUDPProxyStop(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
- if err != nil {
- t.Fatalf("error connecting to proxy: %v", err)
- }
- conn.Close()
- waitForNumProxyLoops(t, p, 1)
- stopProxyByName(p, service)
- // Wait for the port to really close.
- if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
- t.Fatalf(err.Error())
- }
- waitForNumProxyLoops(t, p, 0)
- }
- func TestTCPProxyUpdateDelete(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
- if err != nil {
- t.Fatalf("error connecting to proxy: %v", err)
- }
- conn.Close()
- waitForNumProxyLoops(t, p, 1)
- p.OnServiceUpdate([]api.Service{})
- if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
- t.Fatalf(err.Error())
- }
- waitForNumProxyLoops(t, p, 0)
- }
- func TestUDPProxyUpdateDelete(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
- if err != nil {
- t.Fatalf("error connecting to proxy: %v", err)
- }
- conn.Close()
- waitForNumProxyLoops(t, p, 1)
- p.OnServiceUpdate([]api.Service{})
- if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
- t.Fatalf(err.Error())
- }
- waitForNumProxyLoops(t, p, 0)
- }
- func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- endpoint := api.Endpoints{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
- }},
- }
- lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
- if err != nil {
- t.Fatalf("error connecting to proxy: %v", err)
- }
- conn.Close()
- waitForNumProxyLoops(t, p, 1)
- p.OnServiceUpdate([]api.Service{})
- if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
- t.Fatalf(err.Error())
- }
- waitForNumProxyLoops(t, p, 0)
- // need to add endpoint here because it got clean up during service delete
- lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
- p.OnServiceUpdate([]api.Service{{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
- Name: "p",
- Port: int32(svcInfo.proxyPort),
- Protocol: "TCP",
- }}},
- }})
- svcInfo, exists := p.getServiceInfo(service)
- if !exists {
- t.Fatalf("can't find serviceInfo for %s", service)
- }
- testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- }
- func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- endpoint := api.Endpoints{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
- }},
- }
- lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
- if err != nil {
- t.Fatalf("error connecting to proxy: %v", err)
- }
- conn.Close()
- waitForNumProxyLoops(t, p, 1)
- p.OnServiceUpdate([]api.Service{})
- if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
- t.Fatalf(err.Error())
- }
- waitForNumProxyLoops(t, p, 0)
- // need to add endpoint here because it got clean up during service delete
- lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
- p.OnServiceUpdate([]api.Service{{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
- Name: "p",
- Port: int32(svcInfo.proxyPort),
- Protocol: "UDP",
- }}},
- }})
- svcInfo, exists := p.getServiceInfo(service)
- if !exists {
- t.Fatalf("can't find serviceInfo")
- }
- testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- }
- func TestTCPProxyUpdatePort(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- p.OnServiceUpdate([]api.Service{{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
- Name: "p",
- Port: 99,
- Protocol: "TCP",
- }}},
- }})
- // Wait for the socket to actually get free.
- if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
- t.Fatalf(err.Error())
- }
- svcInfo, exists := p.getServiceInfo(service)
- if !exists {
- t.Fatalf("can't find serviceInfo")
- }
- testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
- // This is a bit async, but this should be sufficient.
- time.Sleep(500 * time.Millisecond)
- waitForNumProxyLoops(t, p, 1)
- }
- func TestUDPProxyUpdatePort(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- waitForNumProxyLoops(t, p, 1)
- p.OnServiceUpdate([]api.Service{{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
- Name: "p",
- Port: 99,
- Protocol: "UDP",
- }}},
- }})
- // Wait for the socket to actually get free.
- if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
- t.Fatalf(err.Error())
- }
- svcInfo, exists := p.getServiceInfo(service)
- if !exists {
- t.Fatalf("can't find serviceInfo")
- }
- testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- }
- func TestProxyUpdatePublicIPs(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- lb.OnEndpointsUpdate([]api.Endpoints{
- {
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
- }},
- },
- })
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- p.OnServiceUpdate([]api.Service{{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Spec: api.ServiceSpec{
- Ports: []api.ServicePort{{
- Name: "p",
- Port: int32(svcInfo.portal.port),
- Protocol: "TCP",
- }},
- ClusterIP: svcInfo.portal.ip.String(),
- ExternalIPs: []string{"4.3.2.1"},
- },
- }})
- // Wait for the socket to actually get free.
- if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
- t.Fatalf(err.Error())
- }
- svcInfo, exists := p.getServiceInfo(service)
- if !exists {
- t.Fatalf("can't find serviceInfo")
- }
- testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
- // This is a bit async, but this should be sufficient.
- time.Sleep(500 * time.Millisecond)
- waitForNumProxyLoops(t, p, 1)
- }
- func TestProxyUpdatePortal(t *testing.T) {
- lb := NewLoadBalancerRR()
- service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
- endpoint := api.Endpoints{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
- Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
- }},
- }
- lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
- p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
- if err != nil {
- t.Fatal(err)
- }
- waitForNumProxyLoops(t, p, 0)
- svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
- if err != nil {
- t.Fatalf("error adding new service: %#v", err)
- }
- testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- p.OnServiceUpdate([]api.Service{{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
- Name: "p",
- Port: int32(svcInfo.proxyPort),
- Protocol: "TCP",
- }}},
- }})
- _, exists := p.getServiceInfo(service)
- if exists {
- t.Fatalf("service with empty ClusterIP should not be included in the proxy")
- }
- p.OnServiceUpdate([]api.Service{{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
- Name: "p",
- Port: int32(svcInfo.proxyPort),
- Protocol: "TCP",
- }}},
- }})
- _, exists = p.getServiceInfo(service)
- if exists {
- t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
- }
- p.OnServiceUpdate([]api.Service{{
- ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
- Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
- Name: "p",
- Port: int32(svcInfo.proxyPort),
- Protocol: "TCP",
- }}},
- }})
- lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
- svcInfo, exists = p.getServiceInfo(service)
- if !exists {
- t.Fatalf("service with ClusterIP set not found in the proxy")
- }
- testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
- waitForNumProxyLoops(t, p, 1)
- }
- // TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in
|