watch_test.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package apiserver
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "io"
  18. "io/ioutil"
  19. "math/rand"
  20. "net/http"
  21. "net/http/httptest"
  22. "net/url"
  23. "reflect"
  24. "sync"
  25. "testing"
  26. "time"
  27. "golang.org/x/net/websocket"
  28. "k8s.io/kubernetes/pkg/api"
  29. "k8s.io/kubernetes/pkg/api/rest"
  30. apitesting "k8s.io/kubernetes/pkg/api/testing"
  31. "k8s.io/kubernetes/pkg/api/unversioned"
  32. apiservertesting "k8s.io/kubernetes/pkg/apiserver/testing"
  33. "k8s.io/kubernetes/pkg/fields"
  34. "k8s.io/kubernetes/pkg/labels"
  35. "k8s.io/kubernetes/pkg/runtime"
  36. "k8s.io/kubernetes/pkg/runtime/serializer/streaming"
  37. "k8s.io/kubernetes/pkg/util/diff"
  38. "k8s.io/kubernetes/pkg/util/wait"
  39. "k8s.io/kubernetes/pkg/watch"
  40. "k8s.io/kubernetes/pkg/watch/versioned"
  41. )
  42. // watchJSON defines the expected JSON wire equivalent of watch.Event
  43. type watchJSON struct {
  44. Type watch.EventType `json:"type,omitempty"`
  45. Object json.RawMessage `json:"object,omitempty"`
  46. }
  47. // roundTripOrDie round trips an object to get defaults set.
  48. func roundTripOrDie(codec runtime.Codec, object runtime.Object) runtime.Object {
  49. data, err := runtime.Encode(codec, object)
  50. if err != nil {
  51. panic(err)
  52. }
  53. obj, err := runtime.Decode(codec, data)
  54. if err != nil {
  55. panic(err)
  56. }
  57. return obj
  58. }
  59. var watchTestTable = []struct {
  60. t watch.EventType
  61. obj runtime.Object
  62. }{
  63. {watch.Added, &apiservertesting.Simple{ObjectMeta: api.ObjectMeta{Name: "foo"}}},
  64. {watch.Modified, &apiservertesting.Simple{ObjectMeta: api.ObjectMeta{Name: "bar"}}},
  65. {watch.Deleted, &apiservertesting.Simple{ObjectMeta: api.ObjectMeta{Name: "bar"}}},
  66. }
  67. var podWatchTestTable = []struct {
  68. t watch.EventType
  69. obj runtime.Object
  70. }{
  71. {watch.Added, roundTripOrDie(codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})},
  72. {watch.Modified, roundTripOrDie(codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})},
  73. {watch.Deleted, roundTripOrDie(codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})},
  74. }
  75. func TestWatchWebsocket(t *testing.T) {
  76. simpleStorage := &SimpleRESTStorage{}
  77. _ = rest.Watcher(simpleStorage) // Give compile error if this doesn't work.
  78. handler := handle(map[string]rest.Storage{"simples": simpleStorage})
  79. server := httptest.NewServer(handler)
  80. defer server.Close()
  81. dest, _ := url.Parse(server.URL)
  82. dest.Scheme = "ws" // Required by websocket, though the server never sees it.
  83. dest.Path = "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/watch/simples"
  84. dest.RawQuery = ""
  85. ws, err := websocket.Dial(dest.String(), "", "http://localhost")
  86. if err != nil {
  87. t.Fatalf("unexpected error: %v", err)
  88. }
  89. try := func(action watch.EventType, object runtime.Object) {
  90. // Send
  91. simpleStorage.fakeWatch.Action(action, object)
  92. // Test receive
  93. var got watchJSON
  94. err := websocket.JSON.Receive(ws, &got)
  95. if err != nil {
  96. t.Fatalf("Unexpected error: %v", err)
  97. }
  98. if got.Type != action {
  99. t.Errorf("Unexpected type: %v", got.Type)
  100. }
  101. gotObj, err := runtime.Decode(codec, got.Object)
  102. if err != nil {
  103. t.Fatalf("Decode error: %v\n%v", err, got)
  104. }
  105. if _, err := api.GetReference(gotObj); err != nil {
  106. t.Errorf("Unable to construct reference: %v", err)
  107. }
  108. if e, a := object, gotObj; !reflect.DeepEqual(e, a) {
  109. t.Errorf("Expected %#v, got %#v", e, a)
  110. }
  111. }
  112. for _, item := range watchTestTable {
  113. try(item.t, item.obj)
  114. }
  115. simpleStorage.fakeWatch.Stop()
  116. var got watchJSON
  117. err = websocket.JSON.Receive(ws, &got)
  118. if err == nil {
  119. t.Errorf("Unexpected non-error")
  120. }
  121. }
  122. func TestWatchWebsocketClientClose(t *testing.T) {
  123. simpleStorage := &SimpleRESTStorage{}
  124. _ = rest.Watcher(simpleStorage) // Give compile error if this doesn't work.
  125. handler := handle(map[string]rest.Storage{"simples": simpleStorage})
  126. server := httptest.NewServer(handler)
  127. defer server.Close()
  128. dest, _ := url.Parse(server.URL)
  129. dest.Scheme = "ws" // Required by websocket, though the server never sees it.
  130. dest.Path = "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/watch/simples"
  131. dest.RawQuery = ""
  132. ws, err := websocket.Dial(dest.String(), "", "http://localhost")
  133. if err != nil {
  134. t.Fatalf("unexpected error: %v", err)
  135. }
  136. try := func(action watch.EventType, object runtime.Object) {
  137. // Send
  138. simpleStorage.fakeWatch.Action(action, object)
  139. // Test receive
  140. var got watchJSON
  141. err := websocket.JSON.Receive(ws, &got)
  142. if err != nil {
  143. t.Fatalf("Unexpected error: %v", err)
  144. }
  145. if got.Type != action {
  146. t.Errorf("Unexpected type: %v", got.Type)
  147. }
  148. gotObj, err := runtime.Decode(codec, got.Object)
  149. if err != nil {
  150. t.Fatalf("Decode error: %v\n%v", err, got)
  151. }
  152. if _, err := api.GetReference(gotObj); err != nil {
  153. t.Errorf("Unable to construct reference: %v", err)
  154. }
  155. if e, a := object, gotObj; !reflect.DeepEqual(e, a) {
  156. t.Errorf("Expected %#v, got %#v", e, a)
  157. }
  158. }
  159. // Send/receive should work
  160. for _, item := range watchTestTable {
  161. try(item.t, item.obj)
  162. }
  163. // Sending normal data should be ignored
  164. websocket.JSON.Send(ws, map[string]interface{}{"test": "data"})
  165. // Send/receive should still work
  166. for _, item := range watchTestTable {
  167. try(item.t, item.obj)
  168. }
  169. // Client requests a close
  170. ws.Close()
  171. select {
  172. case data, ok := <-simpleStorage.fakeWatch.ResultChan():
  173. if ok {
  174. t.Errorf("expected a closed result channel, but got watch result %#v", data)
  175. }
  176. case <-time.After(5 * time.Second):
  177. t.Errorf("watcher did not close when client closed")
  178. }
  179. var got watchJSON
  180. err = websocket.JSON.Receive(ws, &got)
  181. if err == nil {
  182. t.Errorf("Unexpected non-error")
  183. }
  184. }
  185. func TestWatchRead(t *testing.T) {
  186. simpleStorage := &SimpleRESTStorage{}
  187. _ = rest.Watcher(simpleStorage) // Give compile error if this doesn't work.
  188. handler := handle(map[string]rest.Storage{"simples": simpleStorage})
  189. server := httptest.NewServer(handler)
  190. defer server.Close()
  191. dest, _ := url.Parse(server.URL)
  192. dest.Path = "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/simples"
  193. dest.RawQuery = "watch=1"
  194. connectHTTP := func(accept string) (io.ReadCloser, string) {
  195. client := http.Client{}
  196. request, err := http.NewRequest("GET", dest.String(), nil)
  197. if err != nil {
  198. t.Fatalf("unexpected error: %v", err)
  199. }
  200. request.Header.Add("Accept", accept)
  201. response, err := client.Do(request)
  202. if err != nil {
  203. t.Fatalf("unexpected error: %v", err)
  204. }
  205. if response.StatusCode != http.StatusOK {
  206. t.Fatalf("Unexpected response %#v", response)
  207. }
  208. return response.Body, response.Header.Get("Content-Type")
  209. }
  210. connectWebSocket := func(accept string) (io.ReadCloser, string) {
  211. dest := *dest
  212. dest.Scheme = "ws" // Required by websocket, though the server never sees it.
  213. config, err := websocket.NewConfig(dest.String(), "http://localhost")
  214. if err != nil {
  215. t.Fatalf("unexpected error: %v", err)
  216. }
  217. config.Header.Add("Accept", accept)
  218. ws, err := websocket.DialConfig(config)
  219. if err != nil {
  220. t.Fatalf("unexpected error: %v", err)
  221. }
  222. return ws, "__default__"
  223. }
  224. testCases := []struct {
  225. Accept string
  226. ExpectedContentType string
  227. MediaType string
  228. }{
  229. {
  230. Accept: "application/json",
  231. ExpectedContentType: "application/json",
  232. MediaType: "application/json",
  233. },
  234. // TODO: yaml stream serialization requires that RawExtension.MarshalJSON
  235. // be able to understand nested encoding (since yaml calls json.Marshal
  236. // rather than yaml.Marshal, which results in the raw bytes being in yaml).
  237. // Same problem as thirdparty object.
  238. /*{
  239. Accept: "application/yaml",
  240. ExpectedContentType: "application/yaml;stream=watch",
  241. MediaType: "application/yaml",
  242. },*/
  243. {
  244. Accept: "application/vnd.kubernetes.protobuf",
  245. ExpectedContentType: "application/vnd.kubernetes.protobuf;stream=watch",
  246. MediaType: "application/vnd.kubernetes.protobuf",
  247. },
  248. {
  249. Accept: "application/vnd.kubernetes.protobuf;stream=watch",
  250. ExpectedContentType: "application/vnd.kubernetes.protobuf;stream=watch",
  251. MediaType: "application/vnd.kubernetes.protobuf",
  252. },
  253. }
  254. protocols := []struct {
  255. name string
  256. selfFraming bool
  257. fn func(string) (io.ReadCloser, string)
  258. }{
  259. {name: "http", fn: connectHTTP},
  260. {name: "websocket", selfFraming: true, fn: connectWebSocket},
  261. }
  262. for _, protocol := range protocols {
  263. for _, test := range testCases {
  264. serializer, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil)
  265. if !ok {
  266. t.Fatal(serializer)
  267. }
  268. r, contentType := protocol.fn(test.Accept)
  269. defer r.Close()
  270. if contentType != "__default__" && contentType != test.ExpectedContentType {
  271. t.Errorf("Unexpected content type: %#v", contentType)
  272. }
  273. objectSerializer, ok := api.Codecs.SerializerForMediaType(test.MediaType, nil)
  274. if !ok {
  275. t.Fatal(objectSerializer)
  276. }
  277. objectCodec := api.Codecs.DecoderToVersion(objectSerializer, testInternalGroupVersion)
  278. var fr io.ReadCloser = r
  279. if !protocol.selfFraming {
  280. fr = serializer.Framer.NewFrameReader(r)
  281. }
  282. d := streaming.NewDecoder(fr, serializer)
  283. var w *watch.FakeWatcher
  284. for w == nil {
  285. w = simpleStorage.Watcher()
  286. time.Sleep(time.Millisecond)
  287. }
  288. for i, item := range podWatchTestTable {
  289. action, object := item.t, item.obj
  290. name := fmt.Sprintf("%s-%s-%d", protocol.name, test.MediaType, i)
  291. // Send
  292. w.Action(action, object)
  293. // Test receive
  294. var got versioned.Event
  295. _, _, err := d.Decode(nil, &got)
  296. if err != nil {
  297. t.Fatalf("%s: Unexpected error: %v", name, err)
  298. }
  299. if got.Type != string(action) {
  300. t.Errorf("%s: Unexpected type: %v", name, got.Type)
  301. }
  302. gotObj, err := runtime.Decode(objectCodec, got.Object.Raw)
  303. if err != nil {
  304. t.Fatalf("%s: Decode error: %v", name, err)
  305. }
  306. if _, err := api.GetReference(gotObj); err != nil {
  307. t.Errorf("%s: Unable to construct reference: %v", name, err)
  308. }
  309. if e, a := object, gotObj; !api.Semantic.DeepEqual(e, a) {
  310. t.Errorf("%s: different: %s", name, diff.ObjectDiff(e, a))
  311. }
  312. }
  313. w.Stop()
  314. var got versioned.Event
  315. _, _, err := d.Decode(nil, &got)
  316. if err == nil {
  317. t.Errorf("Unexpected non-error")
  318. }
  319. r.Close()
  320. }
  321. }
  322. }
  323. func TestWatchHTTPAccept(t *testing.T) {
  324. simpleStorage := &SimpleRESTStorage{}
  325. handler := handle(map[string]rest.Storage{"simples": simpleStorage})
  326. server := httptest.NewServer(handler)
  327. defer server.Close()
  328. client := http.Client{}
  329. dest, _ := url.Parse(server.URL)
  330. dest.Path = "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/watch/simples"
  331. dest.RawQuery = ""
  332. request, err := http.NewRequest("GET", dest.String(), nil)
  333. if err != nil {
  334. t.Errorf("unexpected error: %v", err)
  335. }
  336. request.Header.Set("Accept", "application/XYZ")
  337. response, err := client.Do(request)
  338. if err != nil {
  339. t.Errorf("unexpected error: %v", err)
  340. }
  341. // TODO: once this is fixed, this test will change
  342. if response.StatusCode != http.StatusNotAcceptable {
  343. t.Errorf("Unexpected response %#v", response)
  344. }
  345. }
  346. func TestWatchParamParsing(t *testing.T) {
  347. simpleStorage := &SimpleRESTStorage{}
  348. handler := handle(map[string]rest.Storage{
  349. "simples": simpleStorage,
  350. "simpleroots": simpleStorage,
  351. })
  352. server := httptest.NewServer(handler)
  353. defer server.Close()
  354. dest, _ := url.Parse(server.URL)
  355. rootPath := "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/watch/simples"
  356. namespacedPath := "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/watch/namespaces/other/simpleroots"
  357. table := []struct {
  358. path string
  359. rawQuery string
  360. resourceVersion string
  361. labelSelector string
  362. fieldSelector string
  363. namespace string
  364. }{
  365. {
  366. path: rootPath,
  367. rawQuery: "resourceVersion=1234",
  368. resourceVersion: "1234",
  369. labelSelector: "",
  370. fieldSelector: "",
  371. namespace: api.NamespaceAll,
  372. }, {
  373. path: rootPath,
  374. rawQuery: "resourceVersion=314159&fieldSelector=Host%3D&labelSelector=name%3Dfoo",
  375. resourceVersion: "314159",
  376. labelSelector: "name=foo",
  377. fieldSelector: "Host=",
  378. namespace: api.NamespaceAll,
  379. }, {
  380. path: rootPath,
  381. rawQuery: "fieldSelector=id%3dfoo&resourceVersion=1492",
  382. resourceVersion: "1492",
  383. labelSelector: "",
  384. fieldSelector: "id=foo",
  385. namespace: api.NamespaceAll,
  386. }, {
  387. path: rootPath,
  388. rawQuery: "",
  389. resourceVersion: "",
  390. labelSelector: "",
  391. fieldSelector: "",
  392. namespace: api.NamespaceAll,
  393. },
  394. {
  395. path: namespacedPath,
  396. rawQuery: "resourceVersion=1234",
  397. resourceVersion: "1234",
  398. labelSelector: "",
  399. fieldSelector: "",
  400. namespace: "other",
  401. }, {
  402. path: namespacedPath,
  403. rawQuery: "resourceVersion=314159&fieldSelector=Host%3D&labelSelector=name%3Dfoo",
  404. resourceVersion: "314159",
  405. labelSelector: "name=foo",
  406. fieldSelector: "Host=",
  407. namespace: "other",
  408. }, {
  409. path: namespacedPath,
  410. rawQuery: "fieldSelector=id%3dfoo&resourceVersion=1492",
  411. resourceVersion: "1492",
  412. labelSelector: "",
  413. fieldSelector: "id=foo",
  414. namespace: "other",
  415. }, {
  416. path: namespacedPath,
  417. rawQuery: "",
  418. resourceVersion: "",
  419. labelSelector: "",
  420. fieldSelector: "",
  421. namespace: "other",
  422. },
  423. }
  424. for _, item := range table {
  425. simpleStorage.requestedLabelSelector = labels.Everything()
  426. simpleStorage.requestedFieldSelector = fields.Everything()
  427. simpleStorage.requestedResourceVersion = "5" // Prove this is set in all cases
  428. simpleStorage.requestedResourceNamespace = ""
  429. dest.Path = item.path
  430. dest.RawQuery = item.rawQuery
  431. resp, err := http.Get(dest.String())
  432. if err != nil {
  433. t.Errorf("%v: unexpected error: %v", item.rawQuery, err)
  434. continue
  435. }
  436. resp.Body.Close()
  437. if e, a := item.namespace, simpleStorage.requestedResourceNamespace; e != a {
  438. t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
  439. }
  440. if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a {
  441. t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
  442. }
  443. if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a {
  444. t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
  445. }
  446. if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a {
  447. t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
  448. }
  449. }
  450. }
  451. func TestWatchProtocolSelection(t *testing.T) {
  452. simpleStorage := &SimpleRESTStorage{}
  453. handler := handle(map[string]rest.Storage{"simples": simpleStorage})
  454. server := httptest.NewServer(handler)
  455. defer server.Close()
  456. defer server.CloseClientConnections()
  457. client := http.Client{}
  458. dest, _ := url.Parse(server.URL)
  459. dest.Path = "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/watch/simples"
  460. dest.RawQuery = ""
  461. table := []struct {
  462. isWebsocket bool
  463. connHeader string
  464. }{
  465. {true, "Upgrade"},
  466. {true, "keep-alive, Upgrade"},
  467. {true, "upgrade"},
  468. {false, "keep-alive"},
  469. }
  470. for _, item := range table {
  471. request, err := http.NewRequest("GET", dest.String(), nil)
  472. if err != nil {
  473. t.Errorf("unexpected error: %v", err)
  474. }
  475. request.Header.Set("Connection", item.connHeader)
  476. request.Header.Set("Upgrade", "websocket")
  477. response, err := client.Do(request)
  478. if err != nil {
  479. t.Errorf("unexpected error: %v", err)
  480. }
  481. // The requests recognized as websocket requests based on connection
  482. // and upgrade headers will not also have the necessary Sec-Websocket-*
  483. // headers so it is expected to throw a 400
  484. if item.isWebsocket && response.StatusCode != http.StatusBadRequest {
  485. t.Errorf("Unexpected response %#v", response)
  486. }
  487. if !item.isWebsocket && response.StatusCode != http.StatusOK {
  488. t.Errorf("Unexpected response %#v", response)
  489. }
  490. }
  491. }
  // fakeTimeoutFactory is a test double that hands out a caller-controlled
  // timeout channel and reports, via done, when the cleanup function has run.
  type fakeTimeoutFactory struct {
  	// timeoutCh is returned as-is from TimeoutCh.
  	timeoutCh chan time.Time
  	// done is closed when the cleanup function returned by TimeoutCh is called.
  	done chan struct{}
  }

  // TimeoutCh returns the configured timeout channel and a cleanup function.
  // The cleanup function always returns true and closes done as it returns.
  func (t *fakeTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
  	cleanup := func() bool {
  		defer close(t.done)
  		return true
  	}
  	return t.timeoutCh, cleanup
  }
  502. func TestWatchHTTPTimeout(t *testing.T) {
  503. watcher := watch.NewFake()
  504. timeoutCh := make(chan time.Time)
  505. done := make(chan struct{})
  506. serializer, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil)
  507. if !ok {
  508. t.Fatal(serializer)
  509. }
  510. // Setup a new watchserver
  511. watchServer := &WatchServer{
  512. watching: watcher,
  513. mediaType: "testcase/json",
  514. framer: serializer.Framer,
  515. encoder: newCodec,
  516. embeddedEncoder: newCodec,
  517. fixup: func(obj runtime.Object) {},
  518. t: &fakeTimeoutFactory{timeoutCh, done},
  519. }
  520. s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  521. watchServer.ServeHTTP(w, req)
  522. }))
  523. defer s.Close()
  524. // Setup a client
  525. dest, _ := url.Parse(s.URL)
  526. dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/simple"
  527. dest.RawQuery = "watch=true"
  528. req, _ := http.NewRequest("GET", dest.String(), nil)
  529. client := http.Client{}
  530. resp, err := client.Do(req)
  531. watcher.Add(&apiservertesting.Simple{TypeMeta: unversioned.TypeMeta{APIVersion: newGroupVersion.String()}})
  532. // Make sure we can actually watch an endpoint
  533. decoder := json.NewDecoder(resp.Body)
  534. var got watchJSON
  535. err = decoder.Decode(&got)
  536. if err != nil {
  537. t.Fatalf("Unexpected error: %v", err)
  538. }
  539. // Timeout and check for leaks
  540. close(timeoutCh)
  541. select {
  542. case <-done:
  543. if !watcher.Stopped {
  544. t.Errorf("Leaked watch on timeout")
  545. }
  546. case <-time.After(wait.ForeverTestTimeout):
  547. t.Errorf("Failed to stop watcher after %s of timeout signal", wait.ForeverTestTimeout.String())
  548. }
  549. // Make sure we can't receive any more events through the timeout watch
  550. err = decoder.Decode(&got)
  551. if err != io.EOF {
  552. t.Errorf("Unexpected non-error")
  553. }
  554. }
  555. const benchmarkSeed = 100
  556. func benchmarkItems() []api.Pod {
  557. apiObjectFuzzer := apitesting.FuzzerFor(nil, api.SchemeGroupVersion, rand.NewSource(benchmarkSeed))
  558. items := make([]api.Pod, 3)
  559. for i := range items {
  560. apiObjectFuzzer.Fuzz(&items[i])
  561. items[i].Spec.InitContainers, items[i].Status.InitContainerStatuses = nil, nil
  562. }
  563. return items
  564. }
  565. // BenchmarkWatchHTTP measures the cost of serving a watch.
  566. func BenchmarkWatchHTTP(b *testing.B) {
  567. items := benchmarkItems()
  568. simpleStorage := &SimpleRESTStorage{}
  569. handler := handle(map[string]rest.Storage{"simples": simpleStorage})
  570. server := httptest.NewServer(handler)
  571. defer server.Close()
  572. client := http.Client{}
  573. dest, _ := url.Parse(server.URL)
  574. dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/watch/simples"
  575. dest.RawQuery = ""
  576. request, err := http.NewRequest("GET", dest.String(), nil)
  577. if err != nil {
  578. b.Fatalf("unexpected error: %v", err)
  579. }
  580. response, err := client.Do(request)
  581. if err != nil {
  582. b.Fatalf("unexpected error: %v", err)
  583. }
  584. if response.StatusCode != http.StatusOK {
  585. b.Fatalf("Unexpected response %#v", response)
  586. }
  587. wg := sync.WaitGroup{}
  588. wg.Add(1)
  589. go func() {
  590. defer response.Body.Close()
  591. if _, err := io.Copy(ioutil.Discard, response.Body); err != nil {
  592. b.Fatal(err)
  593. }
  594. wg.Done()
  595. }()
  596. actions := []watch.EventType{watch.Added, watch.Modified, watch.Deleted}
  597. b.ResetTimer()
  598. for i := 0; i < b.N; i++ {
  599. simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)])
  600. }
  601. simpleStorage.fakeWatch.Stop()
  602. wg.Wait()
  603. b.StopTimer()
  604. }
  605. // BenchmarkWatchWebsocket measures the cost of serving a watch.
  606. func BenchmarkWatchWebsocket(b *testing.B) {
  607. items := benchmarkItems()
  608. simpleStorage := &SimpleRESTStorage{}
  609. handler := handle(map[string]rest.Storage{"simples": simpleStorage})
  610. server := httptest.NewServer(handler)
  611. defer server.Close()
  612. dest, _ := url.Parse(server.URL)
  613. dest.Scheme = "ws" // Required by websocket, though the server never sees it.
  614. dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/watch/simples"
  615. dest.RawQuery = ""
  616. ws, err := websocket.Dial(dest.String(), "", "http://localhost")
  617. if err != nil {
  618. b.Fatalf("unexpected error: %v", err)
  619. }
  620. wg := sync.WaitGroup{}
  621. wg.Add(1)
  622. go func() {
  623. defer ws.Close()
  624. if _, err := io.Copy(ioutil.Discard, ws); err != nil {
  625. b.Fatal(err)
  626. }
  627. wg.Done()
  628. }()
  629. actions := []watch.EventType{watch.Added, watch.Modified, watch.Deleted}
  630. b.ResetTimer()
  631. for i := 0; i < b.N; i++ {
  632. simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)])
  633. }
  634. simpleStorage.fakeWatch.Stop()
  635. wg.Wait()
  636. b.StopTimer()
  637. }
  638. // BenchmarkWatchProtobuf measures the cost of serving a watch.
  639. func BenchmarkWatchProtobuf(b *testing.B) {
  640. items := benchmarkItems()
  641. simpleStorage := &SimpleRESTStorage{}
  642. handler := handle(map[string]rest.Storage{"simples": simpleStorage})
  643. server := httptest.NewServer(handler)
  644. defer server.Close()
  645. client := http.Client{}
  646. dest, _ := url.Parse(server.URL)
  647. dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/watch/simples"
  648. dest.RawQuery = ""
  649. request, err := http.NewRequest("GET", dest.String(), nil)
  650. if err != nil {
  651. b.Fatalf("unexpected error: %v", err)
  652. }
  653. request.Header.Set("Accept", "application/vnd.kubernetes.protobuf")
  654. response, err := client.Do(request)
  655. if err != nil {
  656. b.Fatalf("unexpected error: %v", err)
  657. }
  658. if response.StatusCode != http.StatusOK {
  659. body, _ := ioutil.ReadAll(response.Body)
  660. b.Fatalf("Unexpected response %#v\n%s", response, body)
  661. }
  662. wg := sync.WaitGroup{}
  663. wg.Add(1)
  664. go func() {
  665. defer response.Body.Close()
  666. if _, err := io.Copy(ioutil.Discard, response.Body); err != nil {
  667. b.Fatal(err)
  668. }
  669. wg.Done()
  670. }()
  671. actions := []watch.EventType{watch.Added, watch.Modified, watch.Deleted}
  672. b.ResetTimer()
  673. for i := 0; i < b.N; i++ {
  674. simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)])
  675. }
  676. simpleStorage.fakeWatch.Stop()
  677. wg.Wait()
  678. b.StopTimer()
  679. }