tunneler.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package genericapiserver
  14. import (
  15. "io/ioutil"
  16. "net"
  17. "net/url"
  18. "os"
  19. "sync/atomic"
  20. "time"
  21. "k8s.io/kubernetes/pkg/ssh"
  22. "k8s.io/kubernetes/pkg/util"
  23. "k8s.io/kubernetes/pkg/util/clock"
  24. "k8s.io/kubernetes/pkg/util/wait"
  25. "github.com/golang/glog"
  26. "github.com/prometheus/client_golang/prometheus"
  27. )
  28. type InstallSSHKey func(user string, data []byte) error
  29. type AddressFunc func() (addresses []string, err error)
  30. type Tunneler interface {
  31. Run(AddressFunc)
  32. Stop()
  33. Dial(net, addr string) (net.Conn, error)
  34. SecondsSinceSync() int64
  35. SecondsSinceSSHKeySync() int64
  36. }
  37. type SSHTunneler struct {
  38. SSHUser string
  39. SSHKeyfile string
  40. InstallSSHKey InstallSSHKey
  41. HealthCheckURL *url.URL
  42. tunnels *ssh.SSHTunnelList
  43. lastSync int64 // Seconds since Epoch
  44. lastSSHKeySync int64 // Seconds since Epoch
  45. lastSyncMetric prometheus.GaugeFunc
  46. clock clock.Clock
  47. getAddresses AddressFunc
  48. stopChan chan struct{}
  49. }
  50. func NewSSHTunneler(sshUser, sshKeyfile string, healthCheckURL *url.URL, installSSHKey InstallSSHKey) Tunneler {
  51. return &SSHTunneler{
  52. SSHUser: sshUser,
  53. SSHKeyfile: sshKeyfile,
  54. InstallSSHKey: installSSHKey,
  55. HealthCheckURL: healthCheckURL,
  56. clock: clock.RealClock{},
  57. }
  58. }
  59. // Run establishes tunnel loops and returns
  60. func (c *SSHTunneler) Run(getAddresses AddressFunc) {
  61. if c.stopChan != nil {
  62. return
  63. }
  64. c.stopChan = make(chan struct{})
  65. // Save the address getter
  66. if getAddresses != nil {
  67. c.getAddresses = getAddresses
  68. }
  69. // Usernames are capped @ 32
  70. if len(c.SSHUser) > 32 {
  71. glog.Warning("SSH User is too long, truncating to 32 chars")
  72. c.SSHUser = c.SSHUser[0:32]
  73. }
  74. glog.Infof("Setting up proxy: %s %s", c.SSHUser, c.SSHKeyfile)
  75. // public keyfile is written last, so check for that.
  76. publicKeyFile := c.SSHKeyfile + ".pub"
  77. exists, err := util.FileExists(publicKeyFile)
  78. if err != nil {
  79. glog.Errorf("Error detecting if key exists: %v", err)
  80. } else if !exists {
  81. glog.Infof("Key doesn't exist, attempting to create")
  82. if err := generateSSHKey(c.SSHKeyfile, publicKeyFile); err != nil {
  83. glog.Errorf("Failed to create key pair: %v", err)
  84. }
  85. }
  86. c.tunnels = ssh.NewSSHTunnelList(c.SSHUser, c.SSHKeyfile, c.HealthCheckURL, c.stopChan)
  87. // Sync loop to ensure that the SSH key has been installed.
  88. c.lastSSHKeySync = c.clock.Now().Unix()
  89. c.installSSHKeySyncLoop(c.SSHUser, publicKeyFile)
  90. // Sync tunnelList w/ nodes.
  91. c.lastSync = c.clock.Now().Unix()
  92. c.nodesSyncLoop()
  93. }
  94. // Stop gracefully shuts down the tunneler
  95. func (c *SSHTunneler) Stop() {
  96. if c.stopChan != nil {
  97. close(c.stopChan)
  98. c.stopChan = nil
  99. }
  100. }
  101. func (c *SSHTunneler) Dial(net, addr string) (net.Conn, error) {
  102. return c.tunnels.Dial(net, addr)
  103. }
  104. func (c *SSHTunneler) SecondsSinceSync() int64 {
  105. now := c.clock.Now().Unix()
  106. then := atomic.LoadInt64(&c.lastSync)
  107. return now - then
  108. }
  109. func (c *SSHTunneler) SecondsSinceSSHKeySync() int64 {
  110. now := c.clock.Now().Unix()
  111. then := atomic.LoadInt64(&c.lastSSHKeySync)
  112. return now - then
  113. }
  114. func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) {
  115. go wait.Until(func() {
  116. if c.InstallSSHKey == nil {
  117. glog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil")
  118. return
  119. }
  120. key, err := ssh.ParsePublicKeyFromFile(publicKeyfile)
  121. if err != nil {
  122. glog.Errorf("Failed to load public key: %v", err)
  123. return
  124. }
  125. keyData, err := ssh.EncodeSSHKey(key)
  126. if err != nil {
  127. glog.Errorf("Failed to encode public key: %v", err)
  128. return
  129. }
  130. if err := c.InstallSSHKey(user, keyData); err != nil {
  131. glog.Errorf("Failed to install ssh key: %v", err)
  132. return
  133. }
  134. atomic.StoreInt64(&c.lastSSHKeySync, c.clock.Now().Unix())
  135. }, 5*time.Minute, c.stopChan)
  136. }
  137. // nodesSyncLoop lists nodes ever 15 seconds, calling Update() on the TunnelList
  138. // each time (Update() is a noop if no changes are necessary).
  139. func (c *SSHTunneler) nodesSyncLoop() {
  140. // TODO (cjcullen) make this watch.
  141. go wait.Until(func() {
  142. addrs, err := c.getAddresses()
  143. glog.Infof("Calling update w/ addrs: %v", addrs)
  144. if err != nil {
  145. glog.Errorf("Failed to getAddresses: %v", err)
  146. }
  147. c.tunnels.Update(addrs)
  148. atomic.StoreInt64(&c.lastSync, c.clock.Now().Unix())
  149. }, 15*time.Second, c.stopChan)
  150. }
  151. func generateSSHKey(privateKeyfile, publicKeyfile string) error {
  152. private, public, err := ssh.GenerateKey(2048)
  153. if err != nil {
  154. return err
  155. }
  156. // If private keyfile already exists, we must have only made it halfway
  157. // through last time, so delete it.
  158. exists, err := util.FileExists(privateKeyfile)
  159. if err != nil {
  160. glog.Errorf("Error detecting if private key exists: %v", err)
  161. } else if exists {
  162. glog.Infof("Private key exists, but public key does not")
  163. if err := os.Remove(privateKeyfile); err != nil {
  164. glog.Errorf("Failed to remove stale private key: %v", err)
  165. }
  166. }
  167. if err := ioutil.WriteFile(privateKeyfile, ssh.EncodePrivateKey(private), 0600); err != nil {
  168. return err
  169. }
  170. publicKeyBytes, err := ssh.EncodePublicKey(public)
  171. if err != nil {
  172. return err
  173. }
  174. if err := ioutil.WriteFile(publicKeyfile+".tmp", publicKeyBytes, 0600); err != nil {
  175. return err
  176. }
  177. return os.Rename(publicKeyfile+".tmp", publicKeyfile)
  178. }