conn.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wsstream
  14. import (
  15. "encoding/base64"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "regexp"
  20. "strings"
  21. "time"
  22. "github.com/golang/glog"
  23. "golang.org/x/net/websocket"
  24. "k8s.io/kubernetes/pkg/util/runtime"
  25. )
// ChannelWebSocketProtocol names the Websocket subprotocol "channel.k8s.io", which prepends each
// binary message with a byte indicating the channel number (zero indexed) the message was sent on.
// Messages in both directions should prefix their messages with this channel byte. When used for
// remote execution, the channel numbers are by convention defined to match the POSIX
// file-descriptors assigned to STDIN, STDOUT, and STDERR (0, 1, and 2). No other conversion is
// performed on the raw subprotocol - writes are sent as they are received by the server.
//
// Example client session:
//
//    CONNECT http://server.com with subprotocol "channel.k8s.io"
//    WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
//    READ  []byte{1, 10}                # receive "\n" on channel 1 (STDOUT)
//    CLOSE
//
const ChannelWebSocketProtocol = "channel.k8s.io"
// Base64ChannelWebSocketProtocol names the Websocket subprotocol "base64.channel.k8s.io", which
// base64 encodes each message and prefixes it with a character indicating the channel number
// (zero indexed) the message was sent on. Messages in both directions should prefix their
// messages with this channel char. When used for remote execution, the channel numbers are by
// convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR
// ('0', '1', and '2'). The data received on the server is base64 decoded (and must be valid
// base64) and data written by the server to the client is base64 encoded.
//
// Example client session:
//
//    CONNECT http://server.com with subprotocol "base64.channel.k8s.io"
//    WRITE []byte{48, 90, 109, 57, 118, 67, 103, 61, 61} # send "foo\n" (base64: "Zm9vCg==") on channel '0' (STDIN)
//    READ  []byte{49, 67, 103, 61, 61}                   # receive "\n" (base64: "Cg==") on channel '1' (STDOUT)
//    CLOSE
//
const Base64ChannelWebSocketProtocol = "base64.channel.k8s.io"
// codecType selects how a Conn encodes frames on the websocket.
type codecType int

const (
	// rawCodec sends the channel number as a single leading byte followed by the
	// unmodified payload bytes.
	rawCodec codecType = iota
	// base64Codec sends the channel number as the character '0'+n followed by the
	// base64 (standard encoding) form of the payload.
	base64Codec
)
// ChannelType describes the duplex mode of a single channel on a Conn.
type ChannelType int

const (
	// IgnoreChannel creates a channel that neither delivers received data to
	// readers nor forwards written data to the websocket.
	IgnoreChannel ChannelType = iota
	// ReadChannel creates a channel whose received data can be read, but whose
	// writes are not forwarded to the websocket.
	ReadChannel
	// WriteChannel creates a channel whose writes are sent on the websocket, but
	// whose received data is not delivered to readers.
	WriteChannel
	// ReadWriteChannel creates a channel usable in both directions.
	ReadWriteChannel
)
  68. var (
  69. // connectionUpgradeRegex matches any Connection header value that includes upgrade
  70. connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
  71. )
  72. // IsWebSocketRequest returns true if the incoming request contains connection upgrade headers
  73. // for WebSockets.
  74. func IsWebSocketRequest(req *http.Request) bool {
  75. return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
  76. }
  77. // IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
  78. // read and write deadlines are pushed every time a new message is received.
  79. func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
  80. defer runtime.HandleCrash()
  81. var data []byte
  82. for {
  83. resetTimeout(ws, timeout)
  84. if err := websocket.Message.Receive(ws, &data); err != nil {
  85. return
  86. }
  87. }
  88. }
  89. // handshake ensures the provided user protocol matches one of the allowed protocols. It returns
  90. // no error if no protocol is specified.
  91. func handshake(config *websocket.Config, req *http.Request, allowed []string) error {
  92. protocols := config.Protocol
  93. if len(protocols) == 0 {
  94. protocols = []string{""}
  95. }
  96. for _, protocol := range protocols {
  97. for _, allow := range allowed {
  98. if allow == protocol {
  99. config.Protocol = []string{protocol}
  100. return nil
  101. }
  102. }
  103. }
  104. return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed)
  105. }
