server_test.go 52 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package server
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "net"
  22. "net/http"
  23. "net/http/httptest"
  24. "net/http/httputil"
  25. "reflect"
  26. "strconv"
  27. "strings"
  28. "testing"
  29. "time"
  30. cadvisorapi "github.com/google/cadvisor/info/v1"
  31. cadvisorapiv2 "github.com/google/cadvisor/info/v2"
  32. "k8s.io/kubernetes/pkg/api"
  33. apierrs "k8s.io/kubernetes/pkg/api/errors"
  34. "k8s.io/kubernetes/pkg/auth/authorizer"
  35. "k8s.io/kubernetes/pkg/auth/user"
  36. "k8s.io/kubernetes/pkg/kubelet/cm"
  37. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  38. kubecontainertesting "k8s.io/kubernetes/pkg/kubelet/container/testing"
  39. "k8s.io/kubernetes/pkg/kubelet/server/stats"
  40. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  41. "k8s.io/kubernetes/pkg/types"
  42. "k8s.io/kubernetes/pkg/util/httpstream"
  43. "k8s.io/kubernetes/pkg/util/httpstream/spdy"
  44. "k8s.io/kubernetes/pkg/util/sets"
  45. "k8s.io/kubernetes/pkg/util/term"
  46. "k8s.io/kubernetes/pkg/volume"
  47. )
  48. type fakeKubelet struct {
  49. podByNameFunc func(namespace, name string) (*api.Pod, bool)
  50. containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
  51. rawInfoFunc func(query *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error)
  52. machineInfoFunc func() (*cadvisorapi.MachineInfo, error)
  53. podsFunc func() []*api.Pod
  54. runningPodsFunc func() ([]*api.Pod, error)
  55. logFunc func(w http.ResponseWriter, req *http.Request)
  56. runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
  57. execFunc func(pod string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
  58. attachFunc func(pod string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
  59. portForwardFunc func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
  60. containerLogsFunc func(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error
  61. streamingConnectionIdleTimeoutFunc func() time.Duration
  62. hostnameFunc func() string
  63. resyncInterval time.Duration
  64. loopEntryTime time.Time
  65. plegHealth bool
  66. }
  67. func (fk *fakeKubelet) ResyncInterval() time.Duration {
  68. return fk.resyncInterval
  69. }
  70. func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
  71. return fk.loopEntryTime
  72. }
  73. func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
  74. return fk.podByNameFunc(namespace, name)
  75. }
  76. func (fk *fakeKubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
  77. return fk.containerInfoFunc(podFullName, uid, containerName, req)
  78. }
  79. func (fk *fakeKubelet) GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error) {
  80. return fk.rawInfoFunc(req)
  81. }
  82. func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
  83. return fk.machineInfoFunc()
  84. }
  85. func (fk *fakeKubelet) GetPods() []*api.Pod {
  86. return fk.podsFunc()
  87. }
  88. func (fk *fakeKubelet) GetRunningPods() ([]*api.Pod, error) {
  89. return fk.runningPodsFunc()
  90. }
  91. func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
  92. fk.logFunc(w, req)
  93. }
  94. func (fk *fakeKubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error {
  95. return fk.containerLogsFunc(podFullName, containerName, logOptions, stdout, stderr)
  96. }
  97. func (fk *fakeKubelet) GetHostname() string {
  98. return fk.hostnameFunc()
  99. }
  100. func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
  101. return fk.runFunc(podFullName, uid, containerName, cmd)
  102. }
  103. func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
  104. return fk.execFunc(name, uid, container, cmd, in, out, err, tty)
  105. }
  106. func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
  107. return fk.attachFunc(name, uid, container, in, out, err, tty)
  108. }
  109. func (fk *fakeKubelet) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
  110. return fk.portForwardFunc(name, uid, port, stream)
  111. }
  112. func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration {
  113. return fk.streamingConnectionIdleTimeoutFunc()
  114. }
  115. func (fk *fakeKubelet) PLEGHealthCheck() (bool, error) { return fk.plegHealth, nil }
  116. // Unused functions
  117. func (_ *fakeKubelet) GetContainerInfoV2(_ string, _ cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
  118. return nil, nil
  119. }
  120. func (_ *fakeKubelet) ImagesFsInfo() (cadvisorapiv2.FsInfo, error) {
  121. return cadvisorapiv2.FsInfo{}, fmt.Errorf("Unsupported Operation ImagesFsInfo")
  122. }
  123. func (_ *fakeKubelet) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
  124. return cadvisorapiv2.FsInfo{}, fmt.Errorf("Unsupport Operation RootFsInfo")
  125. }
  126. func (_ *fakeKubelet) GetNode() (*api.Node, error) { return nil, nil }
  127. func (_ *fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
  128. func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
  129. return map[string]volume.Volume{}, true
  130. }
  131. type fakeAuth struct {
  132. authenticateFunc func(*http.Request) (user.Info, bool, error)
  133. attributesFunc func(user.Info, *http.Request) authorizer.Attributes
  134. authorizeFunc func(authorizer.Attributes) (authorized bool, reason string, err error)
  135. }
  136. func (f *fakeAuth) AuthenticateRequest(req *http.Request) (user.Info, bool, error) {
  137. return f.authenticateFunc(req)
  138. }
  139. func (f *fakeAuth) GetRequestAttributes(u user.Info, req *http.Request) authorizer.Attributes {
  140. return f.attributesFunc(u, req)
  141. }
  142. func (f *fakeAuth) Authorize(a authorizer.Attributes) (authorized bool, reason string, err error) {
  143. return f.authorizeFunc(a)
  144. }
  145. type serverTestFramework struct {
  146. serverUnderTest *Server
  147. fakeKubelet *fakeKubelet
  148. fakeAuth *fakeAuth
  149. testHTTPServer *httptest.Server
  150. }
  151. func newServerTest() *serverTestFramework {
  152. fw := &serverTestFramework{}
  153. fw.fakeKubelet = &fakeKubelet{
  154. hostnameFunc: func() string {
  155. return "127.0.0.1"
  156. },
  157. podByNameFunc: func(namespace, name string) (*api.Pod, bool) {
  158. return &api.Pod{
  159. ObjectMeta: api.ObjectMeta{
  160. Namespace: namespace,
  161. Name: name,
  162. },
  163. }, true
  164. },
  165. plegHealth: true,
  166. }
  167. fw.fakeAuth = &fakeAuth{
  168. authenticateFunc: func(req *http.Request) (user.Info, bool, error) {
  169. return &user.DefaultInfo{Name: "test"}, true, nil
  170. },
  171. attributesFunc: func(u user.Info, req *http.Request) authorizer.Attributes {
  172. return &authorizer.AttributesRecord{User: u}
  173. },
  174. authorizeFunc: func(a authorizer.Attributes) (authorized bool, reason string, err error) {
  175. return true, "", nil
  176. },
  177. }
  178. server := NewServer(
  179. fw.fakeKubelet,
  180. stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &kubecontainertesting.FakeRuntime{}),
  181. fw.fakeAuth,
  182. true,
  183. &kubecontainertesting.Mock{})
  184. fw.serverUnderTest = &server
  185. fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
  186. return fw
  187. }
  188. // encodeJSON returns obj marshalled as a JSON string, panicing on any errors
  189. func encodeJSON(obj interface{}) string {
  190. data, err := json.Marshal(obj)
  191. if err != nil {
  192. panic(err)
  193. }
  194. return string(data)
  195. }
  196. func readResp(resp *http.Response) (string, error) {
  197. defer resp.Body.Close()
  198. body, err := ioutil.ReadAll(resp.Body)
  199. return string(body), err
  200. }
  201. // A helper function to return the correct pod name.
  202. func getPodName(name, namespace string) string {
  203. if namespace == "" {
  204. namespace = kubetypes.NamespaceDefault
  205. }
  206. return name + "_" + namespace
  207. }
  208. func TestContainerInfo(t *testing.T) {
  209. fw := newServerTest()
  210. defer fw.testHTTPServer.Close()
  211. expectedInfo := &cadvisorapi.ContainerInfo{}
  212. podID := "somepod"
  213. expectedPodID := getPodName(podID, "")
  214. expectedContainerName := "goodcontainer"
  215. fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
  216. if podID != expectedPodID || containerName != expectedContainerName {
  217. return nil, fmt.Errorf("bad podID or containerName: podID=%v; containerName=%v", podID, containerName)
  218. }
  219. return expectedInfo, nil
  220. }
  221. resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v", podID, expectedContainerName))
  222. if err != nil {
  223. t.Fatalf("Got error GETing: %v", err)
  224. }
  225. defer resp.Body.Close()
  226. var receivedInfo cadvisorapi.ContainerInfo
  227. err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
  228. if err != nil {
  229. t.Fatalf("received invalid json data: %v", err)
  230. }
  231. if !receivedInfo.Eq(expectedInfo) {
  232. t.Errorf("received wrong data: %#v", receivedInfo)
  233. }
  234. }
  235. func TestContainerInfoWithUidNamespace(t *testing.T) {
  236. fw := newServerTest()
  237. defer fw.testHTTPServer.Close()
  238. expectedInfo := &cadvisorapi.ContainerInfo{}
  239. podID := "somepod"
  240. expectedNamespace := "custom"
  241. expectedPodID := getPodName(podID, expectedNamespace)
  242. expectedContainerName := "goodcontainer"
  243. expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
  244. fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
  245. if podID != expectedPodID || string(uid) != expectedUid || containerName != expectedContainerName {
  246. return nil, fmt.Errorf("bad podID or uid or containerName: podID=%v; uid=%v; containerName=%v", podID, uid, containerName)
  247. }
  248. return expectedInfo, nil
  249. }
  250. resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName))
  251. if err != nil {
  252. t.Fatalf("Got error GETing: %v", err)
  253. }
  254. defer resp.Body.Close()
  255. var receivedInfo cadvisorapi.ContainerInfo
  256. err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
  257. if err != nil {
  258. t.Fatalf("received invalid json data: %v", err)
  259. }
  260. if !receivedInfo.Eq(expectedInfo) {
  261. t.Errorf("received wrong data: %#v", receivedInfo)
  262. }
  263. }
  264. func TestContainerNotFound(t *testing.T) {
  265. fw := newServerTest()
  266. defer fw.testHTTPServer.Close()
  267. podID := "somepod"
  268. expectedNamespace := "custom"
  269. expectedContainerName := "slowstartcontainer"
  270. expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
  271. fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
  272. return nil, kubecontainer.ErrContainerNotFound
  273. }
  274. resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName))
  275. if err != nil {
  276. t.Fatalf("Got error GETing: %v", err)
  277. }
  278. if resp.StatusCode != http.StatusNotFound {
  279. t.Fatalf("Received status %d expecting %d", resp.StatusCode, http.StatusNotFound)
  280. }
  281. defer resp.Body.Close()
  282. }
  283. func TestRootInfo(t *testing.T) {
  284. fw := newServerTest()
  285. defer fw.testHTTPServer.Close()
  286. expectedInfo := &cadvisorapi.ContainerInfo{
  287. ContainerReference: cadvisorapi.ContainerReference{
  288. Name: "/",
  289. },
  290. }
  291. fw.fakeKubelet.rawInfoFunc = func(req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
  292. return map[string]*cadvisorapi.ContainerInfo{
  293. expectedInfo.Name: expectedInfo,
  294. }, nil
  295. }
  296. resp, err := http.Get(fw.testHTTPServer.URL + "/stats")
  297. if err != nil {
  298. t.Fatalf("Got error GETing: %v", err)
  299. }
  300. defer resp.Body.Close()
  301. var receivedInfo cadvisorapi.ContainerInfo
  302. err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
  303. if err != nil {
  304. t.Fatalf("received invalid json data: %v", err)
  305. }
  306. if !receivedInfo.Eq(expectedInfo) {
  307. t.Errorf("received wrong data: %#v, expected %#v", receivedInfo, expectedInfo)
  308. }
  309. }
  310. func TestSubcontainerContainerInfo(t *testing.T) {
  311. fw := newServerTest()
  312. defer fw.testHTTPServer.Close()
  313. const kubeletContainer = "/kubelet"
  314. const kubeletSubContainer = "/kubelet/sub"
  315. expectedInfo := map[string]*cadvisorapi.ContainerInfo{
  316. kubeletContainer: {
  317. ContainerReference: cadvisorapi.ContainerReference{
  318. Name: kubeletContainer,
  319. },
  320. },
  321. kubeletSubContainer: {
  322. ContainerReference: cadvisorapi.ContainerReference{
  323. Name: kubeletSubContainer,
  324. },
  325. },
  326. }
  327. fw.fakeKubelet.rawInfoFunc = func(req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
  328. return expectedInfo, nil
  329. }
  330. request := fmt.Sprintf("{\"containerName\":%q, \"subcontainers\": true}", kubeletContainer)
  331. resp, err := http.Post(fw.testHTTPServer.URL+"/stats/container", "application/json", bytes.NewBuffer([]byte(request)))
  332. if err != nil {
  333. t.Fatalf("Got error GETing: %v", err)
  334. }
  335. defer resp.Body.Close()
  336. var receivedInfo map[string]*cadvisorapi.ContainerInfo
  337. err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
  338. if err != nil {
  339. t.Fatalf("Received invalid json data: %v", err)
  340. }
  341. if len(receivedInfo) != len(expectedInfo) {
  342. t.Errorf("Received wrong data: %#v, expected %#v", receivedInfo, expectedInfo)
  343. }
  344. for _, containerName := range []string{kubeletContainer, kubeletSubContainer} {
  345. if _, ok := receivedInfo[containerName]; !ok {
  346. t.Errorf("Expected container %q to be present in result: %#v", containerName, receivedInfo)
  347. }
  348. if !receivedInfo[containerName].Eq(expectedInfo[containerName]) {
  349. t.Errorf("Invalid result for %q: Expected %#v, received %#v", containerName, expectedInfo[containerName], receivedInfo[containerName])
  350. }
  351. }
  352. }
  353. func TestMachineInfo(t *testing.T) {
  354. fw := newServerTest()
  355. defer fw.testHTTPServer.Close()
  356. expectedInfo := &cadvisorapi.MachineInfo{
  357. NumCores: 4,
  358. MemoryCapacity: 1024,
  359. }
  360. fw.fakeKubelet.machineInfoFunc = func() (*cadvisorapi.MachineInfo, error) {
  361. return expectedInfo, nil
  362. }
  363. resp, err := http.Get(fw.testHTTPServer.URL + "/spec")
  364. if err != nil {
  365. t.Fatalf("Got error GETing: %v", err)
  366. }
  367. defer resp.Body.Close()
  368. var receivedInfo cadvisorapi.MachineInfo
  369. err = json.NewDecoder(resp.Body).Decode(&receivedInfo)
  370. if err != nil {
  371. t.Fatalf("received invalid json data: %v", err)
  372. }
  373. if !reflect.DeepEqual(&receivedInfo, expectedInfo) {
  374. t.Errorf("received wrong data: %#v", receivedInfo)
  375. }
  376. }
  377. func TestServeLogs(t *testing.T) {
  378. fw := newServerTest()
  379. defer fw.testHTTPServer.Close()
  380. content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
  381. fw.fakeKubelet.logFunc = func(w http.ResponseWriter, req *http.Request) {
  382. w.WriteHeader(http.StatusOK)
  383. w.Header().Add("Content-Type", "text/html")
  384. w.Write([]byte(content))
  385. }
  386. resp, err := http.Get(fw.testHTTPServer.URL + "/logs/")
  387. if err != nil {
  388. t.Fatalf("Got error GETing: %v", err)
  389. }
  390. defer resp.Body.Close()
  391. body, err := httputil.DumpResponse(resp, true)
  392. if err != nil {
  393. // copying the response body did not work
  394. t.Errorf("Cannot copy resp: %#v", err)
  395. }
  396. result := string(body)
  397. if !strings.Contains(result, "kubelet.log") || !strings.Contains(result, "google.log") {
  398. t.Errorf("Received wrong data: %s", result)
  399. }
  400. }
  401. func TestServeRunInContainer(t *testing.T) {
  402. fw := newServerTest()
  403. defer fw.testHTTPServer.Close()
  404. output := "foo bar"
  405. podNamespace := "other"
  406. podName := "foo"
  407. expectedPodName := getPodName(podName, podNamespace)
  408. expectedContainerName := "baz"
  409. expectedCommand := "ls -a"
  410. fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
  411. if podFullName != expectedPodName {
  412. t.Errorf("expected %s, got %s", expectedPodName, podFullName)
  413. }
  414. if containerName != expectedContainerName {
  415. t.Errorf("expected %s, got %s", expectedContainerName, containerName)
  416. }
  417. if strings.Join(cmd, " ") != expectedCommand {
  418. t.Errorf("expected: %s, got %v", expectedCommand, cmd)
  419. }
  420. return []byte(output), nil
  421. }
  422. resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
  423. if err != nil {
  424. t.Fatalf("Got error POSTing: %v", err)
  425. }
  426. defer resp.Body.Close()
  427. body, err := ioutil.ReadAll(resp.Body)
  428. if err != nil {
  429. // copying the response body did not work
  430. t.Errorf("Cannot copy resp: %#v", err)
  431. }
  432. result := string(body)
  433. if result != output {
  434. t.Errorf("expected %s, got %s", output, result)
  435. }
  436. }
  437. func TestServeRunInContainerWithUID(t *testing.T) {
  438. fw := newServerTest()
  439. defer fw.testHTTPServer.Close()
  440. output := "foo bar"
  441. podNamespace := "other"
  442. podName := "foo"
  443. expectedPodName := getPodName(podName, podNamespace)
  444. expectedUID := "7e00838d_-_3523_-_11e4_-_8421_-_42010af0a720"
  445. expectedContainerName := "baz"
  446. expectedCommand := "ls -a"
  447. fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
  448. if podFullName != expectedPodName {
  449. t.Errorf("expected %s, got %s", expectedPodName, podFullName)
  450. }
  451. if string(uid) != expectedUID {
  452. t.Errorf("expected %s, got %s", expectedUID, uid)
  453. }
  454. if containerName != expectedContainerName {
  455. t.Errorf("expected %s, got %s", expectedContainerName, containerName)
  456. }
  457. if strings.Join(cmd, " ") != expectedCommand {
  458. t.Errorf("expected: %s, got %v", expectedCommand, cmd)
  459. }
  460. return []byte(output), nil
  461. }
  462. resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
  463. if err != nil {
  464. t.Fatalf("Got error POSTing: %v", err)
  465. }
  466. defer resp.Body.Close()
  467. body, err := ioutil.ReadAll(resp.Body)
  468. if err != nil {
  469. // copying the response body did not work
  470. t.Errorf("Cannot copy resp: %#v", err)
  471. }
  472. result := string(body)
  473. if result != output {
  474. t.Errorf("expected %s, got %s", output, result)
  475. }
  476. }
  477. func TestHealthCheck(t *testing.T) {
  478. fw := newServerTest()
  479. defer fw.testHTTPServer.Close()
  480. fw.fakeKubelet.hostnameFunc = func() string {
  481. return "127.0.0.1"
  482. }
  483. // Test with correct hostname, Docker version
  484. assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
  485. // Test with incorrect hostname
  486. fw.fakeKubelet.hostnameFunc = func() string {
  487. return "fake"
  488. }
  489. assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
  490. }
  491. func assertHealthFails(t *testing.T, httpURL string, expectedErrorCode int) {
  492. resp, err := http.Get(httpURL)
  493. if err != nil {
  494. t.Fatalf("Got error GETing: %v", err)
  495. }
  496. defer resp.Body.Close()
  497. if resp.StatusCode != expectedErrorCode {
  498. t.Errorf("expected status code %d, got %d", expectedErrorCode, resp.StatusCode)
  499. }
  500. }
  501. type authTestCase struct {
  502. Method string
  503. Path string
  504. }
  505. func TestAuthFilters(t *testing.T) {
  506. fw := newServerTest()
  507. defer fw.testHTTPServer.Close()
  508. testcases := []authTestCase{}
  509. // This is a sanity check that the Handle->HandleWithFilter() delegation is working
  510. // Ideally, these would move to registered web services and this list would get shorter
  511. expectedPaths := []string{"/healthz", "/metrics"}
  512. paths := sets.NewString(fw.serverUnderTest.restfulCont.RegisteredHandlePaths()...)
  513. for _, expectedPath := range expectedPaths {
  514. if !paths.Has(expectedPath) {
  515. t.Errorf("Expected registered handle path %s was missing", expectedPath)
  516. }
  517. }
  518. // Test all the non-web-service handlers
  519. for _, path := range fw.serverUnderTest.restfulCont.RegisteredHandlePaths() {
  520. testcases = append(testcases, authTestCase{"GET", path})
  521. testcases = append(testcases, authTestCase{"POST", path})
  522. // Test subpaths for directory handlers
  523. if strings.HasSuffix(path, "/") {
  524. testcases = append(testcases, authTestCase{"GET", path + "foo"})
  525. testcases = append(testcases, authTestCase{"POST", path + "foo"})
  526. }
  527. }
  528. // Test all the generated web-service paths
  529. for _, ws := range fw.serverUnderTest.restfulCont.RegisteredWebServices() {
  530. for _, r := range ws.Routes() {
  531. testcases = append(testcases, authTestCase{r.Method, r.Path})
  532. }
  533. }
  534. for _, tc := range testcases {
  535. var (
  536. expectedUser = &user.DefaultInfo{Name: "test"}
  537. expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
  538. calledAuthenticate = false
  539. calledAuthorize = false
  540. calledAttributes = false
  541. )
  542. fw.fakeAuth.authenticateFunc = func(req *http.Request) (user.Info, bool, error) {
  543. calledAuthenticate = true
  544. return expectedUser, true, nil
  545. }
  546. fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
  547. calledAttributes = true
  548. if u != expectedUser {
  549. t.Fatalf("%s: expected user %v, got %v", tc.Path, expectedUser, u)
  550. }
  551. return expectedAttributes
  552. }
  553. fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (authorized bool, reason string, err error) {
  554. calledAuthorize = true
  555. if a != expectedAttributes {
  556. t.Fatalf("%s: expected attributes %v, got %v", tc.Path, expectedAttributes, a)
  557. }
  558. return false, "", nil
  559. }
  560. req, err := http.NewRequest(tc.Method, fw.testHTTPServer.URL+tc.Path, nil)
  561. if err != nil {
  562. t.Errorf("%s: unexpected error: %v", tc.Path, err)
  563. continue
  564. }
  565. resp, err := http.DefaultClient.Do(req)
  566. if err != nil {
  567. t.Errorf("%s: unexpected error: %v", tc.Path, err)
  568. continue
  569. }
  570. defer resp.Body.Close()
  571. if resp.StatusCode != http.StatusForbidden {
  572. t.Errorf("%s: unexpected status code %d", tc.Path, resp.StatusCode)
  573. continue
  574. }
  575. if !calledAuthenticate {
  576. t.Errorf("%s: Authenticate was not called", tc.Path)
  577. continue
  578. }
  579. if !calledAttributes {
  580. t.Errorf("%s: Attributes were not called", tc.Path)
  581. continue
  582. }
  583. if !calledAuthorize {
  584. t.Errorf("%s: Authorize was not called", tc.Path)
  585. continue
  586. }
  587. }
  588. }
  589. func TestAuthenticationError(t *testing.T) {
  590. var (
  591. expectedUser = &user.DefaultInfo{Name: "test"}
  592. expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
  593. calledAuthenticate = false
  594. calledAuthorize = false
  595. calledAttributes = false
  596. )
  597. fw := newServerTest()
  598. defer fw.testHTTPServer.Close()
  599. fw.fakeAuth.authenticateFunc = func(req *http.Request) (user.Info, bool, error) {
  600. calledAuthenticate = true
  601. return expectedUser, true, nil
  602. }
  603. fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
  604. calledAttributes = true
  605. return expectedAttributes
  606. }
  607. fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (authorized bool, reason string, err error) {
  608. calledAuthorize = true
  609. return false, "", errors.New("Failed")
  610. }
  611. assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
  612. if !calledAuthenticate {
  613. t.Fatalf("Authenticate was not called")
  614. }
  615. if !calledAttributes {
  616. t.Fatalf("Attributes was not called")
  617. }
  618. if !calledAuthorize {
  619. t.Fatalf("Authorize was not called")
  620. }
  621. }
  622. func TestAuthenticationFailure(t *testing.T) {
  623. var (
  624. expectedUser = &user.DefaultInfo{Name: "test"}
  625. expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
  626. calledAuthenticate = false
  627. calledAuthorize = false
  628. calledAttributes = false
  629. )
  630. fw := newServerTest()
  631. defer fw.testHTTPServer.Close()
  632. fw.fakeAuth.authenticateFunc = func(req *http.Request) (user.Info, bool, error) {
  633. calledAuthenticate = true
  634. return nil, false, nil
  635. }
  636. fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
  637. calledAttributes = true
  638. return expectedAttributes
  639. }
  640. fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (authorized bool, reason string, err error) {
  641. calledAuthorize = true
  642. return false, "", nil
  643. }
  644. assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusUnauthorized)
  645. if !calledAuthenticate {
  646. t.Fatalf("Authenticate was not called")
  647. }
  648. if calledAttributes {
  649. t.Fatalf("Attributes was called unexpectedly")
  650. }
  651. if calledAuthorize {
  652. t.Fatalf("Authorize was called unexpectedly")
  653. }
  654. }
  655. func TestAuthorizationSuccess(t *testing.T) {
  656. var (
  657. expectedUser = &user.DefaultInfo{Name: "test"}
  658. expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
  659. calledAuthenticate = false
  660. calledAuthorize = false
  661. calledAttributes = false
  662. )
  663. fw := newServerTest()
  664. defer fw.testHTTPServer.Close()
  665. fw.fakeAuth.authenticateFunc = func(req *http.Request) (user.Info, bool, error) {
  666. calledAuthenticate = true
  667. return expectedUser, true, nil
  668. }
  669. fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
  670. calledAttributes = true
  671. return expectedAttributes
  672. }
  673. fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (authorized bool, reason string, err error) {
  674. calledAuthorize = true
  675. return true, "", nil
  676. }
  677. assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
  678. if !calledAuthenticate {
  679. t.Fatalf("Authenticate was not called")
  680. }
  681. if !calledAttributes {
  682. t.Fatalf("Attributes were not called")
  683. }
  684. if !calledAuthorize {
  685. t.Fatalf("Authorize was not called")
  686. }
  687. }
  688. func TestSyncLoopCheck(t *testing.T) {
  689. fw := newServerTest()
  690. defer fw.testHTTPServer.Close()
  691. fw.fakeKubelet.hostnameFunc = func() string {
  692. return "127.0.0.1"
  693. }
  694. fw.fakeKubelet.resyncInterval = time.Minute
  695. fw.fakeKubelet.loopEntryTime = time.Now()
  696. // Test with correct hostname, Docker version
  697. assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
  698. fw.fakeKubelet.loopEntryTime = time.Now().Add(time.Minute * -10)
  699. assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
  700. }
  701. func TestPLEGHealthCheck(t *testing.T) {
  702. fw := newServerTest()
  703. defer fw.testHTTPServer.Close()
  704. fw.fakeKubelet.hostnameFunc = func() string {
  705. return "127.0.0.1"
  706. }
  707. // Test with failed pleg health check.
  708. fw.fakeKubelet.plegHealth = false
  709. assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
  710. }
  711. // returns http response status code from the HTTP GET
  712. func assertHealthIsOk(t *testing.T, httpURL string) {
  713. resp, err := http.Get(httpURL)
  714. if err != nil {
  715. t.Fatalf("Got error GETing: %v", err)
  716. }
  717. defer resp.Body.Close()
  718. if resp.StatusCode != http.StatusOK {
  719. t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
  720. }
  721. body, readErr := ioutil.ReadAll(resp.Body)
  722. if readErr != nil {
  723. // copying the response body did not work
  724. t.Fatalf("Cannot copy resp: %#v", readErr)
  725. }
  726. result := string(body)
  727. if !strings.Contains(result, "ok") {
  728. t.Errorf("expected body contains ok, got %s", result)
  729. }
  730. }
  731. func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string) {
  732. fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*api.Pod, bool) {
  733. return &api.Pod{
  734. ObjectMeta: api.ObjectMeta{
  735. Namespace: namespace,
  736. Name: pod,
  737. },
  738. Spec: api.PodSpec{
  739. Containers: []api.Container{
  740. {
  741. Name: container,
  742. },
  743. },
  744. },
  745. }, true
  746. }
  747. }
  748. func setGetContainerLogsFunc(fw *serverTestFramework, t *testing.T, expectedPodName, expectedContainerName string, expectedLogOptions *api.PodLogOptions, output string) {
  749. fw.fakeKubelet.containerLogsFunc = func(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error {
  750. if podFullName != expectedPodName {
  751. t.Errorf("expected %s, got %s", expectedPodName, podFullName)
  752. }
  753. if containerName != expectedContainerName {
  754. t.Errorf("expected %s, got %s", expectedContainerName, containerName)
  755. }
  756. if !reflect.DeepEqual(expectedLogOptions, logOptions) {
  757. t.Errorf("expected %#v, got %#v", expectedLogOptions, logOptions)
  758. }
  759. io.WriteString(stdout, output)
  760. return nil
  761. }
  762. }
  763. // TODO: I really want to be a table driven test
  764. func TestContainerLogs(t *testing.T) {
  765. fw := newServerTest()
  766. defer fw.testHTTPServer.Close()
  767. output := "foo bar"
  768. podNamespace := "other"
  769. podName := "foo"
  770. expectedPodName := getPodName(podName, podNamespace)
  771. expectedContainerName := "baz"
  772. setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
  773. setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{}, output)
  774. resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName)
  775. if err != nil {
  776. t.Errorf("Got error GETing: %v", err)
  777. }
  778. defer resp.Body.Close()
  779. body, err := ioutil.ReadAll(resp.Body)
  780. if err != nil {
  781. t.Errorf("Error reading container logs: %v", err)
  782. }
  783. result := string(body)
  784. if result != output {
  785. t.Errorf("Expected: '%v', got: '%v'", output, result)
  786. }
  787. }
  788. func TestContainerLogsWithLimitBytes(t *testing.T) {
  789. fw := newServerTest()
  790. defer fw.testHTTPServer.Close()
  791. output := "foo bar"
  792. podNamespace := "other"
  793. podName := "foo"
  794. expectedPodName := getPodName(podName, podNamespace)
  795. expectedContainerName := "baz"
  796. bytes := int64(3)
  797. setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
  798. setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{LimitBytes: &bytes}, output)
  799. resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?limitBytes=3")
  800. if err != nil {
  801. t.Errorf("Got error GETing: %v", err)
  802. }
  803. defer resp.Body.Close()
  804. body, err := ioutil.ReadAll(resp.Body)
  805. if err != nil {
  806. t.Errorf("Error reading container logs: %v", err)
  807. }
  808. result := string(body)
  809. if result != output[:bytes] {
  810. t.Errorf("Expected: '%v', got: '%v'", output[:bytes], result)
  811. }
  812. }
  813. func TestContainerLogsWithTail(t *testing.T) {
  814. fw := newServerTest()
  815. defer fw.testHTTPServer.Close()
  816. output := "foo bar"
  817. podNamespace := "other"
  818. podName := "foo"
  819. expectedPodName := getPodName(podName, podNamespace)
  820. expectedContainerName := "baz"
  821. expectedTail := int64(5)
  822. setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
  823. setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{TailLines: &expectedTail}, output)
  824. resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tailLines=5")
  825. if err != nil {
  826. t.Errorf("Got error GETing: %v", err)
  827. }
  828. defer resp.Body.Close()
  829. body, err := ioutil.ReadAll(resp.Body)
  830. if err != nil {
  831. t.Errorf("Error reading container logs: %v", err)
  832. }
  833. result := string(body)
  834. if result != output {
  835. t.Errorf("Expected: '%v', got: '%v'", output, result)
  836. }
  837. }
  838. func TestContainerLogsWithLegacyTail(t *testing.T) {
  839. fw := newServerTest()
  840. defer fw.testHTTPServer.Close()
  841. output := "foo bar"
  842. podNamespace := "other"
  843. podName := "foo"
  844. expectedPodName := getPodName(podName, podNamespace)
  845. expectedContainerName := "baz"
  846. expectedTail := int64(5)
  847. setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
  848. setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{TailLines: &expectedTail}, output)
  849. resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tail=5")
  850. if err != nil {
  851. t.Errorf("Got error GETing: %v", err)
  852. }
  853. defer resp.Body.Close()
  854. body, err := ioutil.ReadAll(resp.Body)
  855. if err != nil {
  856. t.Errorf("Error reading container logs: %v", err)
  857. }
  858. result := string(body)
  859. if result != output {
  860. t.Errorf("Expected: '%v', got: '%v'", output, result)
  861. }
  862. }
  863. func TestContainerLogsWithTailAll(t *testing.T) {
  864. fw := newServerTest()
  865. defer fw.testHTTPServer.Close()
  866. output := "foo bar"
  867. podNamespace := "other"
  868. podName := "foo"
  869. expectedPodName := getPodName(podName, podNamespace)
  870. expectedContainerName := "baz"
  871. setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
  872. setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{}, output)
  873. resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tail=all")
  874. if err != nil {
  875. t.Errorf("Got error GETing: %v", err)
  876. }
  877. defer resp.Body.Close()
  878. body, err := ioutil.ReadAll(resp.Body)
  879. if err != nil {
  880. t.Errorf("Error reading container logs: %v", err)
  881. }
  882. result := string(body)
  883. if result != output {
  884. t.Errorf("Expected: '%v', got: '%v'", output, result)
  885. }
  886. }
  887. func TestContainerLogsWithInvalidTail(t *testing.T) {
  888. fw := newServerTest()
  889. defer fw.testHTTPServer.Close()
  890. output := "foo bar"
  891. podNamespace := "other"
  892. podName := "foo"
  893. expectedPodName := getPodName(podName, podNamespace)
  894. expectedContainerName := "baz"
  895. setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
  896. setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{}, output)
  897. resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tail=-1")
  898. if err != nil {
  899. t.Errorf("Got error GETing: %v", err)
  900. }
  901. defer resp.Body.Close()
  902. if resp.StatusCode != apierrs.StatusUnprocessableEntity {
  903. t.Errorf("Unexpected non-error reading container logs: %#v", resp)
  904. }
  905. }
  906. func TestContainerLogsWithFollow(t *testing.T) {
  907. fw := newServerTest()
  908. defer fw.testHTTPServer.Close()
  909. output := "foo bar"
  910. podNamespace := "other"
  911. podName := "foo"
  912. expectedPodName := getPodName(podName, podNamespace)
  913. expectedContainerName := "baz"
  914. setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
  915. setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &api.PodLogOptions{Follow: true}, output)
  916. resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?follow=1")
  917. if err != nil {
  918. t.Errorf("Got error GETing: %v", err)
  919. }
  920. defer resp.Body.Close()
  921. body, err := ioutil.ReadAll(resp.Body)
  922. if err != nil {
  923. t.Errorf("Error reading container logs: %v", err)
  924. }
  925. result := string(body)
  926. if result != output {
  927. t.Errorf("Expected: '%v', got: '%v'", output, result)
  928. }
  929. }
  930. func TestServeExecInContainerIdleTimeout(t *testing.T) {
  931. fw := newServerTest()
  932. defer fw.testHTTPServer.Close()
  933. fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
  934. return 100 * time.Millisecond
  935. }
  936. podNamespace := "other"
  937. podName := "foo"
  938. expectedContainerName := "baz"
  939. url := fw.testHTTPServer.URL + "/exec/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?c=ls&c=-a&" + api.ExecStdinParam + "=1"
  940. upgradeRoundTripper := spdy.NewSpdyRoundTripper(nil)
  941. c := &http.Client{Transport: upgradeRoundTripper}
  942. resp, err := c.Post(url, "", nil)
  943. if err != nil {
  944. t.Fatalf("Got error POSTing: %v", err)
  945. }
  946. defer resp.Body.Close()
  947. upgradeRoundTripper.Dialer = &net.Dialer{
  948. Deadline: time.Now().Add(60 * time.Second),
  949. Timeout: 60 * time.Second,
  950. }
  951. conn, err := upgradeRoundTripper.NewConnection(resp)
  952. if err != nil {
  953. t.Fatalf("Unexpected error creating streaming connection: %s", err)
  954. }
  955. if conn == nil {
  956. t.Fatal("Unexpected nil connection")
  957. }
  958. <-conn.CloseChan()
  959. }
  960. func testExecAttach(t *testing.T, verb string) {
  961. tests := []struct {
  962. stdin bool
  963. stdout bool
  964. stderr bool
  965. tty bool
  966. responseStatusCode int
  967. uid bool
  968. }{
  969. {responseStatusCode: http.StatusBadRequest},
  970. {stdin: true, responseStatusCode: http.StatusSwitchingProtocols},
  971. {stdout: true, responseStatusCode: http.StatusSwitchingProtocols},
  972. {stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
  973. {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
  974. {stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols},
  975. {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
  976. }
  977. for i, test := range tests {
  978. fw := newServerTest()
  979. defer fw.testHTTPServer.Close()
  980. fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
  981. return 0
  982. }
  983. podNamespace := "other"
  984. podName := "foo"
  985. expectedPodName := getPodName(podName, podNamespace)
  986. expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
  987. expectedContainerName := "baz"
  988. expectedCommand := "ls -a"
  989. expectedStdin := "stdin"
  990. expectedStdout := "stdout"
  991. expectedStderr := "stderr"
  992. done := make(chan struct{})
  993. clientStdoutReadDone := make(chan struct{})
  994. clientStderrReadDone := make(chan struct{})
  995. execInvoked := false
  996. attachInvoked := false
  997. testStreamFunc := func(podFullName string, uid types.UID, containerName string, cmd []string, in io.Reader, out, stderr io.WriteCloser, tty bool, done chan struct{}) error {
  998. defer close(done)
  999. if podFullName != expectedPodName {
  1000. t.Fatalf("%d: podFullName: expected %s, got %s", i, expectedPodName, podFullName)
  1001. }
  1002. if test.uid && string(uid) != expectedUid {
  1003. t.Fatalf("%d: uid: expected %v, got %v", i, expectedUid, uid)
  1004. }
  1005. if containerName != expectedContainerName {
  1006. t.Fatalf("%d: containerName: expected %s, got %s", i, expectedContainerName, containerName)
  1007. }
  1008. if test.stdin {
  1009. if in == nil {
  1010. t.Fatalf("%d: stdin: expected non-nil", i)
  1011. }
  1012. b := make([]byte, 10)
  1013. n, err := in.Read(b)
  1014. if err != nil {
  1015. t.Fatalf("%d: error reading from stdin: %v", i, err)
  1016. }
  1017. if e, a := expectedStdin, string(b[0:n]); e != a {
  1018. t.Fatalf("%d: stdin: expected to read %v, got %v", i, e, a)
  1019. }
  1020. } else if in != nil {
  1021. t.Fatalf("%d: stdin: expected nil: %#v", i, in)
  1022. }
  1023. if test.stdout {
  1024. if out == nil {
  1025. t.Fatalf("%d: stdout: expected non-nil", i)
  1026. }
  1027. _, err := out.Write([]byte(expectedStdout))
  1028. if err != nil {
  1029. t.Fatalf("%d:, error writing to stdout: %v", i, err)
  1030. }
  1031. out.Close()
  1032. <-clientStdoutReadDone
  1033. } else if out != nil {
  1034. t.Fatalf("%d: stdout: expected nil: %#v", i, out)
  1035. }
  1036. if tty {
  1037. if stderr != nil {
  1038. t.Fatalf("%d: tty set but received non-nil stderr: %v", i, stderr)
  1039. }
  1040. } else if test.stderr {
  1041. if stderr == nil {
  1042. t.Fatalf("%d: stderr: expected non-nil", i)
  1043. }
  1044. _, err := stderr.Write([]byte(expectedStderr))
  1045. if err != nil {
  1046. t.Fatalf("%d:, error writing to stderr: %v", i, err)
  1047. }
  1048. stderr.Close()
  1049. <-clientStderrReadDone
  1050. } else if stderr != nil {
  1051. t.Fatalf("%d: stderr: expected nil: %#v", i, stderr)
  1052. }
  1053. return nil
  1054. }
  1055. fw.fakeKubelet.execFunc = func(podFullName string, uid types.UID, containerName string, cmd []string, in io.Reader, out, stderr io.WriteCloser, tty bool) error {
  1056. execInvoked = true
  1057. if strings.Join(cmd, " ") != expectedCommand {
  1058. t.Fatalf("%d: cmd: expected: %s, got %v", i, expectedCommand, cmd)
  1059. }
  1060. return testStreamFunc(podFullName, uid, containerName, cmd, in, out, stderr, tty, done)
  1061. }
  1062. fw.fakeKubelet.attachFunc = func(podFullName string, uid types.UID, containerName string, in io.Reader, out, stderr io.WriteCloser, tty bool) error {
  1063. attachInvoked = true
  1064. return testStreamFunc(podFullName, uid, containerName, nil, in, out, stderr, tty, done)
  1065. }
  1066. var url string
  1067. if test.uid {
  1068. url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedUid + "/" + expectedContainerName + "?ignore=1"
  1069. } else {
  1070. url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?ignore=1"
  1071. }
  1072. if verb == "exec" {
  1073. url += "&command=ls&command=-a"
  1074. }
  1075. if test.stdin {
  1076. url += "&" + api.ExecStdinParam + "=1"
  1077. }
  1078. if test.stdout {
  1079. url += "&" + api.ExecStdoutParam + "=1"
  1080. }
  1081. if test.stderr && !test.tty {
  1082. url += "&" + api.ExecStderrParam + "=1"
  1083. }
  1084. if test.tty {
  1085. url += "&" + api.ExecTTYParam + "=1"
  1086. }
  1087. var (
  1088. resp *http.Response
  1089. err error
  1090. upgradeRoundTripper httpstream.UpgradeRoundTripper
  1091. c *http.Client
  1092. )
  1093. if test.responseStatusCode != http.StatusSwitchingProtocols {
  1094. c = &http.Client{}
  1095. } else {
  1096. upgradeRoundTripper = spdy.NewRoundTripper(nil)
  1097. c = &http.Client{Transport: upgradeRoundTripper}
  1098. }
  1099. resp, err = c.Post(url, "", nil)
  1100. if err != nil {
  1101. t.Fatalf("%d: Got error POSTing: %v", i, err)
  1102. }
  1103. defer resp.Body.Close()
  1104. _, err = ioutil.ReadAll(resp.Body)
  1105. if err != nil {
  1106. t.Errorf("%d: Error reading response body: %v", i, err)
  1107. }
  1108. if e, a := test.responseStatusCode, resp.StatusCode; e != a {
  1109. t.Fatalf("%d: response status: expected %v, got %v", i, e, a)
  1110. }
  1111. if test.responseStatusCode != http.StatusSwitchingProtocols {
  1112. continue
  1113. }
  1114. conn, err := upgradeRoundTripper.NewConnection(resp)
  1115. if err != nil {
  1116. t.Fatalf("Unexpected error creating streaming connection: %s", err)
  1117. }
  1118. if conn == nil {
  1119. t.Fatalf("%d: unexpected nil conn", i)
  1120. }
  1121. defer conn.Close()
  1122. h := http.Header{}
  1123. h.Set(api.StreamType, api.StreamTypeError)
  1124. if _, err := conn.CreateStream(h); err != nil {
  1125. t.Fatalf("%d: error creating error stream: %v", i, err)
  1126. }
  1127. if test.stdin {
  1128. h.Set(api.StreamType, api.StreamTypeStdin)
  1129. stream, err := conn.CreateStream(h)
  1130. if err != nil {
  1131. t.Fatalf("%d: error creating stdin stream: %v", i, err)
  1132. }
  1133. _, err = stream.Write([]byte(expectedStdin))
  1134. if err != nil {
  1135. t.Fatalf("%d: error writing to stdin stream: %v", i, err)
  1136. }
  1137. }
  1138. var stdoutStream httpstream.Stream
  1139. if test.stdout {
  1140. h.Set(api.StreamType, api.StreamTypeStdout)
  1141. stdoutStream, err = conn.CreateStream(h)
  1142. if err != nil {
  1143. t.Fatalf("%d: error creating stdout stream: %v", i, err)
  1144. }
  1145. }
  1146. var stderrStream httpstream.Stream
  1147. if test.stderr && !test.tty {
  1148. h.Set(api.StreamType, api.StreamTypeStderr)
  1149. stderrStream, err = conn.CreateStream(h)
  1150. if err != nil {
  1151. t.Fatalf("%d: error creating stderr stream: %v", i, err)
  1152. }
  1153. }
  1154. if test.stdout {
  1155. output := make([]byte, 10)
  1156. n, err := stdoutStream.Read(output)
  1157. close(clientStdoutReadDone)
  1158. if err != nil {
  1159. t.Fatalf("%d: error reading from stdout stream: %v", i, err)
  1160. }
  1161. if e, a := expectedStdout, string(output[0:n]); e != a {
  1162. t.Fatalf("%d: stdout: expected '%v', got '%v'", i, e, a)
  1163. }
  1164. }
  1165. if test.stderr && !test.tty {
  1166. output := make([]byte, 10)
  1167. n, err := stderrStream.Read(output)
  1168. close(clientStderrReadDone)
  1169. if err != nil {
  1170. t.Fatalf("%d: error reading from stderr stream: %v", i, err)
  1171. }
  1172. if e, a := expectedStderr, string(output[0:n]); e != a {
  1173. t.Fatalf("%d: stderr: expected '%v', got '%v'", i, e, a)
  1174. }
  1175. }
  1176. // wait for the server to finish before checking if the attach/exec funcs were invoked
  1177. <-done
  1178. if verb == "exec" {
  1179. if !execInvoked {
  1180. t.Errorf("%d: exec was not invoked", i)
  1181. }
  1182. if attachInvoked {
  1183. t.Errorf("%d: attach should not have been invoked", i)
  1184. }
  1185. } else {
  1186. if !attachInvoked {
  1187. t.Errorf("%d: attach was not invoked", i)
  1188. }
  1189. if execInvoked {
  1190. t.Errorf("%d: exec should not have been invoked", i)
  1191. }
  1192. }
  1193. }
  1194. }
  1195. func TestServeExecInContainer(t *testing.T) {
  1196. testExecAttach(t, "exec")
  1197. }
  1198. func TestServeAttachContainer(t *testing.T) {
  1199. testExecAttach(t, "attach")
  1200. }
  1201. func TestServePortForwardIdleTimeout(t *testing.T) {
  1202. fw := newServerTest()
  1203. defer fw.testHTTPServer.Close()
  1204. fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
  1205. return 100 * time.Millisecond
  1206. }
  1207. podNamespace := "other"
  1208. podName := "foo"
  1209. url := fw.testHTTPServer.URL + "/portForward/" + podNamespace + "/" + podName
  1210. upgradeRoundTripper := spdy.NewRoundTripper(nil)
  1211. c := &http.Client{Transport: upgradeRoundTripper}
  1212. resp, err := c.Post(url, "", nil)
  1213. if err != nil {
  1214. t.Fatalf("Got error POSTing: %v", err)
  1215. }
  1216. defer resp.Body.Close()
  1217. conn, err := upgradeRoundTripper.NewConnection(resp)
  1218. if err != nil {
  1219. t.Fatalf("Unexpected error creating streaming connection: %s", err)
  1220. }
  1221. if conn == nil {
  1222. t.Fatal("Unexpected nil connection")
  1223. }
  1224. defer conn.Close()
  1225. <-conn.CloseChan()
  1226. }
  1227. func TestServePortForward(t *testing.T) {
  1228. tests := []struct {
  1229. port string
  1230. uid bool
  1231. clientData string
  1232. containerData string
  1233. shouldError bool
  1234. }{
  1235. {port: "", shouldError: true},
  1236. {port: "abc", shouldError: true},
  1237. {port: "-1", shouldError: true},
  1238. {port: "65536", shouldError: true},
  1239. {port: "0", shouldError: true},
  1240. {port: "1", shouldError: false},
  1241. {port: "8000", shouldError: false},
  1242. {port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
  1243. {port: "65535", shouldError: false},
  1244. {port: "65535", uid: true, shouldError: false},
  1245. }
  1246. podNamespace := "other"
  1247. podName := "foo"
  1248. expectedPodName := getPodName(podName, podNamespace)
  1249. expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
  1250. for i, test := range tests {
  1251. fw := newServerTest()
  1252. defer fw.testHTTPServer.Close()
  1253. fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
  1254. return 0
  1255. }
  1256. portForwardFuncDone := make(chan struct{})
  1257. fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
  1258. defer close(portForwardFuncDone)
  1259. if e, a := expectedPodName, name; e != a {
  1260. t.Fatalf("%d: pod name: expected '%v', got '%v'", i, e, a)
  1261. }
  1262. if e, a := expectedUid, uid; test.uid && e != string(a) {
  1263. t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a)
  1264. }
  1265. p, err := strconv.ParseUint(test.port, 10, 16)
  1266. if err != nil {
  1267. t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err)
  1268. }
  1269. if e, a := uint16(p), port; e != a {
  1270. t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a)
  1271. }
  1272. if test.clientData != "" {
  1273. fromClient := make([]byte, 32)
  1274. n, err := stream.Read(fromClient)
  1275. if err != nil {
  1276. t.Fatalf("%d: error reading client data: %v", i, err)
  1277. }
  1278. if e, a := test.clientData, string(fromClient[0:n]); e != a {
  1279. t.Fatalf("%d: client data: expected to receive '%v', got '%v'", i, e, a)
  1280. }
  1281. }
  1282. if test.containerData != "" {
  1283. _, err := stream.Write([]byte(test.containerData))
  1284. if err != nil {
  1285. t.Fatalf("%d: error writing container data: %v", i, err)
  1286. }
  1287. }
  1288. return nil
  1289. }
  1290. var url string
  1291. if test.uid {
  1292. url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, expectedUid)
  1293. } else {
  1294. url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName)
  1295. }
  1296. upgradeRoundTripper := spdy.NewRoundTripper(nil)
  1297. c := &http.Client{Transport: upgradeRoundTripper}
  1298. resp, err := c.Post(url, "", nil)
  1299. if err != nil {
  1300. t.Fatalf("%d: Got error POSTing: %v", i, err)
  1301. }
  1302. defer resp.Body.Close()
  1303. conn, err := upgradeRoundTripper.NewConnection(resp)
  1304. if err != nil {
  1305. t.Fatalf("Unexpected error creating streaming connection: %s", err)
  1306. }
  1307. if conn == nil {
  1308. t.Fatalf("%d: Unexpected nil connection", i)
  1309. }
  1310. defer conn.Close()
  1311. headers := http.Header{}
  1312. headers.Set("streamType", "error")
  1313. headers.Set("port", test.port)
  1314. errorStream, err := conn.CreateStream(headers)
  1315. _ = errorStream
  1316. haveErr := err != nil
  1317. if e, a := test.shouldError, haveErr; e != a {
  1318. t.Fatalf("%d: create stream: expected err=%t, got %t: %v", i, e, a, err)
  1319. }
  1320. if test.shouldError {
  1321. continue
  1322. }
  1323. headers.Set("streamType", "data")
  1324. headers.Set("port", test.port)
  1325. dataStream, err := conn.CreateStream(headers)
  1326. haveErr = err != nil
  1327. if e, a := test.shouldError, haveErr; e != a {
  1328. t.Fatalf("%d: create stream: expected err=%t, got %t: %v", i, e, a, err)
  1329. }
  1330. if test.clientData != "" {
  1331. _, err := dataStream.Write([]byte(test.clientData))
  1332. if err != nil {
  1333. t.Fatalf("%d: unexpected error writing client data: %v", i, err)
  1334. }
  1335. }
  1336. if test.containerData != "" {
  1337. fromContainer := make([]byte, 32)
  1338. n, err := dataStream.Read(fromContainer)
  1339. if err != nil {
  1340. t.Fatalf("%d: unexpected error reading container data: %v", i, err)
  1341. }
  1342. if e, a := test.containerData, string(fromContainer[0:n]); e != a {
  1343. t.Fatalf("%d: expected to receive '%v' from container, got '%v'", i, e, a)
  1344. }
  1345. }
  1346. <-portForwardFuncDone
  1347. }
  1348. }
  1349. type fakeHttpStream struct {
  1350. headers http.Header
  1351. id uint32
  1352. }
  1353. func newFakeHttpStream() *fakeHttpStream {
  1354. return &fakeHttpStream{
  1355. headers: make(http.Header),
  1356. }
  1357. }
  1358. var _ httpstream.Stream = &fakeHttpStream{}
  1359. func (s *fakeHttpStream) Read(data []byte) (int, error) {
  1360. return 0, nil
  1361. }
  1362. func (s *fakeHttpStream) Write(data []byte) (int, error) {
  1363. return 0, nil
  1364. }
  1365. func (s *fakeHttpStream) Close() error {
  1366. return nil
  1367. }
  1368. func (s *fakeHttpStream) Reset() error {
  1369. return nil
  1370. }
  1371. func (s *fakeHttpStream) Headers() http.Header {
  1372. return s.headers
  1373. }
  1374. func (s *fakeHttpStream) Identifier() uint32 {
  1375. return s.id
  1376. }
  1377. func TestPortForwardStreamReceived(t *testing.T) {
  1378. tests := map[string]struct {
  1379. port string
  1380. streamType string
  1381. expectedError string
  1382. }{
  1383. "missing port": {
  1384. expectedError: `"port" header is required`,
  1385. },
  1386. "unable to parse port": {
  1387. port: "abc",
  1388. expectedError: `unable to parse "abc" as a port: strconv.ParseUint: parsing "abc": invalid syntax`,
  1389. },
  1390. "negative port": {
  1391. port: "-1",
  1392. expectedError: `unable to parse "-1" as a port: strconv.ParseUint: parsing "-1": invalid syntax`,
  1393. },
  1394. "missing stream type": {
  1395. port: "80",
  1396. expectedError: `"streamType" header is required`,
  1397. },
  1398. "valid port with error stream": {
  1399. port: "80",
  1400. streamType: "error",
  1401. },
  1402. "valid port with data stream": {
  1403. port: "80",
  1404. streamType: "data",
  1405. },
  1406. "invalid stream type": {
  1407. port: "80",
  1408. streamType: "foo",
  1409. expectedError: `invalid stream type "foo"`,
  1410. },
  1411. }
  1412. for name, test := range tests {
  1413. streams := make(chan httpstream.Stream, 1)
  1414. f := portForwardStreamReceived(streams)
  1415. stream := newFakeHttpStream()
  1416. if len(test.port) > 0 {
  1417. stream.headers.Set("port", test.port)
  1418. }
  1419. if len(test.streamType) > 0 {
  1420. stream.headers.Set("streamType", test.streamType)
  1421. }
  1422. replySent := make(chan struct{})
  1423. err := f(stream, replySent)
  1424. close(replySent)
  1425. if len(test.expectedError) > 0 {
  1426. if err == nil {
  1427. t.Errorf("%s: expected err=%q, but it was nil", name, test.expectedError)
  1428. }
  1429. if e, a := test.expectedError, err.Error(); e != a {
  1430. t.Errorf("%s: expected err=%q, got %q", name, e, a)
  1431. }
  1432. continue
  1433. }
  1434. if err != nil {
  1435. t.Errorf("%s: unexpected error %v", name, err)
  1436. continue
  1437. }
  1438. if s := <-streams; s != stream {
  1439. t.Errorf("%s: expected stream %#v, got %#v", name, stream, s)
  1440. }
  1441. }
  1442. }
  1443. func TestGetStreamPair(t *testing.T) {
  1444. timeout := make(chan time.Time)
  1445. h := &portForwardStreamHandler{
  1446. streamPairs: make(map[string]*portForwardStreamPair),
  1447. }
  1448. // test adding a new entry
  1449. p, created := h.getStreamPair("1")
  1450. if p == nil {
  1451. t.Fatalf("unexpected nil pair")
  1452. }
  1453. if !created {
  1454. t.Fatal("expected created=true")
  1455. }
  1456. if p.dataStream != nil {
  1457. t.Errorf("unexpected non-nil data stream")
  1458. }
  1459. if p.errorStream != nil {
  1460. t.Errorf("unexpected non-nil error stream")
  1461. }
  1462. // start the monitor for this pair
  1463. monitorDone := make(chan struct{})
  1464. go func() {
  1465. h.monitorStreamPair(p, timeout)
  1466. close(monitorDone)
  1467. }()
  1468. if !h.hasStreamPair("1") {
  1469. t.Fatal("This should still be true")
  1470. }
  1471. // make sure we can retrieve an existing entry
  1472. p2, created := h.getStreamPair("1")
  1473. if created {
  1474. t.Fatal("expected created=false")
  1475. }
  1476. if p != p2 {
  1477. t.Fatalf("retrieving an existing pair: expected %#v, got %#v", p, p2)
  1478. }
  1479. // removed via complete
  1480. dataStream := newFakeHttpStream()
  1481. dataStream.headers.Set(api.StreamType, api.StreamTypeData)
  1482. complete, err := p.add(dataStream)
  1483. if err != nil {
  1484. t.Fatalf("unexpected error adding data stream to pair: %v", err)
  1485. }
  1486. if complete {
  1487. t.Fatalf("unexpected complete")
  1488. }
  1489. errorStream := newFakeHttpStream()
  1490. errorStream.headers.Set(api.StreamType, api.StreamTypeError)
  1491. complete, err = p.add(errorStream)
  1492. if err != nil {
  1493. t.Fatalf("unexpected error adding error stream to pair: %v", err)
  1494. }
  1495. if !complete {
  1496. t.Fatal("unexpected incomplete")
  1497. }
  1498. // make sure monitorStreamPair completed
  1499. <-monitorDone
  1500. // make sure the pair was removed
  1501. if h.hasStreamPair("1") {
  1502. t.Fatal("expected removal of pair after both data and error streams received")
  1503. }
  1504. // removed via timeout
  1505. p, created = h.getStreamPair("2")
  1506. if !created {
  1507. t.Fatal("expected created=true")
  1508. }
  1509. if p == nil {
  1510. t.Fatal("expected p not to be nil")
  1511. }
  1512. monitorDone = make(chan struct{})
  1513. go func() {
  1514. h.monitorStreamPair(p, timeout)
  1515. close(monitorDone)
  1516. }()
  1517. // cause the timeout
  1518. close(timeout)
  1519. // make sure monitorStreamPair completed
  1520. <-monitorDone
  1521. if h.hasStreamPair("2") {
  1522. t.Fatal("expected stream pair to be removed")
  1523. }
  1524. }
  1525. func TestRequestID(t *testing.T) {
  1526. h := &portForwardStreamHandler{}
  1527. s := newFakeHttpStream()
  1528. s.headers.Set(api.StreamType, api.StreamTypeError)
  1529. s.id = 1
  1530. if e, a := "1", h.requestID(s); e != a {
  1531. t.Errorf("expected %q, got %q", e, a)
  1532. }
  1533. s.headers.Set(api.StreamType, api.StreamTypeData)
  1534. s.id = 3
  1535. if e, a := "1", h.requestID(s); e != a {
  1536. t.Errorf("expected %q, got %q", e, a)
  1537. }
  1538. s.id = 7
  1539. s.headers.Set(api.PortForwardRequestIDHeader, "2")
  1540. if e, a := "2", h.requestID(s); e != a {
  1541. t.Errorf("expected %q, got %q", e, a)
  1542. }
  1543. }