httpstream.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package remotecommand
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "time"
  21. "k8s.io/kubernetes/pkg/api"
  22. apierrors "k8s.io/kubernetes/pkg/api/errors"
  23. "k8s.io/kubernetes/pkg/api/unversioned"
  24. "k8s.io/kubernetes/pkg/util/httpstream"
  25. "k8s.io/kubernetes/pkg/util/httpstream/spdy"
  26. "k8s.io/kubernetes/pkg/util/runtime"
  27. "k8s.io/kubernetes/pkg/util/term"
  28. "k8s.io/kubernetes/pkg/util/wsstream"
  29. "github.com/golang/glog"
  30. )
  31. // options contains details about which streams are required for
  32. // remote command execution.
  33. type options struct {
  34. stdin bool
  35. stdout bool
  36. stderr bool
  37. tty bool
  38. expectedStreams int
  39. }
  40. // newOptions creates a new options from the Request.
  41. func newOptions(req *http.Request) (*options, error) {
  42. tty := req.FormValue(api.ExecTTYParam) == "1"
  43. stdin := req.FormValue(api.ExecStdinParam) == "1"
  44. stdout := req.FormValue(api.ExecStdoutParam) == "1"
  45. stderr := req.FormValue(api.ExecStderrParam) == "1"
  46. if tty && stderr {
  47. // TODO: make this an error before we reach this method
  48. glog.V(4).Infof("Access to exec with tty and stderr is not supported, bypassing stderr")
  49. stderr = false
  50. }
  51. // count the streams client asked for, starting with 1
  52. expectedStreams := 1
  53. if stdin {
  54. expectedStreams++
  55. }
  56. if stdout {
  57. expectedStreams++
  58. }
  59. if stderr {
  60. expectedStreams++
  61. }
  62. if expectedStreams == 1 {
  63. return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr")
  64. }
  65. return &options{
  66. stdin: stdin,
  67. stdout: stdout,
  68. stderr: stderr,
  69. tty: tty,
  70. expectedStreams: expectedStreams,
  71. }, nil
  72. }
  73. // context contains the connection and streams used when
  74. // forwarding an attach or execute session into a container.
  75. type context struct {
  76. conn io.Closer
  77. stdinStream io.ReadCloser
  78. stdoutStream io.WriteCloser
  79. stderrStream io.WriteCloser
  80. writeStatus func(status *apierrors.StatusError) error
  81. resizeStream io.ReadCloser
  82. resizeChan chan term.Size
  83. tty bool
  84. }
  85. // streamAndReply holds both a Stream and a channel that is closed when the stream's reply frame is
  86. // enqueued. Consumers can wait for replySent to be closed prior to proceeding, to ensure that the
  87. // replyFrame is enqueued before the connection's goaway frame is sent (e.g. if a stream was
  88. // received and right after, the connection gets closed).
  89. type streamAndReply struct {
  90. httpstream.Stream
  91. replySent <-chan struct{}
  92. }
  93. // waitStreamReply waits until either replySent or stop is closed. If replySent is closed, it sends
  94. // an empty struct to the notify channel.
  95. func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-chan struct{}) {
  96. select {
  97. case <-replySent:
  98. notify <- struct{}{}
  99. case <-stop:
  100. }
  101. }
  102. func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
  103. opts, err := newOptions(req)
  104. if err != nil {
  105. runtime.HandleError(err)
  106. w.WriteHeader(http.StatusBadRequest)
  107. fmt.Fprint(w, err.Error())
  108. return nil, false
  109. }
  110. var ctx *context
  111. var ok bool
  112. if wsstream.IsWebSocketRequest(req) {
  113. ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout)
  114. } else {
  115. ctx, ok = createHttpStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout)
  116. }
  117. if !ok {
  118. return nil, false
  119. }
  120. if ctx.resizeStream != nil {
  121. ctx.resizeChan = make(chan term.Size)
  122. go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
  123. }
  124. return ctx, true
  125. }
  126. func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
  127. protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
  128. if err != nil {
  129. w.WriteHeader(http.StatusBadRequest)
  130. fmt.Fprint(w, err.Error())
  131. return nil, false
  132. }
  133. streamCh := make(chan streamAndReply)
  134. upgrader := spdy.NewResponseUpgrader()
  135. conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
  136. streamCh <- streamAndReply{Stream: stream, replySent: replySent}
  137. return nil
  138. })
  139. // from this point on, we can no longer call methods on response
  140. if conn == nil {
  141. // The upgrader is responsible for notifying the client of any errors that
  142. // occurred during upgrading. All we can do is return here at this point
  143. // if we weren't successful in upgrading.
  144. return nil, false
  145. }
  146. conn.SetIdleTimeout(idleTimeout)
  147. var handler protocolHandler
  148. switch protocol {
  149. case StreamProtocolV4Name:
  150. handler = &v4ProtocolHandler{}
  151. case StreamProtocolV3Name:
  152. handler = &v3ProtocolHandler{}
  153. case StreamProtocolV2Name:
  154. handler = &v2ProtocolHandler{}
  155. case "":
  156. glog.V(4).Infof("Client did not request protocol negotiaion. Falling back to %q", StreamProtocolV1Name)
  157. fallthrough
  158. case StreamProtocolV1Name:
  159. handler = &v1ProtocolHandler{}
  160. }
  161. if opts.tty && handler.supportsTerminalResizing() {
  162. opts.expectedStreams++
  163. }
  164. expired := time.NewTimer(streamCreationTimeout)
  165. defer expired.Stop()
  166. ctx, err := handler.waitForStreams(streamCh, opts.expectedStreams, expired.C)
  167. if err != nil {
  168. runtime.HandleError(err)
  169. return nil, false
  170. }
  171. ctx.conn = conn
  172. ctx.tty = opts.tty
  173. return ctx, true
  174. }
  175. type protocolHandler interface {
  176. // waitForStreams waits for the expected streams or a timeout, returning a
  177. // remoteCommandContext if all the streams were received, or an error if not.
  178. waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error)
  179. // supportsTerminalResizing returns true if the protocol handler supports terminal resizing
  180. supportsTerminalResizing() bool
  181. }
  182. // v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs
  183. // in from v3 in the error stream format using an json-marshaled unversioned.Status which carries
  184. // the process' exit code.
  185. type v4ProtocolHandler struct{}
  186. func (*v4ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
  187. ctx := &context{}
  188. receivedStreams := 0
  189. replyChan := make(chan struct{})
  190. stop := make(chan struct{})
  191. defer close(stop)
  192. WaitForStreams:
  193. for {
  194. select {
  195. case stream := <-streams:
  196. streamType := stream.Headers().Get(api.StreamType)
  197. switch streamType {
  198. case api.StreamTypeError:
  199. ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors
  200. go waitStreamReply(stream.replySent, replyChan, stop)
  201. case api.StreamTypeStdin:
  202. ctx.stdinStream = stream
  203. go waitStreamReply(stream.replySent, replyChan, stop)
  204. case api.StreamTypeStdout:
  205. ctx.stdoutStream = stream
  206. go waitStreamReply(stream.replySent, replyChan, stop)
  207. case api.StreamTypeStderr:
  208. ctx.stderrStream = stream
  209. go waitStreamReply(stream.replySent, replyChan, stop)
  210. case api.StreamTypeResize:
  211. ctx.resizeStream = stream
  212. go waitStreamReply(stream.replySent, replyChan, stop)
  213. default:
  214. runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
  215. }
  216. case <-replyChan:
  217. receivedStreams++
  218. if receivedStreams == expectedStreams {
  219. break WaitForStreams
  220. }
  221. case <-expired:
  222. // TODO find a way to return the error to the user. Maybe use a separate
  223. // stream to report errors?
  224. return nil, errors.New("timed out waiting for client to create streams")
  225. }
  226. }
  227. return ctx, nil
  228. }
  229. // supportsTerminalResizing returns true because v4ProtocolHandler supports it
  230. func (*v4ProtocolHandler) supportsTerminalResizing() bool { return true }
  231. // v3ProtocolHandler implements the V3 protocol version for streaming command execution.
  232. type v3ProtocolHandler struct{}
  233. func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
  234. ctx := &context{}
  235. receivedStreams := 0
  236. replyChan := make(chan struct{})
  237. stop := make(chan struct{})
  238. defer close(stop)
  239. WaitForStreams:
  240. for {
  241. select {
  242. case stream := <-streams:
  243. streamType := stream.Headers().Get(api.StreamType)
  244. switch streamType {
  245. case api.StreamTypeError:
  246. ctx.writeStatus = v1WriteStatusFunc(stream)
  247. go waitStreamReply(stream.replySent, replyChan, stop)
  248. case api.StreamTypeStdin:
  249. ctx.stdinStream = stream
  250. go waitStreamReply(stream.replySent, replyChan, stop)
  251. case api.StreamTypeStdout:
  252. ctx.stdoutStream = stream
  253. go waitStreamReply(stream.replySent, replyChan, stop)
  254. case api.StreamTypeStderr:
  255. ctx.stderrStream = stream
  256. go waitStreamReply(stream.replySent, replyChan, stop)
  257. case api.StreamTypeResize:
  258. ctx.resizeStream = stream
  259. go waitStreamReply(stream.replySent, replyChan, stop)
  260. default:
  261. runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
  262. }
  263. case <-replyChan:
  264. receivedStreams++
  265. if receivedStreams == expectedStreams {
  266. break WaitForStreams
  267. }
  268. case <-expired:
  269. // TODO find a way to return the error to the user. Maybe use a separate
  270. // stream to report errors?
  271. return nil, errors.New("timed out waiting for client to create streams")
  272. }
  273. }
  274. return ctx, nil
  275. }
  276. // supportsTerminalResizing returns true because v3ProtocolHandler supports it
  277. func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true }
  278. // v2ProtocolHandler implements the V2 protocol version for streaming command execution.
  279. type v2ProtocolHandler struct{}
  280. func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
  281. ctx := &context{}
  282. receivedStreams := 0
  283. replyChan := make(chan struct{})
  284. stop := make(chan struct{})
  285. defer close(stop)
  286. WaitForStreams:
  287. for {
  288. select {
  289. case stream := <-streams:
  290. streamType := stream.Headers().Get(api.StreamType)
  291. switch streamType {
  292. case api.StreamTypeError:
  293. ctx.writeStatus = v1WriteStatusFunc(stream)
  294. go waitStreamReply(stream.replySent, replyChan, stop)
  295. case api.StreamTypeStdin:
  296. ctx.stdinStream = stream
  297. go waitStreamReply(stream.replySent, replyChan, stop)
  298. case api.StreamTypeStdout:
  299. ctx.stdoutStream = stream
  300. go waitStreamReply(stream.replySent, replyChan, stop)
  301. case api.StreamTypeStderr:
  302. ctx.stderrStream = stream
  303. go waitStreamReply(stream.replySent, replyChan, stop)
  304. default:
  305. runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
  306. }
  307. case <-replyChan:
  308. receivedStreams++
  309. if receivedStreams == expectedStreams {
  310. break WaitForStreams
  311. }
  312. case <-expired:
  313. // TODO find a way to return the error to the user. Maybe use a separate
  314. // stream to report errors?
  315. return nil, errors.New("timed out waiting for client to create streams")
  316. }
  317. }
  318. return ctx, nil
  319. }
  320. // supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it.
  321. func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false }
  322. // v1ProtocolHandler implements the V1 protocol version for streaming command execution.
  323. type v1ProtocolHandler struct{}
  324. func (*v1ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
  325. ctx := &context{}
  326. receivedStreams := 0
  327. replyChan := make(chan struct{})
  328. stop := make(chan struct{})
  329. defer close(stop)
  330. WaitForStreams:
  331. for {
  332. select {
  333. case stream := <-streams:
  334. streamType := stream.Headers().Get(api.StreamType)
  335. switch streamType {
  336. case api.StreamTypeError:
  337. ctx.writeStatus = v1WriteStatusFunc(stream)
  338. // This defer statement shouldn't be here, but due to previous refactoring, it ended up in
  339. // here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in
  340. // the v2ProtocolHandler.
  341. defer stream.Reset()
  342. go waitStreamReply(stream.replySent, replyChan, stop)
  343. case api.StreamTypeStdin:
  344. ctx.stdinStream = stream
  345. go waitStreamReply(stream.replySent, replyChan, stop)
  346. case api.StreamTypeStdout:
  347. ctx.stdoutStream = stream
  348. go waitStreamReply(stream.replySent, replyChan, stop)
  349. case api.StreamTypeStderr:
  350. ctx.stderrStream = stream
  351. go waitStreamReply(stream.replySent, replyChan, stop)
  352. default:
  353. runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
  354. }
  355. case <-replyChan:
  356. receivedStreams++
  357. if receivedStreams == expectedStreams {
  358. break WaitForStreams
  359. }
  360. case <-expired:
  361. // TODO find a way to return the error to the user. Maybe use a separate
  362. // stream to report errors?
  363. return nil, errors.New("timed out waiting for client to create streams")
  364. }
  365. }
  366. if ctx.stdinStream != nil {
  367. ctx.stdinStream.Close()
  368. }
  369. return ctx, nil
  370. }
  371. // supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
  372. func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }
  373. func handleResizeEvents(stream io.Reader, channel chan<- term.Size) {
  374. defer runtime.HandleCrash()
  375. decoder := json.NewDecoder(stream)
  376. for {
  377. size := term.Size{}
  378. if err := decoder.Decode(&size); err != nil {
  379. break
  380. }
  381. channel <- size
  382. }
  383. }
  384. func v1WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
  385. return func(status *apierrors.StatusError) error {
  386. if status.Status().Status == unversioned.StatusSuccess {
  387. return nil // send error messages
  388. }
  389. _, err := stream.Write([]byte(status.Error()))
  390. return err
  391. }
  392. }
  393. // v4WriteStatusFunc returns a WriteStatusFunc that marshals a given api Status
  394. // as json in the error channel.
  395. func v4WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
  396. return func(status *apierrors.StatusError) error {
  397. bs, err := json.Marshal(status.Status())
  398. if err != nil {
  399. return err
  400. }
  401. _, err = stream.Write(bs)
  402. return err
  403. }
  404. }