// ChannelProtocolConfig describes a websocket subprotocol with channels.
type ChannelProtocolConfig struct {
	// Binary selects the raw codec when true; when false, frames use the base64 codec.
	Binary bool
	// Channels lists the type of each channel; the slice index is the channel number.
	Channels []ChannelType
}
  111. // NewDefaultChannelProtocols returns a channel protocol map with the
  112. // subprotocols "", "channel.k8s.io", "base64.channel.k8s.io" and the given
  113. // channels.
  114. func NewDefaultChannelProtocols(channels []ChannelType) map[string]ChannelProtocolConfig {
  115. return map[string]ChannelProtocolConfig{
  116. "": {Binary: true, Channels: channels},
  117. ChannelWebSocketProtocol: {Binary: true, Channels: channels},
  118. Base64ChannelWebSocketProtocol: {Binary: false, Channels: channels},
  119. }
  120. }
// Conn supports sending multiple binary channels over a websocket connection.
type Conn struct {
	// protocols maps each supported subprotocol name to its channel configuration.
	protocols map[string]ChannelProtocolConfig
	// selectedProtocol is the negotiated subprotocol; set by initialize.
	selectedProtocol string
	// channels holds one entry per channel of the selected protocol, indexed by
	// channel number; set by initialize.
	channels []*websocketChannel
	// codec is the frame encoding derived from the selected protocol's Binary flag.
	codec codecType
	// ready is closed by initialize once the fields above and ws are populated;
	// Open and Close wait on it.
	ready chan struct{}
	// ws is the underlying websocket connection; set by initialize.
	ws *websocket.Conn
	// timeout is the idle timeout applied by resetTimeout; values <= 0 disable it.
	timeout time.Duration
}
  131. // NewConn creates a WebSocket connection that supports a set of channels. Channels begin each
  132. // web socket message with a single byte indicating the channel number (0-N). 255 is reserved for
  133. // future use. The channel types for each channel are passed as an array, supporting the different
  134. // duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer.
  135. //
  136. // The protocols parameter maps subprotocol names to ChannelProtocols. The empty string subprotocol
  137. // name is used if websocket.Config.Protocol is empty.
  138. func NewConn(protocols map[string]ChannelProtocolConfig) *Conn {
  139. return &Conn{
  140. ready: make(chan struct{}),
  141. protocols: protocols,
  142. }
  143. }
  144. // SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
  145. // there is no timeout on the connection.
  146. func (conn *Conn) SetIdleTimeout(duration time.Duration) {
  147. conn.timeout = duration
  148. }
  149. // Open the connection and create channels for reading and writing. It returns
  150. // the selected subprotocol, a slice of channels and an error.
  151. func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) {
  152. go func() {
  153. defer runtime.HandleCrash()
  154. defer conn.Close()
  155. websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req)
  156. }()
  157. <-conn.ready
  158. rwc := make([]io.ReadWriteCloser, len(conn.channels))
  159. for i := range conn.channels {
  160. rwc[i] = conn.channels[i]
  161. }
  162. return conn.selectedProtocol, rwc, nil
  163. }
  164. func (conn *Conn) initialize(ws *websocket.Conn) {
  165. negotiated := ws.Config().Protocol
  166. conn.selectedProtocol = negotiated[0]
  167. p := conn.protocols[conn.selectedProtocol]
  168. if p.Binary {
  169. conn.codec = rawCodec
  170. } else {
  171. conn.codec = base64Codec
  172. }
  173. conn.ws = ws
  174. conn.channels = make([]*websocketChannel, len(p.Channels))
  175. for i, t := range p.Channels {
  176. switch t {
  177. case ReadChannel:
  178. conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false)
  179. case WriteChannel:
  180. conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true)
  181. case ReadWriteChannel:
  182. conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true)
  183. case IgnoreChannel:
  184. conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false)
  185. }
  186. }
  187. close(conn.ready)
  188. }
  189. func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error {
  190. supportedProtocols := make([]string, 0, len(conn.protocols))
  191. for p := range conn.protocols {
  192. supportedProtocols = append(supportedProtocols, p)
  193. }
  194. return handshake(config, req, supportedProtocols)
  195. }
  196. func (conn *Conn) resetTimeout() {
  197. if conn.timeout > 0 {
  198. conn.ws.SetDeadline(time.Now().Add(conn.timeout))
  199. }
  200. }
  201. // Close is only valid after Open has been called
  202. func (conn *Conn) Close() error {
  203. <-conn.ready
  204. for _, s := range conn.channels {
  205. s.Close()
  206. }
  207. conn.ws.Close()
  208. return nil
  209. }
  210. // handle implements a websocket handler.
  211. func (conn *Conn) handle(ws *websocket.Conn) {
  212. defer conn.Close()
  213. conn.initialize(ws)
  214. for {
  215. conn.resetTimeout()
  216. var data []byte
  217. if err := websocket.Message.Receive(ws, &data); err != nil {
  218. if err != io.EOF {
  219. glog.Errorf("Error on socket receive: %v", err)
  220. }
  221. break
  222. }
  223. if len(data) == 0 {
  224. continue
  225. }
  226. channel := data[0]
  227. if conn.codec == base64Codec {
  228. channel = channel - '0'
  229. }
  230. data = data[1:]
  231. if int(channel) >= len(conn.channels) {
  232. glog.V(6).Infof("Frame is targeted for a reader %d that is not valid, possible protocol error", channel)
  233. continue
  234. }
  235. if _, err := conn.channels[channel].DataFromSocket(data); err != nil {
  236. glog.Errorf("Unable to write frame to %d: %v\n%s", channel, err, string(data))
  237. continue
  238. }
  239. }
  240. }
  241. // write multiplexes the specified channel onto the websocket
  242. func (conn *Conn) write(num byte, data []byte) (int, error) {
  243. conn.resetTimeout()
  244. switch conn.codec {
  245. case rawCodec:
  246. frame := make([]byte, len(data)+1)
  247. frame[0] = num
  248. copy(frame[1:], data)
  249. if err := websocket.Message.Send(conn.ws, frame); err != nil {
  250. return 0, err
  251. }
  252. case base64Codec:
  253. frame := string('0'+num) + base64.StdEncoding.EncodeToString(data)
  254. if err := websocket.Message.Send(conn.ws, frame); err != nil {
  255. return 0, err
  256. }
  257. }
  258. return len(data), nil
  259. }
