v1.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package remotecommand
  14. import (
  15. "fmt"
  16. "io"
  17. "io/ioutil"
  18. "net/http"
  19. "github.com/golang/glog"
  20. "k8s.io/kubernetes/pkg/api"
  21. "k8s.io/kubernetes/pkg/util/httpstream"
  22. )
  23. // streamProtocolV1 implements the first version of the streaming exec & attach
  24. // protocol. This version has some bugs, such as not being able to detect when
  25. // non-interactive stdin data has ended. See http://issues.k8s.io/13394 and
  26. // http://issues.k8s.io/13395 for more details.
  27. type streamProtocolV1 struct {
  28. StreamOptions
  29. errorStream httpstream.Stream
  30. remoteStdin httpstream.Stream
  31. remoteStdout httpstream.Stream
  32. remoteStderr httpstream.Stream
  33. }
  34. var _ streamProtocolHandler = &streamProtocolV1{}
  35. func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
  36. return &streamProtocolV1{
  37. StreamOptions: options,
  38. }
  39. }
  40. func (p *streamProtocolV1) stream(conn streamCreator) error {
  41. doneChan := make(chan struct{}, 2)
  42. errorChan := make(chan error)
  43. cp := func(s string, dst io.Writer, src io.Reader) {
  44. glog.V(6).Infof("Copying %s", s)
  45. defer glog.V(6).Infof("Done copying %s", s)
  46. if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
  47. glog.Errorf("Error copying %s: %v", s, err)
  48. }
  49. if s == api.StreamTypeStdout || s == api.StreamTypeStderr {
  50. doneChan <- struct{}{}
  51. }
  52. }
  53. // set up all the streams first
  54. var err error
  55. headers := http.Header{}
  56. headers.Set(api.StreamType, api.StreamTypeError)
  57. p.errorStream, err = conn.CreateStream(headers)
  58. if err != nil {
  59. return err
  60. }
  61. defer p.errorStream.Reset()
  62. // Create all the streams first, then start the copy goroutines. The server doesn't start its copy
  63. // goroutines until it's received all of the streams. If the client creates the stdin stream and
  64. // immediately begins copying stdin data to the server, it's possible to overwhelm and wedge the
  65. // spdy frame handler in the server so that it is full of unprocessed frames. The frames aren't
  66. // getting processed because the server hasn't started its copying, and it won't do that until it
  67. // gets all the streams. By creating all the streams first, we ensure that the server is ready to
  68. // process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
  69. if p.Stdin != nil {
  70. headers.Set(api.StreamType, api.StreamTypeStdin)
  71. p.remoteStdin, err = conn.CreateStream(headers)
  72. if err != nil {
  73. return err
  74. }
  75. defer p.remoteStdin.Reset()
  76. }
  77. if p.Stdout != nil {
  78. headers.Set(api.StreamType, api.StreamTypeStdout)
  79. p.remoteStdout, err = conn.CreateStream(headers)
  80. if err != nil {
  81. return err
  82. }
  83. defer p.remoteStdout.Reset()
  84. }
  85. if p.Stderr != nil && !p.Tty {
  86. headers.Set(api.StreamType, api.StreamTypeStderr)
  87. p.remoteStderr, err = conn.CreateStream(headers)
  88. if err != nil {
  89. return err
  90. }
  91. defer p.remoteStderr.Reset()
  92. }
  93. // now that all the streams have been created, proceed with reading & copying
  94. // always read from errorStream
  95. go func() {
  96. message, err := ioutil.ReadAll(p.errorStream)
  97. if err != nil && err != io.EOF {
  98. errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
  99. return
  100. }
  101. if len(message) > 0 {
  102. errorChan <- fmt.Errorf("Error executing remote command: %s", message)
  103. return
  104. }
  105. }()
  106. if p.Stdin != nil {
  107. // TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
  108. // because stdin is not closed until the process exits. If we try to call
  109. // stdin.Close(), it returns no error but doesn't unblock the copy. It will
  110. // exit when the process exits, instead.
  111. go cp(api.StreamTypeStdin, p.remoteStdin, p.Stdin)
  112. }
  113. waitCount := 0
  114. completedStreams := 0
  115. if p.Stdout != nil {
  116. waitCount++
  117. go cp(api.StreamTypeStdout, p.Stdout, p.remoteStdout)
  118. }
  119. if p.Stderr != nil && !p.Tty {
  120. waitCount++
  121. go cp(api.StreamTypeStderr, p.Stderr, p.remoteStderr)
  122. }
  123. Loop:
  124. for {
  125. select {
  126. case <-doneChan:
  127. completedStreams++
  128. if completedStreams == waitCount {
  129. break Loop
  130. }
  131. case err := <-errorChan:
  132. return err
  133. }
  134. }
  135. return nil
  136. }