123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package remotecommand
- import (
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "github.com/golang/glog"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/util/httpstream"
- )
- // streamProtocolV1 implements the first version of the streaming exec & attach
- // protocol. This version has some bugs, such as not being able to detect when
- // non-interactive stdin data has ended. See http://issues.k8s.io/13394 and
- // http://issues.k8s.io/13395 for more details.
- type streamProtocolV1 struct {
- StreamOptions
- errorStream httpstream.Stream
- remoteStdin httpstream.Stream
- remoteStdout httpstream.Stream
- remoteStderr httpstream.Stream
- }
- var _ streamProtocolHandler = &streamProtocolV1{}
- func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
- return &streamProtocolV1{
- StreamOptions: options,
- }
- }
- func (p *streamProtocolV1) stream(conn streamCreator) error {
- doneChan := make(chan struct{}, 2)
- errorChan := make(chan error)
- cp := func(s string, dst io.Writer, src io.Reader) {
- glog.V(6).Infof("Copying %s", s)
- defer glog.V(6).Infof("Done copying %s", s)
- if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
- glog.Errorf("Error copying %s: %v", s, err)
- }
- if s == api.StreamTypeStdout || s == api.StreamTypeStderr {
- doneChan <- struct{}{}
- }
- }
- // set up all the streams first
- var err error
- headers := http.Header{}
- headers.Set(api.StreamType, api.StreamTypeError)
- p.errorStream, err = conn.CreateStream(headers)
- if err != nil {
- return err
- }
- defer p.errorStream.Reset()
- // Create all the streams first, then start the copy goroutines. The server doesn't start its copy
- // goroutines until it's received all of the streams. If the client creates the stdin stream and
- // immediately begins copying stdin data to the server, it's possible to overwhelm and wedge the
- // spdy frame handler in the server so that it is full of unprocessed frames. The frames aren't
- // getting processed because the server hasn't started its copying, and it won't do that until it
- // gets all the streams. By creating all the streams first, we ensure that the server is ready to
- // process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
- if p.Stdin != nil {
- headers.Set(api.StreamType, api.StreamTypeStdin)
- p.remoteStdin, err = conn.CreateStream(headers)
- if err != nil {
- return err
- }
- defer p.remoteStdin.Reset()
- }
- if p.Stdout != nil {
- headers.Set(api.StreamType, api.StreamTypeStdout)
- p.remoteStdout, err = conn.CreateStream(headers)
- if err != nil {
- return err
- }
- defer p.remoteStdout.Reset()
- }
- if p.Stderr != nil && !p.Tty {
- headers.Set(api.StreamType, api.StreamTypeStderr)
- p.remoteStderr, err = conn.CreateStream(headers)
- if err != nil {
- return err
- }
- defer p.remoteStderr.Reset()
- }
- // now that all the streams have been created, proceed with reading & copying
- // always read from errorStream
- go func() {
- message, err := ioutil.ReadAll(p.errorStream)
- if err != nil && err != io.EOF {
- errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
- return
- }
- if len(message) > 0 {
- errorChan <- fmt.Errorf("Error executing remote command: %s", message)
- return
- }
- }()
- if p.Stdin != nil {
- // TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
- // because stdin is not closed until the process exits. If we try to call
- // stdin.Close(), it returns no error but doesn't unblock the copy. It will
- // exit when the process exits, instead.
- go cp(api.StreamTypeStdin, p.remoteStdin, p.Stdin)
- }
- waitCount := 0
- completedStreams := 0
- if p.Stdout != nil {
- waitCount++
- go cp(api.StreamTypeStdout, p.Stdout, p.remoteStdout)
- }
- if p.Stderr != nil && !p.Tty {
- waitCount++
- go cp(api.StreamTypeStderr, p.Stderr, p.remoteStderr)
- }
- Loop:
- for {
- select {
- case <-doneChan:
- completedStreams++
- if completedStreams == waitCount {
- break Loop
- }
- case err := <-errorChan:
- return err
- }
- }
- return nil
- }
|