watch.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package apiserver
  14. import (
  15. "bytes"
  16. "fmt"
  17. "net/http"
  18. "reflect"
  19. "time"
  20. "k8s.io/kubernetes/pkg/api/errors"
  21. "k8s.io/kubernetes/pkg/httplog"
  22. "k8s.io/kubernetes/pkg/runtime"
  23. "k8s.io/kubernetes/pkg/runtime/serializer/streaming"
  24. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  25. "k8s.io/kubernetes/pkg/util/wsstream"
  26. "k8s.io/kubernetes/pkg/watch"
  27. "k8s.io/kubernetes/pkg/watch/versioned"
  28. "github.com/emicklei/go-restful"
  29. "golang.org/x/net/websocket"
  30. )
  31. // nothing will ever be sent down this channel
  32. var neverExitWatch <-chan time.Time = make(chan time.Time)
  33. // timeoutFactory abstracts watch timeout logic for testing
  34. type timeoutFactory interface {
  35. TimeoutCh() (<-chan time.Time, func() bool)
  36. }
  37. // realTimeoutFactory implements timeoutFactory
  38. type realTimeoutFactory struct {
  39. timeout time.Duration
  40. }
  41. // TimeoutChan returns a channel which will receive something when the watch times out,
  42. // and a cleanup function to call when this happens.
  43. func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
  44. if w.timeout == 0 {
  45. return neverExitWatch, func() bool { return false }
  46. }
  47. t := time.NewTimer(w.timeout)
  48. return t.C, t.Stop
  49. }
  50. // serveWatch handles serving requests to the server
  51. // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
  52. func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) {
  53. // negotiate for the stream serializer
  54. serializer, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer)
  55. if err != nil {
  56. scope.err(err, res.ResponseWriter, req.Request)
  57. return
  58. }
  59. if serializer.Framer == nil {
  60. scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), res.ResponseWriter, req.Request)
  61. return
  62. }
  63. encoder := scope.Serializer.EncoderForVersion(serializer.Serializer, scope.Kind.GroupVersion())
  64. useTextFraming := serializer.EncodesAsText
  65. // find the embedded serializer matching the media type
  66. embeddedEncoder := scope.Serializer.EncoderForVersion(serializer.Embedded.Serializer, scope.Kind.GroupVersion())
  67. server := &WatchServer{
  68. watching: watcher,
  69. scope: scope,
  70. useTextFraming: useTextFraming,
  71. mediaType: serializer.MediaType,
  72. framer: serializer.Framer,
  73. encoder: encoder,
  74. embeddedEncoder: embeddedEncoder,
  75. fixup: func(obj runtime.Object) {
  76. if err := setSelfLink(obj, req, scope.Namer); err != nil {
  77. utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
  78. }
  79. },
  80. t: &realTimeoutFactory{timeout},
  81. }
  82. server.ServeHTTP(res.ResponseWriter, req.Request)
  83. }
  84. // WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
  85. type WatchServer struct {
  86. watching watch.Interface
  87. scope RequestScope
  88. // true if websocket messages should use text framing (as opposed to binary framing)
  89. useTextFraming bool
  90. // the media type this watch is being served with
  91. mediaType string
  92. // used to frame the watch stream
  93. framer runtime.Framer
  94. // used to encode the watch stream event itself
  95. encoder runtime.Encoder
  96. // used to encode the nested object in the watch stream
  97. embeddedEncoder runtime.Encoder
  98. fixup func(runtime.Object)
  99. t timeoutFactory
  100. }
  101. // ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
  102. // or over a websocket connection.
  103. func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  104. w = httplog.Unlogged(w)
  105. if wsstream.IsWebSocketRequest(req) {
  106. w.Header().Set("Content-Type", s.mediaType)
  107. websocket.Handler(s.HandleWS).ServeHTTP(w, req)
  108. return
  109. }
  110. cn, ok := w.(http.CloseNotifier)
  111. if !ok {
  112. err := fmt.Errorf("unable to start watch - can't get http.CloseNotifier: %#v", w)
  113. utilruntime.HandleError(err)
  114. s.scope.err(errors.NewInternalError(err), w, req)
  115. return
  116. }
  117. flusher, ok := w.(http.Flusher)
  118. if !ok {
  119. err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
  120. utilruntime.HandleError(err)
  121. s.scope.err(errors.NewInternalError(err), w, req)
  122. return
  123. }
  124. framer := s.framer.NewFrameWriter(w)
  125. if framer == nil {
  126. // programmer error
  127. err := fmt.Errorf("no stream framing support is available for media type %q", s.mediaType)
  128. utilruntime.HandleError(err)
  129. s.scope.err(errors.NewBadRequest(err.Error()), w, req)
  130. return
  131. }
  132. e := streaming.NewEncoder(framer, s.encoder)
  133. // ensure the connection times out
  134. timeoutCh, cleanup := s.t.TimeoutCh()
  135. defer cleanup()
  136. defer s.watching.Stop()
  137. // begin the stream
  138. w.Header().Set("Content-Type", s.mediaType)
  139. w.Header().Set("Transfer-Encoding", "chunked")
  140. w.WriteHeader(http.StatusOK)
  141. flusher.Flush()
  142. var unknown runtime.Unknown
  143. internalEvent := &versioned.InternalEvent{}
  144. buf := &bytes.Buffer{}
  145. ch := s.watching.ResultChan()
  146. for {
  147. select {
  148. case <-cn.CloseNotify():
  149. return
  150. case <-timeoutCh:
  151. return
  152. case event, ok := <-ch:
  153. if !ok {
  154. // End of results.
  155. return
  156. }
  157. obj := event.Object
  158. s.fixup(obj)
  159. if err := s.embeddedEncoder.Encode(obj, buf); err != nil {
  160. // unexpected error
  161. utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
  162. return
  163. }
  164. // ContentType is not required here because we are defaulting to the serializer
  165. // type
  166. unknown.Raw = buf.Bytes()
  167. event.Object = &unknown
  168. // the internal event will be versioned by the encoder
  169. *internalEvent = versioned.InternalEvent(event)
  170. if err := e.Encode(internalEvent); err != nil {
  171. utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e))
  172. // client disconnect.
  173. return
  174. }
  175. if len(ch) == 0 {
  176. flusher.Flush()
  177. }
  178. buf.Reset()
  179. }
  180. }
  181. }
  182. // HandleWS implements a websocket handler.
  183. func (s *WatchServer) HandleWS(ws *websocket.Conn) {
  184. defer ws.Close()
  185. done := make(chan struct{})
  186. go func() {
  187. defer utilruntime.HandleCrash()
  188. // This blocks until the connection is closed.
  189. // Client should not send anything.
  190. wsstream.IgnoreReceives(ws, 0)
  191. // Once the client closes, we should also close
  192. close(done)
  193. }()
  194. var unknown runtime.Unknown
  195. internalEvent := &versioned.InternalEvent{}
  196. buf := &bytes.Buffer{}
  197. streamBuf := &bytes.Buffer{}
  198. ch := s.watching.ResultChan()
  199. for {
  200. select {
  201. case <-done:
  202. s.watching.Stop()
  203. return
  204. case event, ok := <-ch:
  205. if !ok {
  206. // End of results.
  207. return
  208. }
  209. obj := event.Object
  210. s.fixup(obj)
  211. if err := s.embeddedEncoder.Encode(obj, buf); err != nil {
  212. // unexpected error
  213. utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
  214. return
  215. }
  216. // ContentType is not required here because we are defaulting to the serializer
  217. // type
  218. unknown.Raw = buf.Bytes()
  219. event.Object = &unknown
  220. // the internal event will be versioned by the encoder
  221. *internalEvent = versioned.InternalEvent(event)
  222. if err := s.encoder.Encode(internalEvent, streamBuf); err != nil {
  223. // encoding error
  224. utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err))
  225. s.watching.Stop()
  226. return
  227. }
  228. if s.useTextFraming {
  229. if err := websocket.Message.Send(ws, streamBuf.String()); err != nil {
  230. // Client disconnect.
  231. s.watching.Stop()
  232. return
  233. }
  234. } else {
  235. if err := websocket.Message.Send(ws, streamBuf.Bytes()); err != nil {
  236. // Client disconnect.
  237. s.watching.Stop()
  238. return
  239. }
  240. }
  241. buf.Reset()
  242. streamBuf.Reset()
  243. }
  244. }
  245. }