12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package server
- import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "net/http/httptest"
- "net/http/httputil"
- "reflect"
- "strconv"
- "strings"
- "testing"
- "time"
- cadvisorapi "github.com/google/cadvisor/info/v1"
- cadvisorapiv2 "github.com/google/cadvisor/info/v2"
- "k8s.io/kubernetes/pkg/api"
- apierrs "k8s.io/kubernetes/pkg/api/errors"
- "k8s.io/kubernetes/pkg/auth/authorizer"
- "k8s.io/kubernetes/pkg/auth/user"
- "k8s.io/kubernetes/pkg/kubelet/cm"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- kubecontainertesting "k8s.io/kubernetes/pkg/kubelet/container/testing"
- "k8s.io/kubernetes/pkg/kubelet/server/stats"
- kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
- "k8s.io/kubernetes/pkg/types"
- "k8s.io/kubernetes/pkg/util/httpstream"
- "k8s.io/kubernetes/pkg/util/httpstream/spdy"
- "k8s.io/kubernetes/pkg/util/sets"
- "k8s.io/kubernetes/pkg/util/term"
- "k8s.io/kubernetes/pkg/volume"
- )
- type fakeKubelet struct {
- podByNameFunc func(namespace, name string) (*api.Pod, bool)
- containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
- rawInfoFunc func(query *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error)
- machineInfoFunc func() (*cadvisorapi.MachineInfo, error)
- podsFunc func() []*api.Pod
- runningPodsFunc func() ([]*api.Pod, error)
- logFunc func(w http.ResponseWriter, req *http.Request)
- runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
- execFunc func(pod string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
- attachFunc func(pod string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
- portForwardFunc func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
- containerLogsFunc func(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error
- streamingConnectionIdleTimeoutFunc func() time.Duration
- hostnameFunc func() string
- resyncInterval time.Duration
- loopEntryTime time.Time
- plegHealth bool
- }
- func (fk *fakeKubelet) ResyncInterval() time.Duration {
- return fk.resyncInterval
- }
- func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
- return fk.loopEntryTime
- }
- func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
- return fk.podByNameFunc(namespace, name)
- }
- func (fk *fakeKubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
- return fk.containerInfoFunc(podFullName, uid, containerName, req)
- }
- func (fk *fakeKubelet) GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error) {
- return fk.rawInfoFunc(req)
- }
- func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
- return fk.machineInfoFunc()
- }
- func (fk *fakeKubelet) GetPods() []*api.Pod {
- return fk.podsFunc()
- }
- func (fk *fakeKubelet) GetRunningPods() ([]*api.Pod, error) {
- return fk.runningPodsFunc()
- }
- func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
- fk.logFunc(w, req)
- }
- func (fk *fakeKubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error {
- return fk.containerLogsFunc(podFullName, containerName, logOptions, stdout, stderr)
- }
- func (fk *fakeKubelet) GetHostname() string {
- return fk.hostnameFunc()
- }
- func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
- return fk.runFunc(podFullName, uid, containerName, cmd)
- }
- func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
- return fk.execFunc(name, uid, container, cmd, in, out, err, tty)
- }
- func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
- return fk.attachFunc(name, uid, container, in, out, err, tty)
- }
- func (fk *fakeKubelet) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
- return fk.portForwardFunc(name, uid, port, stream)
- }
- func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration {
- return fk.streamingConnectionIdleTimeoutFunc()
- }
- func (fk *fakeKubelet) PLEGHealthCheck() (bool, error) { return fk.plegHealth, nil }
- // Unused functions
- func (_ *fakeKubelet) GetContainerInfoV2(_ string, _ cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
- return nil, nil
- }
- func (_ *fakeKubelet) ImagesFsInfo() (cadvisorapiv2.FsInfo, error) {
- return cadvisorapiv2.FsInfo{}, fmt.Errorf("Unsupported Operation ImagesFsInfo")
- }
- func (_ *fakeKubelet) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
- return cadvisorapiv2.FsInfo{}, fmt.Errorf("Unsupport Operation RootFsInfo")
- }
- func (_ *fakeKubelet) GetNode() (*api.Node, error) { return nil, nil }
- func (_ *fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
- func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
- return map[string]volume.Volume{}, true
- }
- type fakeAuth struct {
- authenticateFunc func(*http.Request) (user.Info, bool, error)
- attributesFunc func(user.Info, *http.Request) authorizer.Attributes
- authorizeFunc func(authorizer.Attributes) (authorized bool, reason string, err error)
- }
- func (f *fakeAuth) AuthenticateRequest(req *http.Request) (user.Info, bool, error) {
- return f.authenticateFunc(req)
- }
- func (f *fakeAuth) GetRequestAttributes(u user.Info, req *http.Request) authorizer.Attributes {
- return f.attributesFunc(u, req)
- }
- func (f *fakeAuth) Authorize(a authorizer.Attributes) (authorized bool, reason string, err error) {
- return f.authorizeFunc(a)
- }
- type serverTestFramework struct {
- serverUnderTest *Server
- fakeKubelet *fakeKubelet
- fakeAuth *fakeAuth
- testHTTPServer *httptest.Server
- }
- func newServerTest() *serverTestFramework {
- fw := &serverTestFramework{}
- fw.fakeKubelet = &fakeKubelet{
- hostnameFunc: func() string {
- return "127.0.0.1"
- },
- podByNameFunc: func(namespace, name string) (*api.Pod, bool) {
- return &api.Pod{
- ObjectMeta: api.ObjectMeta{
- Namespace: namespace,
- Name: name,
- },
- }, true
- },
- plegHealth: true,
- }
- fw.fakeAuth = &fakeAuth{
- authenticateFunc: func(req *http.Request) (user.Info, bool, error) {
- return &user.DefaultInfo{Name: "test"}, true, nil
- },
- attributesFunc: func(u user.Info, req *http.Request) authorizer.Attributes {
- return &authorizer.AttributesRecord{User: u}
- },
- authorizeFunc: func(a authorizer.Attributes) (authorized bool, reason string, err error) {
- return true, "", nil
- },
- }
- server := NewServer(
- fw.fakeKubelet,
- stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &kubecontainertesting.FakeRuntime{}),
- fw.fakeAuth,
- true,
- &kubecontainertesting.Mock{})
- fw.serverUnderTest = &server
- fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
- return fw
- }
// encodeJSON marshals obj with encoding/json and returns the result as a
// string. It panics with the marshalling error if obj cannot be encoded.
func encodeJSON(obj interface{}) string {
	encoded, marshalErr := json.Marshal(obj)
	if marshalErr == nil {
		return string(encoded)
	}
	panic(marshalErr)
}
// readResp drains resp.Body and returns its content as a string together
// with any read error. The body is closed before returning.
func readResp(resp *http.Response) (string, error) {
	defer resp.Body.Close()
	raw, readErr := ioutil.ReadAll(resp.Body)
	text := string(raw)
	return text, readErr
}
- // A helper function to return the correct pod name.
- func getPodName(name, namespace string) string {
- if namespace == "" {
- namespace = kubetypes.NamespaceDefault
- }
- return name + "_" + namespace
- }
- func TestContainerInfo(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- expectedInfo := &cadvisorapi.ContainerInfo{}
- podID := "somepod"
- expectedPodID := getPodName(podID, "")
- expectedContainerName := "goodcontainer"
- fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
- if podID != expectedPodID || containerName != expectedContainerName {
- return nil, fmt.Errorf("bad podID or containerName: podID=%v; containerName=%v", podID, containerName)
- }
- return expectedInfo, nil
- }
- resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v", podID, expectedContainerName))
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo cadvisorapi.ContainerInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("received invalid json data: %v", err)
- }
- if !receivedInfo.Eq(expectedInfo) {
- t.Errorf("received wrong data: %#v", receivedInfo)
- }
- }
- func TestContainerInfoWithUidNamespace(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- expectedInfo := &cadvisorapi.ContainerInfo{}
- podID := "somepod"
- expectedNamespace := "custom"
- expectedPodID := getPodName(podID, expectedNamespace)
- expectedContainerName := "goodcontainer"
- expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
- fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
- if podID != expectedPodID || string(uid) != expectedUid || containerName != expectedContainerName {
- return nil, fmt.Errorf("bad podID or uid or containerName: podID=%v; uid=%v; containerName=%v", podID, uid, containerName)
- }
- return expectedInfo, nil
- }
- resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName))
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo cadvisorapi.ContainerInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("received invalid json data: %v", err)
- }
- if !receivedInfo.Eq(expectedInfo) {
- t.Errorf("received wrong data: %#v", receivedInfo)
- }
- }
- func TestContainerNotFound(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- podID := "somepod"
- expectedNamespace := "custom"
- expectedContainerName := "slowstartcontainer"
- expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
- fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
- return nil, kubecontainer.ErrContainerNotFound
- }
- resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName))
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- if resp.StatusCode != http.StatusNotFound {
- t.Fatalf("Received status %d expecting %d", resp.StatusCode, http.StatusNotFound)
- }
- defer resp.Body.Close()
- }
- func TestRootInfo(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- expectedInfo := &cadvisorapi.ContainerInfo{
- ContainerReference: cadvisorapi.ContainerReference{
- Name: "/",
- },
- }
- fw.fakeKubelet.rawInfoFunc = func(req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
- return map[string]*cadvisorapi.ContainerInfo{
- expectedInfo.Name: expectedInfo,
- }, nil
- }
- resp, err := http.Get(fw.testHTTPServer.URL + "/stats")
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo cadvisorapi.ContainerInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("received invalid json data: %v", err)
- }
- if !receivedInfo.Eq(expectedInfo) {
- t.Errorf("received wrong data: %#v, expected %#v", receivedInfo, expectedInfo)
- }
- }
- func TestSubcontainerContainerInfo(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- const kubeletContainer = "/kubelet"
- const kubeletSubContainer = "/kubelet/sub"
- expectedInfo := map[string]*cadvisorapi.ContainerInfo{
- kubeletContainer: {
- ContainerReference: cadvisorapi.ContainerReference{
- Name: kubeletContainer,
- },
- },
- kubeletSubContainer: {
- ContainerReference: cadvisorapi.ContainerReference{
- Name: kubeletSubContainer,
- },
- },
- }
- fw.fakeKubelet.rawInfoFunc = func(req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
- return expectedInfo, nil
- }
- request := fmt.Sprintf("{\"containerName\":%q, \"subcontainers\": true}", kubeletContainer)
- resp, err := http.Post(fw.testHTTPServer.URL+"/stats/container", "application/json", bytes.NewBuffer([]byte(request)))
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo map[string]*cadvisorapi.ContainerInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("Received invalid json data: %v", err)
- }
- if len(receivedInfo) != len(expectedInfo) {
- t.Errorf("Received wrong data: %#v, expected %#v", receivedInfo, expectedInfo)
- }
- for _, containerName := range []string{kubeletContainer, kubeletSubContainer} {
- if _, ok := receivedInfo[containerName]; !ok {
- t.Errorf("Expected container %q to be present in result: %#v", containerName, receivedInfo)
- }
- if !receivedInfo[containerName].Eq(expectedInfo[containerName]) {
- t.Errorf("Invalid result for %q: Expected %#v, received %#v", containerName, expectedInfo[containerName], receivedInfo[containerName])
- }
- }
- }
- func TestMachineInfo(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- expectedInfo := &cadvisorapi.MachineInfo{
- NumCores: 4,
- MemoryCapacity: 1024,
- }
- fw.fakeKubelet.machineInfoFunc = func() (*cadvisorapi.MachineInfo, error) {
- return expectedInfo, nil
- }
- resp, err := http.Get(fw.testHTTPServer.URL + "/spec")
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- var receivedInfo cadvisorapi.MachineInfo
- err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
- if err != nil {
- t.Fatalf("received invalid json data: %v", err)
- }
- if !reflect.DeepEqual(&receivedInfo, expectedInfo) {
- t.Errorf("received wrong data: %#v", receivedInfo)
- }
- }
- func TestServeLogs(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
- fw.fakeKubelet.logFunc = func(w http.ResponseWriter, req *http.Request) {
- w.WriteHeader(http.StatusOK)
- w.Header().Add("Content-Type", "text/html")
- w.Write([]byte(content))
- }
- resp, err := http.Get(fw.testHTTPServer.URL + "/logs/")
- if err != nil {
- t.Fatalf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- body, err := httputil.DumpResponse(resp, true)
- if err != nil {
- // copying the response body did not work
- t.Errorf("Cannot copy resp: %#v", err)
- }
- result := string(body)
- if !strings.Contains(result, "kubelet.log") || !strings.Contains(result, "google.log") {
- t.Errorf("Received wrong data: %s", result)
- }
- }
- func TestServeRunInContainer(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- expectedCommand := "ls -a"
- fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
- if podFullName != expectedPodName {
- t.Errorf("expected %s, got %s", expectedPodName, podFullName)
- }
- if containerName != expectedContainerName {
- t.Errorf("expected %s, got %s", expectedContainerName, containerName)
- }
- if strings.Join(cmd, " ") != expectedCommand {
- t.Errorf("expected: %s, got %v", expectedCommand, cmd)
- }
- return []byte(output), nil
- }
- resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
- if err != nil {
- t.Fatalf("Got error POSTing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- // copying the response body did not work
- t.Errorf("Cannot copy resp: %#v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("expected %s, got %s", output, result)
- }
- }
- func TestServeRunInContainerWithUID(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedUID := "7e00838d_-_3523_-_11e4_-_8421_-_42010af0a720"
- expectedContainerName := "baz"
- expectedCommand := "ls -a"
- fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
- if podFullName != expectedPodName {
- t.Errorf("expected %s, got %s", expectedPodName, podFullName)
- }
- if string(uid) != expectedUID {
- t.Errorf("expected %s, got %s", expectedUID, uid)
- }
- if containerName != expectedContainerName {
- t.Errorf("expected %s, got %s", expectedContainerName, containerName)
- }
- if strings.Join(cmd, " ") != expectedCommand {
- t.Errorf("expected: %s, got %v", expectedCommand, cmd)
- }
- return []byte(output), nil
- }
- resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
- if err != nil {
- t.Fatalf("Got error POSTing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- // copying the response body did not work
- t.Errorf("Cannot copy resp: %#v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("expected %s, got %s", output, result)
- }
- }
- func TestHealthCheck(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeKubelet.hostnameFunc = func() string {
- return "127.0.0.1"
- }
- // Test with correct hostname, Docker version
- assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
- // Test with incorrect hostname
- fw.fakeKubelet.hostnameFunc = func() string {
- return "fake"
- }
- assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
- }
// assertHealthFails GETs httpURL and records a test error unless the response
// status equals expectedErrorCode. A transport-level failure aborts the test.
func assertHealthFails(t *testing.T, httpURL string, expectedErrorCode int) {
	resp, getErr := http.Get(httpURL)
	if getErr != nil {
		t.Fatalf("Got error GETing: %v", getErr)
	}
	defer resp.Body.Close()

	if got := resp.StatusCode; got != expectedErrorCode {
		t.Errorf("expected status code %d, got %d", expectedErrorCode, got)
	}
}
// authTestCase names one request issued by TestAuthFilters: the HTTP method
// and the server path to hit. Field order matters because the test builds
// values with positional literals.
type authTestCase struct {
	Method, Path string
}
- func TestAuthFilters(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- testcases := []authTestCase{}
- // This is a sanity check that the Handle->HandleWithFilter() delegation is working
- // Ideally, these would move to registered web services and this list would get shorter
- expectedPaths := []string{"/healthz", "/metrics"}
- paths := sets.NewString(fw.serverUnderTest.restfulCont.RegisteredHandlePaths()...)
- for _, expectedPath := range expectedPaths {
- if !paths.Has(expectedPath) {
- t.Errorf("Expected registered handle path %s was missing", expectedPath)
- }
- }
- // Test all the non-web-service handlers
- for _, path := range fw.serverUnderTest.restfulCont.RegisteredHandlePaths() {
- testcases = append(testcases, authTestCase{"GET", path})
- testcases = append(testcases, authTestCase{"POST", path})
- // Test subpaths for directory handlers
- if strings.HasSuffix(path, "/") {
- testcases = append(testcases, authTestCase{"GET", path + "foo"})
- testcases = append(testcases, authTestCase{"POST", path + "foo"})
- }
- }
- // Test all the generated web-service paths
- for _, ws := range fw.serverUnderTest.restfulCont.RegisteredWebServices() {
- for _, r := range ws.Routes() {
- testcases = append(testcases, authTestCase{r.Method, r.Path})
- }
- }
- for _, tc := range testcases {
- var (
- expectedUser = &user.DefaultInfo{Name: "test"}
- expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
- calledAuthenticate = false
- calledAuthorize = false
- calledAttributes = false
- )
- fw.fakeAuth.authenticateFunc = func(req *http.Request) (user.Info, bool, error) {
- calledAuthenticate = true
- return expectedUser, true, nil
- }
- fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
- calledAttributes = true
- if u != expectedUser {
- t.Fatalf("%s: expected user %v, got %v", tc.Path, expectedUser, u)
- }
- return expectedAttributes
- }
- fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (authorized bool, reason string, err error) {
- calledAuthorize = true
- if a != expectedAttributes {
- t.Fatalf("%s: expected attributes %v, got %v", tc.Path, expectedAttributes, a)
- }
- return false, "", nil
- }
- req, err := http.NewRequest(tc.Method, fw.testHTTPServer.URL+tc.Path, nil)
- if err != nil {
- t.Errorf("%s: unexpected error: %v", tc.Path, err)
- continue
- }
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- t.Errorf("%s: unexpected error: %v", tc.Path, err)
- continue
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusForbidden {
- t.Errorf("%s: unexpected status code %d", tc.Path, resp.StatusCode)
- continue
- }
- if !calledAuthenticate {
- t.Errorf("%s: Authenticate was not called", tc.Path)
- continue
- }
- if !calledAttributes {
- t.Errorf("%s: Attributes were not called", tc.Path)
- continue
- }
- if !calledAuthorize {
- t.Errorf("%s: Authorize was not called", tc.Path)
- continue
- }
- }
- }
- func TestAuthenticationError(t *testing.T) {
- var (
- expectedUser = &user.DefaultInfo{Name: "test"}
- expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
- calledAuthenticate = false
- calledAuthorize = false
- calledAttributes = false
- )
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeAuth.authenticateFunc = func(req *http.Request) (user.Info, bool, error) {
- calledAuthenticate = true
- return expectedUser, true, nil
- }
- fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
- calledAttributes = true
- return expectedAttributes
- }
- fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (authorized bool, reason string, err error) {
- calledAuthorize = true
- return false, "", errors.New("Failed")
- }
- assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
- if !calledAuthenticate {
- t.Fatalf("Authenticate was not called")
- }
- if !calledAttributes {
- t.Fatalf("Attributes was not called")
- }
- if !calledAuthorize {
- t.Fatalf("Authorize was not called")
- }
- }
- func TestAuthenticationFailure(t *testing.T) {
- var (
- expectedUser = &user.DefaultInfo{Name: "test"}
- expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
- calledAuthenticate = false
- calledAuthorize = false
- calledAttributes = false
- )
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeAuth.authenticateFunc = func(req *http.Request) (user.Info, bool, error) {
- calledAuthenticate = true
- return nil, false, nil
- }
- fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
- calledAttributes = true
- return expectedAttributes
- }
- fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (authorized bool, reason string, err error) {
- calledAuthorize = true
- return false, "", nil
- }
- assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusUnauthorized)
- if !calledAuthenticate {
- t.Fatalf("Authenticate was not called")
- }
- if calledAttributes {
- t.Fatalf("Attributes was called unexpectedly")
- }
- if calledAuthorize {
- t.Fatalf("Authorize was called unexpectedly")
- }
- }
- func TestAuthorizationSuccess(t *testing.T) {
- var (
- expectedUser = &user.DefaultInfo{Name: "test"}
- expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
- calledAuthenticate = false
- calledAuthorize = false
- calledAttributes = false
- )
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeAuth.authenticateFunc = func(req *http.Request) (user.Info, bool, error) {
- calledAuthenticate = true
- return expectedUser, true, nil
- }
- fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
- calledAttributes = true
- return expectedAttributes
- }
- fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (authorized bool, reason string, err error) {
- calledAuthorize = true
- return true, "", nil
- }
- assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
- if !calledAuthenticate {
- t.Fatalf("Authenticate was not called")
- }
- if !calledAttributes {
- t.Fatalf("Attributes were not called")
- }
- if !calledAuthorize {
- t.Fatalf("Authorize was not called")
- }
- }
- func TestSyncLoopCheck(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeKubelet.hostnameFunc = func() string {
- return "127.0.0.1"
- }
- fw.fakeKubelet.resyncInterval = time.Minute
- fw.fakeKubelet.loopEntryTime = time.Now()
- // Test with correct hostname, Docker version
- assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
- fw.fakeKubelet.loopEntryTime = time.Now().Add(time.Minute * -10)
- assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
- }
- func TestPLEGHealthCheck(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeKubelet.hostnameFunc = func() string {
- return "127.0.0.1"
- }
- // Test with failed pleg health check.
- fw.fakeKubelet.plegHealth = false
- assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
- }
// assertHealthIsOk issues an HTTP GET against httpURL and records a test
// failure unless the response status is 200 OK and the body contains "ok".
// It returns nothing; a failed request or an unreadable body aborts the test.
func assertHealthIsOk(t *testing.T, httpURL string) {
	resp, err := http.Get(httpURL)
	if err != nil {
		t.Fatalf("Got error GETing: %v", err)
	}
	defer resp.Body.Close()

	if got, want := resp.StatusCode, http.StatusOK; got != want {
		t.Errorf("expected status code %d, got %d", want, got)
	}

	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// The response body could not be read in full.
		t.Fatalf("Cannot copy resp: %#v", err)
	}

	if text := string(raw); !strings.Contains(text, "ok") {
		t.Errorf("expected body contains ok, got %s", text)
	}
}
- func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string) {
- fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*api.Pod, bool) {
- return &api.Pod{
- ObjectMeta: api.ObjectMeta{
- Namespace: namespace,
- Name: pod,
- },
- Spec: api.PodSpec{
- Containers: []api.Container{
- {
- Name: container,
- },
- },
- },
- }, true
- }
- }
- func setGetContainerLogsFunc(fw *serverTestFramework, t *testing.T, expectedPodName, expectedContainerName string, expectedLogOptions *api.PodLogOptions, output string) {
- fw.fakeKubelet.containerLogsFunc = func(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error {
- if podFullName != expectedPodName {
- t.Errorf("expected %s, got %s", expectedPodName, podFullName)
- }
- if containerName != expectedContainerName {
- t.Errorf("expected %s, got %s", expectedContainerName, containerName)
- }
- if !reflect.DeepEqual(expectedLogOptions, logOptions) {
- t.Errorf("expected %#v, got %#v", expectedLogOptions, logOptions)
- }
- io.WriteString(stdout, output)
- return nil
- }
- }
- // TODO: I really want to be a table driven test
- func TestContainerLogs(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
- setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{}, output)
- resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName)
- if err != nil {
- t.Errorf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- t.Errorf("Error reading container logs: %v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("Expected: '%v', got: '%v'", output, result)
- }
- }
- func TestContainerLogsWithLimitBytes(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- bytes := int64(3)
- setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
- setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{LimitBytes: &bytes}, output)
- resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?limitBytes=3")
- if err != nil {
- t.Errorf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- t.Errorf("Error reading container logs: %v", err)
- }
- result := string(body)
- if result != output[:bytes] {
- t.Errorf("Expected: '%v', got: '%v'", output[:bytes], result)
- }
- }
- func TestContainerLogsWithTail(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- expectedTail := int64(5)
- setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
- setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{TailLines: &expectedTail}, output)
- resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tailLines=5")
- if err != nil {
- t.Errorf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- t.Errorf("Error reading container logs: %v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("Expected: '%v', got: '%v'", output, result)
- }
- }
- func TestContainerLogsWithLegacyTail(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- expectedTail := int64(5)
- setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
- setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{TailLines: &expectedTail}, output)
- resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tail=5")
- if err != nil {
- t.Errorf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- t.Errorf("Error reading container logs: %v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("Expected: '%v', got: '%v'", output, result)
- }
- }
- func TestContainerLogsWithTailAll(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
- setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{}, output)
- resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tail=all")
- if err != nil {
- t.Errorf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- t.Errorf("Error reading container logs: %v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("Expected: '%v', got: '%v'", output, result)
- }
- }
- func TestContainerLogsWithInvalidTail(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
- setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{}, output)
- resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tail=-1")
- if err != nil {
- t.Errorf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != apierrs.StatusUnprocessableEntity {
- t.Errorf("Unexpected non-error reading container logs: %#v", resp)
- }
- }
- func TestContainerLogsWithFollow(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- output := "foo bar"
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedContainerName := "baz"
- setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
- setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{Follow: true}, output)
- resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?follow=1")
- if err != nil {
- t.Errorf("Got error GETing: %v", err)
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- t.Errorf("Error reading container logs: %v", err)
- }
- result := string(body)
- if result != output {
- t.Errorf("Expected: '%v', got: '%v'", output, result)
- }
- }
- func TestServeExecInContainerIdleTimeout(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
- return 100 * time.Millisecond
- }
- podNamespace := "other"
- podName := "foo"
- expectedContainerName := "baz"
- url := fw.testHTTPServer.URL + "/exec/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?c=ls&c=-a&" + api.ExecStdinParam + "=1"
- upgradeRoundTripper := spdy.NewSpdyRoundTripper(nil)
- c := &http.Client{Transport: upgradeRoundTripper}
- resp, err := c.Post(url, "", nil)
- if err != nil {
- t.Fatalf("Got error POSTing: %v", err)
- }
- defer resp.Body.Close()
- upgradeRoundTripper.Dialer = &net.Dialer{
- Deadline: time.Now().Add(60 * time.Second),
- Timeout: 60 * time.Second,
- }
- conn, err := upgradeRoundTripper.NewConnection(resp)
- if err != nil {
- t.Fatalf("Unexpected error creating streaming connection: %s", err)
- }
- if conn == nil {
- t.Fatal("Unexpected nil connection")
- }
- <-conn.CloseChan()
- }
- func testExecAttach(t *testing.T, verb string) {
- tests := []struct {
- stdin bool
- stdout bool
- stderr bool
- tty bool
- responseStatusCode int
- uid bool
- }{
- {responseStatusCode: http.StatusBadRequest},
- {stdin: true, responseStatusCode: http.StatusSwitchingProtocols},
- {stdout: true, responseStatusCode: http.StatusSwitchingProtocols},
- {stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
- {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
- {stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols},
- {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
- }
- for i, test := range tests {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
- return 0
- }
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
- expectedContainerName := "baz"
- expectedCommand := "ls -a"
- expectedStdin := "stdin"
- expectedStdout := "stdout"
- expectedStderr := "stderr"
- done := make(chan struct{})
- clientStdoutReadDone := make(chan struct{})
- clientStderrReadDone := make(chan struct{})
- execInvoked := false
- attachInvoked := false
- testStreamFunc := func(podFullName string, uid types.UID, containerName string, cmd []string, in io.Reader, out, stderr io.WriteCloser, tty bool, done chan struct{}) error {
- defer close(done)
- if podFullName != expectedPodName {
- t.Fatalf("%d: podFullName: expected %s, got %s", i, expectedPodName, podFullName)
- }
- if test.uid && string(uid) != expectedUid {
- t.Fatalf("%d: uid: expected %v, got %v", i, expectedUid, uid)
- }
- if containerName != expectedContainerName {
- t.Fatalf("%d: containerName: expected %s, got %s", i, expectedContainerName, containerName)
- }
- if test.stdin {
- if in == nil {
- t.Fatalf("%d: stdin: expected non-nil", i)
- }
- b := make([]byte, 10)
- n, err := in.Read(b)
- if err != nil {
- t.Fatalf("%d: error reading from stdin: %v", i, err)
- }
- if e, a := expectedStdin, string(b[0:n]); e != a {
- t.Fatalf("%d: stdin: expected to read %v, got %v", i, e, a)
- }
- } else if in != nil {
- t.Fatalf("%d: stdin: expected nil: %#v", i, in)
- }
- if test.stdout {
- if out == nil {
- t.Fatalf("%d: stdout: expected non-nil", i)
- }
- _, err := out.Write([]byte(expectedStdout))
- if err != nil {
- t.Fatalf("%d:, error writing to stdout: %v", i, err)
- }
- out.Close()
- <-clientStdoutReadDone
- } else if out != nil {
- t.Fatalf("%d: stdout: expected nil: %#v", i, out)
- }
- if tty {
- if stderr != nil {
- t.Fatalf("%d: tty set but received non-nil stderr: %v", i, stderr)
- }
- } else if test.stderr {
- if stderr == nil {
- t.Fatalf("%d: stderr: expected non-nil", i)
- }
- _, err := stderr.Write([]byte(expectedStderr))
- if err != nil {
- t.Fatalf("%d:, error writing to stderr: %v", i, err)
- }
- stderr.Close()
- <-clientStderrReadDone
- } else if stderr != nil {
- t.Fatalf("%d: stderr: expected nil: %#v", i, stderr)
- }
- return nil
- }
- fw.fakeKubelet.execFunc = func(podFullName string, uid types.UID, containerName string, cmd []string, in io.Reader, out, stderr io.WriteCloser, tty bool) error {
- execInvoked = true
- if strings.Join(cmd, " ") != expectedCommand {
- t.Fatalf("%d: cmd: expected: %s, got %v", i, expectedCommand, cmd)
- }
- return testStreamFunc(podFullName, uid, containerName, cmd, in, out, stderr, tty, done)
- }
- fw.fakeKubelet.attachFunc = func(podFullName string, uid types.UID, containerName string, in io.Reader, out, stderr io.WriteCloser, tty bool) error {
- attachInvoked = true
- return testStreamFunc(podFullName, uid, containerName, nil, in, out, stderr, tty, done)
- }
- var url string
- if test.uid {
- url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedUid + "/" + expectedContainerName + "?ignore=1"
- } else {
- url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?ignore=1"
- }
- if verb == "exec" {
- url += "&command=ls&command=-a"
- }
- if test.stdin {
- url += "&" + api.ExecStdinParam + "=1"
- }
- if test.stdout {
- url += "&" + api.ExecStdoutParam + "=1"
- }
- if test.stderr && !test.tty {
- url += "&" + api.ExecStderrParam + "=1"
- }
- if test.tty {
- url += "&" + api.ExecTTYParam + "=1"
- }
- var (
- resp *http.Response
- err error
- upgradeRoundTripper httpstream.UpgradeRoundTripper
- c *http.Client
- )
- if test.responseStatusCode != http.StatusSwitchingProtocols {
- c = &http.Client{}
- } else {
- upgradeRoundTripper = spdy.NewRoundTripper(nil)
- c = &http.Client{Transport: upgradeRoundTripper}
- }
- resp, err = c.Post(url, "", nil)
- if err != nil {
- t.Fatalf("%d: Got error POSTing: %v", i, err)
- }
- defer resp.Body.Close()
- _, err = ioutil.ReadAll(resp.Body)
- if err != nil {
- t.Errorf("%d: Error reading response body: %v", i, err)
- }
- if e, a := test.responseStatusCode, resp.StatusCode; e != a {
- t.Fatalf("%d: response status: expected %v, got %v", i, e, a)
- }
- if test.responseStatusCode != http.StatusSwitchingProtocols {
- continue
- }
- conn, err := upgradeRoundTripper.NewConnection(resp)
- if err != nil {
- t.Fatalf("Unexpected error creating streaming connection: %s", err)
- }
- if conn == nil {
- t.Fatalf("%d: unexpected nil conn", i)
- }
- defer conn.Close()
- h := http.Header{}
- h.Set(api.StreamType, api.StreamTypeError)
- if _, err := conn.CreateStream(h); err != nil {
- t.Fatalf("%d: error creating error stream: %v", i, err)
- }
- if test.stdin {
- h.Set(api.StreamType, api.StreamTypeStdin)
- stream, err := conn.CreateStream(h)
- if err != nil {
- t.Fatalf("%d: error creating stdin stream: %v", i, err)
- }
- _, err = stream.Write([]byte(expectedStdin))
- if err != nil {
- t.Fatalf("%d: error writing to stdin stream: %v", i, err)
- }
- }
- var stdoutStream httpstream.Stream
- if test.stdout {
- h.Set(api.StreamType, api.StreamTypeStdout)
- stdoutStream, err = conn.CreateStream(h)
- if err != nil {
- t.Fatalf("%d: error creating stdout stream: %v", i, err)
- }
- }
- var stderrStream httpstream.Stream
- if test.stderr && !test.tty {
- h.Set(api.StreamType, api.StreamTypeStderr)
- stderrStream, err = conn.CreateStream(h)
- if err != nil {
- t.Fatalf("%d: error creating stderr stream: %v", i, err)
- }
- }
- if test.stdout {
- output := make([]byte, 10)
- n, err := stdoutStream.Read(output)
- close(clientStdoutReadDone)
- if err != nil {
- t.Fatalf("%d: error reading from stdout stream: %v", i, err)
- }
- if e, a := expectedStdout, string(output[0:n]); e != a {
- t.Fatalf("%d: stdout: expected '%v', got '%v'", i, e, a)
- }
- }
- if test.stderr && !test.tty {
- output := make([]byte, 10)
- n, err := stderrStream.Read(output)
- close(clientStderrReadDone)
- if err != nil {
- t.Fatalf("%d: error reading from stderr stream: %v", i, err)
- }
- if e, a := expectedStderr, string(output[0:n]); e != a {
- t.Fatalf("%d: stderr: expected '%v', got '%v'", i, e, a)
- }
- }
- // wait for the server to finish before checking if the attach/exec funcs were invoked
- <-done
- if verb == "exec" {
- if !execInvoked {
- t.Errorf("%d: exec was not invoked", i)
- }
- if attachInvoked {
- t.Errorf("%d: attach should not have been invoked", i)
- }
- } else {
- if !attachInvoked {
- t.Errorf("%d: attach was not invoked", i)
- }
- if execInvoked {
- t.Errorf("%d: exec should not have been invoked", i)
- }
- }
- }
- }
- func TestServeExecInContainer(t *testing.T) {
- testExecAttach(t, "exec")
- }
- func TestServeAttachContainer(t *testing.T) {
- testExecAttach(t, "attach")
- }
- func TestServePortForwardIdleTimeout(t *testing.T) {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
- return 100 * time.Millisecond
- }
- podNamespace := "other"
- podName := "foo"
- url := fw.testHTTPServer.URL + "/portForward/" + podNamespace + "/" + podName
- upgradeRoundTripper := spdy.NewRoundTripper(nil)
- c := &http.Client{Transport: upgradeRoundTripper}
- resp, err := c.Post(url, "", nil)
- if err != nil {
- t.Fatalf("Got error POSTing: %v", err)
- }
- defer resp.Body.Close()
- conn, err := upgradeRoundTripper.NewConnection(resp)
- if err != nil {
- t.Fatalf("Unexpected error creating streaming connection: %s", err)
- }
- if conn == nil {
- t.Fatal("Unexpected nil connection")
- }
- defer conn.Close()
- <-conn.CloseChan()
- }
- func TestServePortForward(t *testing.T) {
- tests := []struct {
- port string
- uid bool
- clientData string
- containerData string
- shouldError bool
- }{
- {port: "", shouldError: true},
- {port: "abc", shouldError: true},
- {port: "-1", shouldError: true},
- {port: "65536", shouldError: true},
- {port: "0", shouldError: true},
- {port: "1", shouldError: false},
- {port: "8000", shouldError: false},
- {port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
- {port: "65535", shouldError: false},
- {port: "65535", uid: true, shouldError: false},
- }
- podNamespace := "other"
- podName := "foo"
- expectedPodName := getPodName(podName, podNamespace)
- expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
- for i, test := range tests {
- fw := newServerTest()
- defer fw.testHTTPServer.Close()
- fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
- return 0
- }
- portForwardFuncDone := make(chan struct{})
- fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
- defer close(portForwardFuncDone)
- if e, a := expectedPodName, name; e != a {
- t.Fatalf("%d: pod name: expected '%v', got '%v'", i, e, a)
- }
- if e, a := expectedUid, uid; test.uid && e != string(a) {
- t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a)
- }
- p, err := strconv.ParseUint(test.port, 10, 16)
- if err != nil {
- t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err)
- }
- if e, a := uint16(p), port; e != a {
- t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a)
- }
- if test.clientData != "" {
- fromClient := make([]byte, 32)
- n, err := stream.Read(fromClient)
- if err != nil {
- t.Fatalf("%d: error reading client data: %v", i, err)
- }
- if e, a := test.clientData, string(fromClient[0:n]); e != a {
- t.Fatalf("%d: client data: expected to receive '%v', got '%v'", i, e, a)
- }
- }
- if test.containerData != "" {
- _, err := stream.Write([]byte(test.containerData))
- if err != nil {
- t.Fatalf("%d: error writing container data: %v", i, err)
- }
- }
- return nil
- }
- var url string
- if test.uid {
- url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, expectedUid)
- } else {
- url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName)
- }
- upgradeRoundTripper := spdy.NewRoundTripper(nil)
- c := &http.Client{Transport: upgradeRoundTripper}
- resp, err := c.Post(url, "", nil)
- if err != nil {
- t.Fatalf("%d: Got error POSTing: %v", i, err)
- }
- defer resp.Body.Close()
- conn, err := upgradeRoundTripper.NewConnection(resp)
- if err != nil {
- t.Fatalf("Unexpected error creating streaming connection: %s", err)
- }
- if conn == nil {
- t.Fatalf("%d: Unexpected nil connection", i)
- }
- defer conn.Close()
- headers := http.Header{}
- headers.Set("streamType", "error")
- headers.Set("port", test.port)
- errorStream, err := conn.CreateStream(headers)
- _ = errorStream
- haveErr := err != nil
- if e, a := test.shouldError, haveErr; e != a {
- t.Fatalf("%d: create stream: expected err=%t, got %t: %v", i, e, a, err)
- }
- if test.shouldError {
- continue
- }
- headers.Set("streamType", "data")
- headers.Set("port", test.port)
- dataStream, err := conn.CreateStream(headers)
- haveErr = err != nil
- if e, a := test.shouldError, haveErr; e != a {
- t.Fatalf("%d: create stream: expected err=%t, got %t: %v", i, e, a, err)
- }
- if test.clientData != "" {
- _, err := dataStream.Write([]byte(test.clientData))
- if err != nil {
- t.Fatalf("%d: unexpected error writing client data: %v", i, err)
- }
- }
- if test.containerData != "" {
- fromContainer := make([]byte, 32)
- n, err := dataStream.Read(fromContainer)
- if err != nil {
- t.Fatalf("%d: unexpected error reading container data: %v", i, err)
- }
- if e, a := test.containerData, string(fromContainer[0:n]); e != a {
- t.Fatalf("%d: expected to receive '%v' from container, got '%v'", i, e, a)
- }
- }
- <-portForwardFuncDone
- }
- }
- type fakeHttpStream struct {
- headers http.Header
- id uint32
- }
- func newFakeHttpStream() *fakeHttpStream {
- return &fakeHttpStream{
- headers: make(http.Header),
- }
- }
- var _ httpstream.Stream = &fakeHttpStream{}
- func (s *fakeHttpStream) Read(data []byte) (int, error) {
- return 0, nil
- }
- func (s *fakeHttpStream) Write(data []byte) (int, error) {
- return 0, nil
- }
- func (s *fakeHttpStream) Close() error {
- return nil
- }
- func (s *fakeHttpStream) Reset() error {
- return nil
- }
- func (s *fakeHttpStream) Headers() http.Header {
- return s.headers
- }
- func (s *fakeHttpStream) Identifier() uint32 {
- return s.id
- }
- func TestPortForwardStreamReceived(t *testing.T) {
- tests := map[string]struct {
- port string
- streamType string
- expectedError string
- }{
- "missing port": {
- expectedError: `"port" header is required`,
- },
- "unable to parse port": {
- port: "abc",
- expectedError: `unable to parse "abc" as a port: strconv.ParseUint: parsing "abc": invalid syntax`,
- },
- "negative port": {
- port: "-1",
- expectedError: `unable to parse "-1" as a port: strconv.ParseUint: parsing "-1": invalid syntax`,
- },
- "missing stream type": {
- port: "80",
- expectedError: `"streamType" header is required`,
- },
- "valid port with error stream": {
- port: "80",
- streamType: "error",
- },
- "valid port with data stream": {
- port: "80",
- streamType: "data",
- },
- "invalid stream type": {
- port: "80",
- streamType: "foo",
- expectedError: `invalid stream type "foo"`,
- },
- }
- for name, test := range tests {
- streams := make(chan httpstream.Stream, 1)
- f := portForwardStreamReceived(streams)
- stream := newFakeHttpStream()
- if len(test.port) > 0 {
- stream.headers.Set("port", test.port)
- }
- if len(test.streamType) > 0 {
- stream.headers.Set("streamType", test.streamType)
- }
- replySent := make(chan struct{})
- err := f(stream, replySent)
- close(replySent)
- if len(test.expectedError) > 0 {
- if err == nil {
- t.Errorf("%s: expected err=%q, but it was nil", name, test.expectedError)
- }
- if e, a := test.expectedError, err.Error(); e != a {
- t.Errorf("%s: expected err=%q, got %q", name, e, a)
- }
- continue
- }
- if err != nil {
- t.Errorf("%s: unexpected error %v", name, err)
- continue
- }
- if s := <-streams; s != stream {
- t.Errorf("%s: expected stream %#v, got %#v", name, stream, s)
- }
- }
- }
- func TestGetStreamPair(t *testing.T) {
- timeout := make(chan time.Time)
- h := &portForwardStreamHandler{
- streamPairs: make(map[string]*portForwardStreamPair),
- }
- // test adding a new entry
- p, created := h.getStreamPair("1")
- if p == nil {
- t.Fatalf("unexpected nil pair")
- }
- if !created {
- t.Fatal("expected created=true")
- }
- if p.dataStream != nil {
- t.Errorf("unexpected non-nil data stream")
- }
- if p.errorStream != nil {
- t.Errorf("unexpected non-nil error stream")
- }
- // start the monitor for this pair
- monitorDone := make(chan struct{})
- go func() {
- h.monitorStreamPair(p, timeout)
- close(monitorDone)
- }()
- if !h.hasStreamPair("1") {
- t.Fatal("This should still be true")
- }
- // make sure we can retrieve an existing entry
- p2, created := h.getStreamPair("1")
- if created {
- t.Fatal("expected created=false")
- }
- if p != p2 {
- t.Fatalf("retrieving an existing pair: expected %#v, got %#v", p, p2)
- }
- // removed via complete
- dataStream := newFakeHttpStream()
- dataStream.headers.Set(api.StreamType, api.StreamTypeData)
- complete, err := p.add(dataStream)
- if err != nil {
- t.Fatalf("unexpected error adding data stream to pair: %v", err)
- }
- if complete {
- t.Fatalf("unexpected complete")
- }
- errorStream := newFakeHttpStream()
- errorStream.headers.Set(api.StreamType, api.StreamTypeError)
- complete, err = p.add(errorStream)
- if err != nil {
- t.Fatalf("unexpected error adding error stream to pair: %v", err)
- }
- if !complete {
- t.Fatal("unexpected incomplete")
- }
- // make sure monitorStreamPair completed
- <-monitorDone
- // make sure the pair was removed
- if h.hasStreamPair("1") {
- t.Fatal("expected removal of pair after both data and error streams received")
- }
- // removed via timeout
- p, created = h.getStreamPair("2")
- if !created {
- t.Fatal("expected created=true")
- }
- if p == nil {
- t.Fatal("expected p not to be nil")
- }
- monitorDone = make(chan struct{})
- go func() {
- h.monitorStreamPair(p, timeout)
- close(monitorDone)
- }()
- // cause the timeout
- close(timeout)
- // make sure monitorStreamPair completed
- <-monitorDone
- if h.hasStreamPair("2") {
- t.Fatal("expected stream pair to be removed")
- }
- }
- func TestRequestID(t *testing.T) {
- h := &portForwardStreamHandler{}
- s := newFakeHttpStream()
- s.headers.Set(api.StreamType, api.StreamTypeError)
- s.id = 1
- if e, a := "1", h.requestID(s); e != a {
- t.Errorf("expected %q, got %q", e, a)
- }
- s.headers.Set(api.StreamType, api.StreamTypeData)
- s.id = 3
- if e, a := "1", h.requestID(s); e != a {
- t.Errorf("expected %q, got %q", e, a)
- }
- s.id = 7
- s.headers.Set(api.PortForwardRequestIDHeader, "2")
- if e, a := "2", h.requestID(s); e != a {
- t.Errorf("expected %q, got %q", e, a)
- }
- }
|