proxysocket.go 9.1 KB


  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package userspace
  14. import (
  15. "fmt"
  16. "io"
  17. "net"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "time"
  22. "github.com/golang/glog"
  23. "k8s.io/kubernetes/pkg/api"
  24. "k8s.io/kubernetes/pkg/proxy"
  25. "k8s.io/kubernetes/pkg/util/runtime"
  26. )
  27. // Abstraction over TCP/UDP sockets which are proxied.
  28. type proxySocket interface {
  29. // Addr gets the net.Addr for a proxySocket.
  30. Addr() net.Addr
  31. // Close stops the proxySocket from accepting incoming connections.
  32. // Each implementation should comment on the impact of calling Close
  33. // while sessions are active.
  34. Close() error
  35. // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
  36. ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier)
  37. // ListenPort returns the host port that the proxySocket is listening on
  38. ListenPort() int
  39. }
  40. func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
  41. host := ""
  42. if ip != nil {
  43. host = ip.String()
  44. }
  45. switch strings.ToUpper(string(protocol)) {
  46. case "TCP":
  47. listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
  48. if err != nil {
  49. return nil, err
  50. }
  51. return &tcpProxySocket{Listener: listener, port: port}, nil
  52. case "UDP":
  53. addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port)))
  54. if err != nil {
  55. return nil, err
  56. }
  57. conn, err := net.ListenUDP("udp", addr)
  58. if err != nil {
  59. return nil, err
  60. }
  61. return &udpProxySocket{UDPConn: conn, port: port}, nil
  62. }
  63. return nil, fmt.Errorf("unknown protocol %q", protocol)
  64. }
  65. // How long we wait for a connection to a backend in seconds
  66. var endpointDialTimeout = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second}
  67. // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
  68. // no new connections are allowed but existing connections are left untouched.
  69. type tcpProxySocket struct {
  70. net.Listener
  71. port int
  72. }
  73. func (tcp *tcpProxySocket) ListenPort() int {
  74. return tcp.port
  75. }
  76. func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
  77. sessionAffinityReset := false
  78. for _, dialTimeout := range endpointDialTimeout {
  79. endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
  80. if err != nil {
  81. glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
  82. return nil, err
  83. }
  84. glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
  85. // TODO: This could spin up a new goroutine to make the outbound connection,
  86. // and keep accepting inbound traffic.
  87. outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout)
  88. if err != nil {
  89. if isTooManyFDsError(err) {
  90. panic("Dial failed: " + err.Error())
  91. }
  92. glog.Errorf("Dial failed: %v", err)
  93. sessionAffinityReset = true
  94. continue
  95. }
  96. return outConn, nil
  97. }
  98. return nil, fmt.Errorf("failed to connect to an endpoint.")
  99. }
  100. func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
  101. for {
  102. if !myInfo.isAlive() {
  103. // The service port was closed or replaced.
  104. return
  105. }
  106. // Block until a connection is made.
  107. inConn, err := tcp.Accept()
  108. if err != nil {
  109. if isTooManyFDsError(err) {
  110. panic("Accept failed: " + err.Error())
  111. }
  112. if isClosedError(err) {
  113. return
  114. }
  115. if !myInfo.isAlive() {
  116. // Then the service port was just closed so the accept failure is to be expected.
  117. return
  118. }
  119. glog.Errorf("Accept failed: %v", err)
  120. continue
  121. }
  122. glog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
  123. outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier)
  124. if err != nil {
  125. glog.Errorf("Failed to connect to balancer: %v", err)
  126. inConn.Close()
  127. continue
  128. }
  129. // Spin up an async copy loop.
  130. go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
  131. }
  132. }
  133. // proxyTCP proxies data bi-directionally between in and out.
  134. func proxyTCP(in, out *net.TCPConn) {
  135. var wg sync.WaitGroup
  136. wg.Add(2)
  137. glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
  138. in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
  139. go copyBytes("from backend", in, out, &wg)
  140. go copyBytes("to backend", out, in, &wg)
  141. wg.Wait()
  142. }
  143. func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
  144. defer wg.Done()
  145. glog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr())
  146. n, err := io.Copy(dest, src)
  147. if err != nil {
  148. if !isClosedError(err) {
  149. glog.Errorf("I/O error: %v", err)
  150. }
  151. }
  152. glog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr())
  153. dest.Close()
  154. src.Close()
  155. }
  156. // udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called,
  157. // no new connections are allowed and existing connections are broken.
  158. // TODO: We could lame-duck this ourselves, if it becomes important.
  159. type udpProxySocket struct {
  160. *net.UDPConn
  161. port int
  162. }
  163. func (udp *udpProxySocket) ListenPort() int {
  164. return udp.port
  165. }
  166. func (udp *udpProxySocket) Addr() net.Addr {
  167. return udp.LocalAddr()
  168. }
  169. // Holds all the known UDP clients that have not timed out.
  170. type clientCache struct {
  171. mu sync.Mutex
  172. clients map[string]net.Conn // addr string -> connection
  173. }
  174. func newClientCache() *clientCache {
  175. return &clientCache{clients: map[string]net.Conn{}}
  176. }
  177. func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
  178. var buffer [4096]byte // 4KiB should be enough for most whole-packets
  179. for {
  180. if !myInfo.isAlive() {
  181. // The service port was closed or replaced.
  182. break
  183. }
  184. // Block until data arrives.
  185. // TODO: Accumulate a histogram of n or something, to fine tune the buffer size.
  186. n, cliAddr, err := udp.ReadFrom(buffer[0:])
  187. if err != nil {
  188. if e, ok := err.(net.Error); ok {
  189. if e.Temporary() {
  190. glog.V(1).Infof("ReadFrom had a temporary failure: %v", err)
  191. continue
  192. }
  193. }
  194. glog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err)
  195. break
  196. }
  197. // If this is a client we know already, reuse the connection and goroutine.
  198. svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout)
  199. if err != nil {
  200. continue
  201. }
  202. // TODO: It would be nice to let the goroutine handle this write, but we don't
  203. // really want to copy the buffer. We could do a pool of buffers or something.
  204. _, err = svrConn.Write(buffer[0:n])
  205. if err != nil {
  206. if !logTimeout(err) {
  207. glog.Errorf("Write failed: %v", err)
  208. // TODO: Maybe tear down the goroutine for this client/server pair?
  209. }
  210. continue
  211. }
  212. err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout))
  213. if err != nil {
  214. glog.Errorf("SetDeadline failed: %v", err)
  215. continue
  216. }
  217. }
  218. }
  219. func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) {
  220. activeClients.mu.Lock()
  221. defer activeClients.mu.Unlock()
  222. svrConn, found := activeClients.clients[cliAddr.String()]
  223. if !found {
  224. // TODO: This could spin up a new goroutine to make the outbound connection,
  225. // and keep accepting inbound traffic.
  226. glog.V(3).Infof("New UDP connection from %s", cliAddr)
  227. var err error
  228. svrConn, err = tryConnect(service, cliAddr, "udp", proxier)
  229. if err != nil {
  230. return nil, err
  231. }
  232. if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil {
  233. glog.Errorf("SetDeadline failed: %v", err)
  234. return nil, err
  235. }
  236. activeClients.clients[cliAddr.String()] = svrConn
  237. go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
  238. defer runtime.HandleCrash()
  239. udp.proxyClient(cliAddr, svrConn, activeClients, timeout)
  240. }(cliAddr, svrConn, activeClients, timeout)
  241. }
  242. return svrConn, nil
  243. }
  244. // This function is expected to be called as a goroutine.
  245. // TODO: Track and log bytes copied, like TCP
  246. func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
  247. defer svrConn.Close()
  248. var buffer [4096]byte
  249. for {
  250. n, err := svrConn.Read(buffer[0:])
  251. if err != nil {
  252. if !logTimeout(err) {
  253. glog.Errorf("Read failed: %v", err)
  254. }
  255. break
  256. }
  257. err = svrConn.SetDeadline(time.Now().Add(timeout))
  258. if err != nil {
  259. glog.Errorf("SetDeadline failed: %v", err)
  260. break
  261. }
  262. n, err = udp.WriteTo(buffer[0:n], cliAddr)
  263. if err != nil {
  264. if !logTimeout(err) {
  265. glog.Errorf("WriteTo failed: %v", err)
  266. }
  267. break
  268. }
  269. }
  270. activeClients.mu.Lock()
  271. delete(activeClients.clients, cliAddr.String())
  272. activeClients.mu.Unlock()
  273. }