remotecommand.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package remotecommand
  14. import (
  15. "fmt"
  16. "io"
  17. "net/http"
  18. "net/url"
  19. "github.com/golang/glog"
  20. "k8s.io/kubernetes/pkg/client/restclient"
  21. "k8s.io/kubernetes/pkg/client/transport"
  22. "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
  23. "k8s.io/kubernetes/pkg/util/httpstream"
  24. "k8s.io/kubernetes/pkg/util/httpstream/spdy"
  25. "k8s.io/kubernetes/pkg/util/term"
  26. )
  27. // StreamOptions holds information pertaining to the current streaming session: supported stream
  28. // protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
  29. // support terminal resizing.
  30. type StreamOptions struct {
  31. SupportedProtocols []string
  32. Stdin io.Reader
  33. Stdout io.Writer
  34. Stderr io.Writer
  35. Tty bool
  36. TerminalSizeQueue term.TerminalSizeQueue
  37. }
  38. // Executor is an interface for transporting shell-style streams.
  39. type Executor interface {
  40. // Stream initiates the transport of the standard shell streams. It will transport any
  41. // non-nil stream to a remote system, and return an error if a problem occurs. If tty
  42. // is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
  43. // stdout stream).
  44. Stream(options StreamOptions) error
  45. }
  46. // StreamExecutor supports the ability to dial an httpstream connection and the ability to
  47. // run a command line stream protocol over that dialer.
  48. type StreamExecutor interface {
  49. Executor
  50. httpstream.Dialer
  51. }
  52. // streamExecutor handles transporting standard shell streams over an httpstream connection.
  53. type streamExecutor struct {
  54. upgrader httpstream.UpgradeRoundTripper
  55. transport http.RoundTripper
  56. method string
  57. url *url.URL
  58. }
  59. // NewExecutor connects to the provided server and upgrades the connection to
  60. // multiplexed bidirectional streams. The current implementation uses SPDY,
  61. // but this could be replaced with HTTP/2 once it's available, or something else.
  62. // TODO: the common code between this and portforward could be abstracted.
  63. func NewExecutor(config *restclient.Config, method string, url *url.URL) (StreamExecutor, error) {
  64. tlsConfig, err := restclient.TLSConfigFor(config)
  65. if err != nil {
  66. return nil, err
  67. }
  68. upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig)
  69. wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
  70. if err != nil {
  71. return nil, err
  72. }
  73. return &streamExecutor{
  74. upgrader: upgradeRoundTripper,
  75. transport: wrapper,
  76. method: method,
  77. url: url,
  78. }, nil
  79. }
  80. // NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional
  81. // streams. This method takes a stream upgrader and an optional function that is invoked
  82. // to wrap the round tripper. This method may be used by clients that are lower level than
  83. // Kubernetes clients or need to provide their own upgrade round tripper.
  84. func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
  85. var rt http.RoundTripper = upgrader
  86. if fn != nil {
  87. rt = fn(rt)
  88. }
  89. return &streamExecutor{
  90. upgrader: upgrader,
  91. transport: rt,
  92. method: method,
  93. url: url,
  94. }, nil
  95. }
  96. // Dial opens a connection to a remote server and attempts to negotiate a SPDY
  97. // connection. Upon success, it returns the connection and the protocol
  98. // selected by the server.
  99. func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, string, error) {
  100. rt := transport.DebugWrappers(e.transport)
  101. // TODO the client probably shouldn't be created here, as it doesn't allow
  102. // flexibility to allow callers to configure it.
  103. client := &http.Client{Transport: rt}
  104. req, err := http.NewRequest(e.method, e.url.String(), nil)
  105. if err != nil {
  106. return nil, "", fmt.Errorf("error creating request: %v", err)
  107. }
  108. for i := range protocols {
  109. req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
  110. }
  111. resp, err := client.Do(req)
  112. if err != nil {
  113. return nil, "", fmt.Errorf("error sending request: %v", err)
  114. }
  115. defer resp.Body.Close()
  116. conn, err := e.upgrader.NewConnection(resp)
  117. if err != nil {
  118. return nil, "", err
  119. }
  120. return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
  121. }
  122. type streamCreator interface {
  123. CreateStream(headers http.Header) (httpstream.Stream, error)
  124. }
  125. type streamProtocolHandler interface {
  126. stream(conn streamCreator) error
  127. }
  128. // Stream opens a protocol streamer to the server and streams until a client closes
  129. // the connection or the server disconnects.
  130. func (e *streamExecutor) Stream(options StreamOptions) error {
  131. conn, protocol, err := e.Dial(options.SupportedProtocols...)
  132. if err != nil {
  133. return err
  134. }
  135. defer conn.Close()
  136. var streamer streamProtocolHandler
  137. switch protocol {
  138. case remotecommand.StreamProtocolV4Name:
  139. streamer = newStreamProtocolV4(options)
  140. case remotecommand.StreamProtocolV3Name:
  141. streamer = newStreamProtocolV3(options)
  142. case remotecommand.StreamProtocolV2Name:
  143. streamer = newStreamProtocolV2(options)
  144. case "":
  145. glog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
  146. fallthrough
  147. case remotecommand.StreamProtocolV1Name:
  148. streamer = newStreamProtocolV1(options)
  149. }
  150. return streamer.stream(conn)
  151. }