// websocketChannel represents a channel in a connection
type websocketChannel struct {
	// conn is the connection this channel is multiplexed onto.
	conn *Conn
	// num is the channel number passed to conn.write for outgoing frames.
	num byte
	// r is the read end handed to callers of Read.
	r io.Reader
	// w is the write end fed by DataFromSocket with data received from the socket.
	w io.WriteCloser
	// read enables delivery of received data; write enables sending on the socket.
	read, write bool
}
  268. // newWebsocketChannel creates a pipe for writing to a websocket. Do not write to this pipe
  269. // prior to the connection being opened. It may be no, half, or full duplex depending on
  270. // read and write.
  271. func newWebsocketChannel(conn *Conn, num byte, read, write bool) *websocketChannel {
  272. r, w := io.Pipe()
  273. return &websocketChannel{conn, num, r, w, read, write}
  274. }
  275. func (p *websocketChannel) Write(data []byte) (int, error) {
  276. if !p.write {
  277. return len(data), nil
  278. }
  279. return p.conn.write(p.num, data)
  280. }
  281. // DataFromSocket is invoked by the connection receiver to move data from the connection
  282. // into a specific channel.
  283. func (p *websocketChannel) DataFromSocket(data []byte) (int, error) {
  284. if !p.read {
  285. return len(data), nil
  286. }
  287. switch p.conn.codec {
  288. case rawCodec:
  289. return p.w.Write(data)
  290. case base64Codec:
  291. dst := make([]byte, len(data))
  292. n, err := base64.StdEncoding.Decode(dst, data)
  293. if err != nil {
  294. return 0, err
  295. }
  296. return p.w.Write(dst[:n])
  297. }
  298. return 0, nil
  299. }
  300. func (p *websocketChannel) Read(data []byte) (int, error) {
  301. if !p.read {
  302. return 0, io.EOF
  303. }
  304. return p.r.Read(data)
  305. }
  306. func (p *websocketChannel) Close() error {
  307. return p.w.Close()
  308. }