123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package remotecommand
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
- "time"
- "k8s.io/kubernetes/pkg/api"
- apierrors "k8s.io/kubernetes/pkg/api/errors"
- "k8s.io/kubernetes/pkg/api/unversioned"
- "k8s.io/kubernetes/pkg/util/httpstream"
- "k8s.io/kubernetes/pkg/util/httpstream/spdy"
- "k8s.io/kubernetes/pkg/util/runtime"
- "k8s.io/kubernetes/pkg/util/term"
- "k8s.io/kubernetes/pkg/util/wsstream"
- "github.com/golang/glog"
- )
// options records what the client asked for in a remote command request:
// which of the standard streams to wire up, whether a TTY was requested,
// and how many streams the server should wait for in total.
type options struct {
	stdin, stdout, stderr bool
	tty                   bool
	expectedStreams       int
}
- // newOptions creates a new options from the Request.
- func newOptions(req *http.Request) (*options, error) {
- tty := req.FormValue(api.ExecTTYParam) == "1"
- stdin := req.FormValue(api.ExecStdinParam) == "1"
- stdout := req.FormValue(api.ExecStdoutParam) == "1"
- stderr := req.FormValue(api.ExecStderrParam) == "1"
- if tty && stderr {
- // TODO: make this an error before we reach this method
- glog.V(4).Infof("Access to exec with tty and stderr is not supported, bypassing stderr")
- stderr = false
- }
- // count the streams client asked for, starting with 1
- expectedStreams := 1
- if stdin {
- expectedStreams++
- }
- if stdout {
- expectedStreams++
- }
- if stderr {
- expectedStreams++
- }
- if expectedStreams == 1 {
- return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr")
- }
- return &options{
- stdin: stdin,
- stdout: stdout,
- stderr: stderr,
- tty: tty,
- expectedStreams: expectedStreams,
- }, nil
- }
- // context contains the connection and streams used when
- // forwarding an attach or execute session into a container.
- type context struct {
- conn io.Closer
- stdinStream io.ReadCloser
- stdoutStream io.WriteCloser
- stderrStream io.WriteCloser
- writeStatus func(status *apierrors.StatusError) error
- resizeStream io.ReadCloser
- resizeChan chan term.Size
- tty bool
- }
- // streamAndReply holds both a Stream and a channel that is closed when the stream's reply frame is
- // enqueued. Consumers can wait for replySent to be closed prior to proceeding, to ensure that the
- // replyFrame is enqueued before the connection's goaway frame is sent (e.g. if a stream was
- // received and right after, the connection gets closed).
- type streamAndReply struct {
- httpstream.Stream
- replySent <-chan struct{}
- }
// waitStreamReply blocks until either replySent or stop is closed.
//
// When replySent is closed first, it tries to send one empty struct on
// notify. That send is itself abandoned if stop is closed before anybody
// receives, so a goroutine running this function can never be stranded on
// notify after the receiver has gone away. When stop is closed first, the
// function returns without notifying.
func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-chan struct{}) {
	select {
	case <-replySent:
		// The receiver may already have given up (timeout, or it has seen
		// all the streams it needs) and closed stop; do not block forever
		// on an unbuffered notify in that case.
		select {
		case notify <- struct{}{}:
		case <-stop:
		}
	case <-stop:
	}
}
- func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
- opts, err := newOptions(req)
- if err != nil {
- runtime.HandleError(err)
- w.WriteHeader(http.StatusBadRequest)
- fmt.Fprint(w, err.Error())
- return nil, false
- }
- var ctx *context
- var ok bool
- if wsstream.IsWebSocketRequest(req) {
- ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout)
- } else {
- ctx, ok = createHttpStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout)
- }
- if !ok {
- return nil, false
- }
- if ctx.resizeStream != nil {
- ctx.resizeChan = make(chan term.Size)
- go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
- }
- return ctx, true
- }
- func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
- protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
- if err != nil {
- w.WriteHeader(http.StatusBadRequest)
- fmt.Fprint(w, err.Error())
- return nil, false
- }
- streamCh := make(chan streamAndReply)
- upgrader := spdy.NewResponseUpgrader()
- conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
- streamCh <- streamAndReply{Stream: stream, replySent: replySent}
- return nil
- })
- // from this point on, we can no longer call methods on response
- if conn == nil {
- // The upgrader is responsible for notifying the client of any errors that
- // occurred during upgrading. All we can do is return here at this point
- // if we weren't successful in upgrading.
- return nil, false
- }
- conn.SetIdleTimeout(idleTimeout)
- var handler protocolHandler
- switch protocol {
- case StreamProtocolV4Name:
- handler = &v4ProtocolHandler{}
- case StreamProtocolV3Name:
- handler = &v3ProtocolHandler{}
- case StreamProtocolV2Name:
- handler = &v2ProtocolHandler{}
- case "":
- glog.V(4).Infof("Client did not request protocol negotiaion. Falling back to %q", StreamProtocolV1Name)
- fallthrough
- case StreamProtocolV1Name:
- handler = &v1ProtocolHandler{}
- }
- if opts.tty && handler.supportsTerminalResizing() {
- opts.expectedStreams++
- }
- expired := time.NewTimer(streamCreationTimeout)
- defer expired.Stop()
- ctx, err := handler.waitForStreams(streamCh, opts.expectedStreams, expired.C)
- if err != nil {
- runtime.HandleError(err)
- return nil, false
- }
- ctx.conn = conn
- ctx.tty = opts.tty
- return ctx, true
- }
- type protocolHandler interface {
- // waitForStreams waits for the expected streams or a timeout, returning a
- // remoteCommandContext if all the streams were received, or an error if not.
- waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error)
- // supportsTerminalResizing returns true if the protocol handler supports terminal resizing
- supportsTerminalResizing() bool
- }
- // v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs
- // in from v3 in the error stream format using an json-marshaled unversioned.Status which carries
- // the process' exit code.
- type v4ProtocolHandler struct{}
- func (*v4ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
- ctx := &context{}
- receivedStreams := 0
- replyChan := make(chan struct{})
- stop := make(chan struct{})
- defer close(stop)
- WaitForStreams:
- for {
- select {
- case stream := <-streams:
- streamType := stream.Headers().Get(api.StreamType)
- switch streamType {
- case api.StreamTypeError:
- ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStdin:
- ctx.stdinStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStdout:
- ctx.stdoutStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStderr:
- ctx.stderrStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeResize:
- ctx.resizeStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- default:
- runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
- }
- case <-replyChan:
- receivedStreams++
- if receivedStreams == expectedStreams {
- break WaitForStreams
- }
- case <-expired:
- // TODO find a way to return the error to the user. Maybe use a separate
- // stream to report errors?
- return nil, errors.New("timed out waiting for client to create streams")
- }
- }
- return ctx, nil
- }
- // supportsTerminalResizing returns true because v4ProtocolHandler supports it
- func (*v4ProtocolHandler) supportsTerminalResizing() bool { return true }
- // v3ProtocolHandler implements the V3 protocol version for streaming command execution.
- type v3ProtocolHandler struct{}
- func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
- ctx := &context{}
- receivedStreams := 0
- replyChan := make(chan struct{})
- stop := make(chan struct{})
- defer close(stop)
- WaitForStreams:
- for {
- select {
- case stream := <-streams:
- streamType := stream.Headers().Get(api.StreamType)
- switch streamType {
- case api.StreamTypeError:
- ctx.writeStatus = v1WriteStatusFunc(stream)
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStdin:
- ctx.stdinStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStdout:
- ctx.stdoutStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStderr:
- ctx.stderrStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeResize:
- ctx.resizeStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- default:
- runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
- }
- case <-replyChan:
- receivedStreams++
- if receivedStreams == expectedStreams {
- break WaitForStreams
- }
- case <-expired:
- // TODO find a way to return the error to the user. Maybe use a separate
- // stream to report errors?
- return nil, errors.New("timed out waiting for client to create streams")
- }
- }
- return ctx, nil
- }
- // supportsTerminalResizing returns true because v3ProtocolHandler supports it
- func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true }
- // v2ProtocolHandler implements the V2 protocol version for streaming command execution.
- type v2ProtocolHandler struct{}
- func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
- ctx := &context{}
- receivedStreams := 0
- replyChan := make(chan struct{})
- stop := make(chan struct{})
- defer close(stop)
- WaitForStreams:
- for {
- select {
- case stream := <-streams:
- streamType := stream.Headers().Get(api.StreamType)
- switch streamType {
- case api.StreamTypeError:
- ctx.writeStatus = v1WriteStatusFunc(stream)
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStdin:
- ctx.stdinStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStdout:
- ctx.stdoutStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStderr:
- ctx.stderrStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- default:
- runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
- }
- case <-replyChan:
- receivedStreams++
- if receivedStreams == expectedStreams {
- break WaitForStreams
- }
- case <-expired:
- // TODO find a way to return the error to the user. Maybe use a separate
- // stream to report errors?
- return nil, errors.New("timed out waiting for client to create streams")
- }
- }
- return ctx, nil
- }
- // supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it.
- func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false }
- // v1ProtocolHandler implements the V1 protocol version for streaming command execution.
- type v1ProtocolHandler struct{}
- func (*v1ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
- ctx := &context{}
- receivedStreams := 0
- replyChan := make(chan struct{})
- stop := make(chan struct{})
- defer close(stop)
- WaitForStreams:
- for {
- select {
- case stream := <-streams:
- streamType := stream.Headers().Get(api.StreamType)
- switch streamType {
- case api.StreamTypeError:
- ctx.writeStatus = v1WriteStatusFunc(stream)
- // This defer statement shouldn't be here, but due to previous refactoring, it ended up in
- // here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in
- // the v2ProtocolHandler.
- defer stream.Reset()
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStdin:
- ctx.stdinStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStdout:
- ctx.stdoutStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- case api.StreamTypeStderr:
- ctx.stderrStream = stream
- go waitStreamReply(stream.replySent, replyChan, stop)
- default:
- runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
- }
- case <-replyChan:
- receivedStreams++
- if receivedStreams == expectedStreams {
- break WaitForStreams
- }
- case <-expired:
- // TODO find a way to return the error to the user. Maybe use a separate
- // stream to report errors?
- return nil, errors.New("timed out waiting for client to create streams")
- }
- }
- if ctx.stdinStream != nil {
- ctx.stdinStream.Close()
- }
- return ctx, nil
- }
- // supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
- func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }
- func handleResizeEvents(stream io.Reader, channel chan<- term.Size) {
- defer runtime.HandleCrash()
- decoder := json.NewDecoder(stream)
- for {
- size := term.Size{}
- if err := decoder.Decode(&size); err != nil {
- break
- }
- channel <- size
- }
- }
- func v1WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
- return func(status *apierrors.StatusError) error {
- if status.Status().Status == unversioned.StatusSuccess {
- return nil // send error messages
- }
- _, err := stream.Write([]byte(status.Error()))
- return err
- }
- }
- // v4WriteStatusFunc returns a WriteStatusFunc that marshals a given api Status
- // as json in the error channel.
- func v4WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
- return func(status *apierrors.StatusError) error {
- bs, err := json.Marshal(status.Status())
- if err != nil {
- return err
- }
- _, err = stream.Write(bs)
- return err
- }
- }
|