v3.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package remotecommand
  14. import (
  15. "encoding/json"
  16. "io"
  17. "net/http"
  18. "sync"
  19. "k8s.io/kubernetes/pkg/api"
  20. "k8s.io/kubernetes/pkg/util/runtime"
  21. )
  22. // streamProtocolV3 implements version 3 of the streaming protocol for attach
  23. // and exec. This version adds support for resizing the container's terminal.
  24. type streamProtocolV3 struct {
  25. *streamProtocolV2
  26. resizeStream io.Writer
  27. }
  28. var _ streamProtocolHandler = &streamProtocolV3{}
  29. func newStreamProtocolV3(options StreamOptions) streamProtocolHandler {
  30. return &streamProtocolV3{
  31. streamProtocolV2: newStreamProtocolV2(options).(*streamProtocolV2),
  32. }
  33. }
  34. func (p *streamProtocolV3) createStreams(conn streamCreator) error {
  35. // set up the streams from v2
  36. if err := p.streamProtocolV2.createStreams(conn); err != nil {
  37. return err
  38. }
  39. // set up resize stream
  40. if p.Tty {
  41. headers := http.Header{}
  42. headers.Set(api.StreamType, api.StreamTypeResize)
  43. var err error
  44. p.resizeStream, err = conn.CreateStream(headers)
  45. if err != nil {
  46. return err
  47. }
  48. }
  49. return nil
  50. }
  51. func (p *streamProtocolV3) handleResizes() {
  52. if p.resizeStream == nil || p.TerminalSizeQueue == nil {
  53. return
  54. }
  55. go func() {
  56. defer runtime.HandleCrash()
  57. encoder := json.NewEncoder(p.resizeStream)
  58. for {
  59. size := p.TerminalSizeQueue.Next()
  60. if size == nil {
  61. return
  62. }
  63. if err := encoder.Encode(&size); err != nil {
  64. runtime.HandleError(err)
  65. }
  66. }
  67. }()
  68. }
  69. func (p *streamProtocolV3) stream(conn streamCreator) error {
  70. if err := p.createStreams(conn); err != nil {
  71. return err
  72. }
  73. // now that all the streams have been created, proceed with reading & copying
  74. errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
  75. p.handleResizes()
  76. p.copyStdin()
  77. var wg sync.WaitGroup
  78. p.copyStdout(&wg)
  79. p.copyStderr(&wg)
  80. // we're waiting for stdout/stderr to finish copying
  81. wg.Wait()
  82. // waits for errorStream to finish reading with an error or nil
  83. return <-errorChan
  84. }
  85. type errorDecoderV3 struct {
  86. errorDecoderV2
  87. }