123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package remotecommand
- import (
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "sync"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/util/runtime"
- )
- // streamProtocolV2 implements version 2 of the streaming protocol for attach
- // and exec. The original streaming protocol was unversioned. As a result, this
- // version is referred to as version 2, even though it is the first actual
- // numbered version.
- type streamProtocolV2 struct {
- StreamOptions
- errorStream io.Reader
- remoteStdin io.ReadWriteCloser
- remoteStdout io.Reader
- remoteStderr io.Reader
- }
- var _ streamProtocolHandler = &streamProtocolV2{}
- func newStreamProtocolV2(options StreamOptions) streamProtocolHandler {
- return &streamProtocolV2{
- StreamOptions: options,
- }
- }
- func (p *streamProtocolV2) createStreams(conn streamCreator) error {
- var err error
- headers := http.Header{}
- // set up error stream
- headers.Set(api.StreamType, api.StreamTypeError)
- p.errorStream, err = conn.CreateStream(headers)
- if err != nil {
- return err
- }
- // set up stdin stream
- if p.Stdin != nil {
- headers.Set(api.StreamType, api.StreamTypeStdin)
- p.remoteStdin, err = conn.CreateStream(headers)
- if err != nil {
- return err
- }
- }
- // set up stdout stream
- if p.Stdout != nil {
- headers.Set(api.StreamType, api.StreamTypeStdout)
- p.remoteStdout, err = conn.CreateStream(headers)
- if err != nil {
- return err
- }
- }
- // set up stderr stream
- if p.Stderr != nil && !p.Tty {
- headers.Set(api.StreamType, api.StreamTypeStderr)
- p.remoteStderr, err = conn.CreateStream(headers)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func (p *streamProtocolV2) copyStdin() {
- if p.Stdin != nil {
- var once sync.Once
- // copy from client's stdin to container's stdin
- go func() {
- defer runtime.HandleCrash()
- // if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
- // we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise
- // the executed command will remain running.
- defer once.Do(func() { p.remoteStdin.Close() })
- if _, err := io.Copy(p.remoteStdin, p.Stdin); err != nil {
- runtime.HandleError(err)
- }
- }()
- // read from remoteStdin until the stream is closed. this is essential to
- // be able to exit interactive sessions cleanly and not leak goroutines or
- // hang the client's terminal.
- //
- // TODO we aren't using go-dockerclient any more; revisit this to determine if it's still
- // required by engine-api.
- //
- // go-dockerclient's current hijack implementation
- // (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564)
- // waits for all three streams (stdin/stdout/stderr) to finish copying
- // before returning. When hijack finishes copying stdout/stderr, it calls
- // Close() on its side of remoteStdin, which allows this copy to complete.
- // When that happens, we must Close() on our side of remoteStdin, to
- // allow the copy in hijack to complete, and hijack to return.
- go func() {
- defer runtime.HandleCrash()
- defer once.Do(func() { p.remoteStdin.Close() })
- // this "copy" doesn't actually read anything - it's just here to wait for
- // the server to close remoteStdin.
- if _, err := io.Copy(ioutil.Discard, p.remoteStdin); err != nil {
- runtime.HandleError(err)
- }
- }()
- }
- }
- func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) {
- if p.Stdout == nil {
- return
- }
- wg.Add(1)
- go func() {
- defer runtime.HandleCrash()
- defer wg.Done()
- if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil {
- runtime.HandleError(err)
- }
- }()
- }
- func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
- if p.Stderr == nil || p.Tty {
- return
- }
- wg.Add(1)
- go func() {
- defer runtime.HandleCrash()
- defer wg.Done()
- if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil {
- runtime.HandleError(err)
- }
- }()
- }
- func (p *streamProtocolV2) stream(conn streamCreator) error {
- if err := p.createStreams(conn); err != nil {
- return err
- }
- // now that all the streams have been created, proceed with reading & copying
- errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})
- p.copyStdin()
- var wg sync.WaitGroup
- p.copyStdout(&wg)
- p.copyStderr(&wg)
- // we're waiting for stdout/stderr to finish copying
- wg.Wait()
- // waits for errorStream to finish reading with an error or nil
- return <-errorChan
- }
// errorDecoderV2 treats the bytes received on the error channel as plain text.
type errorDecoderV2 struct{}

// decode wraps message in an error whose text is
// "error executing remote command: " followed by the message bytes rendered
// as a string. The result is never nil, even for an empty message.
func (d *errorDecoderV2) decode(message []byte) error {
	text := string(message)
	return fmt.Errorf("error executing remote command: %s", text)
}
|