proxier.go 41 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package userspace
  14. import (
  15. "fmt"
  16. "net"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/golang/glog"
  23. "k8s.io/kubernetes/pkg/api"
  24. "k8s.io/kubernetes/pkg/proxy"
  25. "k8s.io/kubernetes/pkg/types"
  26. utilnet "k8s.io/kubernetes/pkg/util/net"
  27. utilerrors "k8s.io/kubernetes/pkg/util/errors"
  28. "k8s.io/kubernetes/pkg/util/iptables"
  29. "k8s.io/kubernetes/pkg/util/runtime"
  30. )
  31. type portal struct {
  32. ip net.IP
  33. port int
  34. isExternal bool
  35. }
  36. type serviceInfo struct {
  37. isAliveAtomic int32 // Only access this with atomic ops
  38. portal portal
  39. protocol api.Protocol
  40. proxyPort int
  41. socket proxySocket
  42. timeout time.Duration
  43. activeClients *clientCache
  44. nodePort int
  45. loadBalancerStatus api.LoadBalancerStatus
  46. sessionAffinityType api.ServiceAffinity
  47. stickyMaxAgeMinutes int
  48. // Deprecated, but required for back-compat (including e2e)
  49. externalIPs []string
  50. }
  51. func (info *serviceInfo) setAlive(b bool) {
  52. var i int32
  53. if b {
  54. i = 1
  55. }
  56. atomic.StoreInt32(&info.isAliveAtomic, i)
  57. }
  58. func (info *serviceInfo) isAlive() bool {
  59. return atomic.LoadInt32(&info.isAliveAtomic) != 0
  60. }
  61. func logTimeout(err error) bool {
  62. if e, ok := err.(net.Error); ok {
  63. if e.Timeout() {
  64. glog.V(3).Infof("connection to endpoint closed due to inactivity")
  65. return true
  66. }
  67. }
  68. return false
  69. }
  70. // Proxier is a simple proxy for TCP connections between a localhost:lport
  71. // and services that provide the actual implementations.
  72. type Proxier struct {
  73. loadBalancer LoadBalancer
  74. mu sync.Mutex // protects serviceMap
  75. serviceMap map[proxy.ServicePortName]*serviceInfo
  76. syncPeriod time.Duration
  77. udpIdleTimeout time.Duration
  78. portMapMutex sync.Mutex
  79. portMap map[portMapKey]*portMapValue
  80. numProxyLoops int32 // use atomic ops to access this; mostly for testing
  81. listenIP net.IP
  82. iptables iptables.Interface
  83. hostIP net.IP
  84. proxyPorts PortAllocator
  85. }
  86. // assert Proxier is a ProxyProvider
  87. var _ proxy.ProxyProvider = &Proxier{}
  88. // A key for the portMap. The ip has to be a string because slices can't be map
  89. // keys.
  90. type portMapKey struct {
  91. ip string
  92. port int
  93. protocol api.Protocol
  94. }
  95. func (k *portMapKey) String() string {
  96. return fmt.Sprintf("%s:%d/%s", k.ip, k.port, k.protocol)
  97. }
  98. // A value for the portMap
  99. type portMapValue struct {
  100. owner proxy.ServicePortName
  101. socket interface {
  102. Close() error
  103. }
  104. }
  105. var (
  106. // ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
  107. // the loopback address. May be checked for by callers of NewProxier to know whether
  108. // the caller provided invalid input.
  109. ErrProxyOnLocalhost = fmt.Errorf("cannot proxy on localhost")
  110. )
  111. // IsProxyLocked returns true if the proxy could not acquire the lock on iptables.
  112. func IsProxyLocked(err error) bool {
  113. return strings.Contains(err.Error(), "holding the xtables lock")
  114. }
  115. // NewProxier returns a new Proxier given a LoadBalancer and an address on
  116. // which to listen. Because of the iptables logic, It is assumed that there
  117. // is only a single Proxier active on a machine. An error will be returned if
  118. // the proxier cannot be started due to an invalid ListenIP (loopback) or
  119. // if iptables fails to update or acquire the initial lock. Once a proxier is
  120. // created, it will keep iptables up to date in the background and will not
  121. // terminate if a particular iptables call fails.
  122. func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
  123. if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
  124. return nil, ErrProxyOnLocalhost
  125. }
  126. hostIP, err := utilnet.ChooseHostInterface()
  127. if err != nil {
  128. return nil, fmt.Errorf("failed to select a host interface: %v", err)
  129. }
  130. err = setRLimit(64 * 1000)
  131. if err != nil {
  132. return nil, fmt.Errorf("failed to set open file handler limit: %v", err)
  133. }
  134. proxyPorts := newPortAllocator(pr)
  135. glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
  136. return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, udpIdleTimeout)
  137. }
  138. func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
  139. // convenient to pass nil for tests..
  140. if proxyPorts == nil {
  141. proxyPorts = newPortAllocator(utilnet.PortRange{})
  142. }
  143. // Set up the iptables foundations we need.
  144. if err := iptablesInit(iptables); err != nil {
  145. return nil, fmt.Errorf("failed to initialize iptables: %v", err)
  146. }
  147. // Flush old iptables rules (since the bound ports will be invalid after a restart).
  148. // When OnUpdate() is first called, the rules will be recreated.
  149. if err := iptablesFlush(iptables); err != nil {
  150. return nil, fmt.Errorf("failed to flush iptables: %v", err)
  151. }
  152. return &Proxier{
  153. loadBalancer: loadBalancer,
  154. serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
  155. portMap: make(map[portMapKey]*portMapValue),
  156. syncPeriod: syncPeriod,
  157. udpIdleTimeout: udpIdleTimeout,
  158. listenIP: listenIP,
  159. iptables: iptables,
  160. hostIP: hostIP,
  161. proxyPorts: proxyPorts,
  162. }, nil
  163. }
  164. // CleanupLeftovers removes all iptables rules and chains created by the Proxier
  165. // It returns true if an error was encountered. Errors are logged.
  166. func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) {
  167. // NOTE: Warning, this needs to be kept in sync with the userspace Proxier,
  168. // we want to ensure we remove all of the iptables rules it creates.
  169. // Currently they are all in iptablesInit()
  170. // Delete Rules first, then Flush and Delete Chains
  171. args := []string{"-m", "comment", "--comment", "handle ClusterIPs; NOTE: this must be before the NodePort rules"}
  172. if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostPortalChain))...); err != nil {
  173. if !iptables.IsNotFoundError(err) {
  174. glog.Errorf("Error removing userspace rule: %v", err)
  175. encounteredError = true
  176. }
  177. }
  178. if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerPortalChain))...); err != nil {
  179. if !iptables.IsNotFoundError(err) {
  180. glog.Errorf("Error removing userspace rule: %v", err)
  181. encounteredError = true
  182. }
  183. }
  184. args = []string{"-m", "addrtype", "--dst-type", "LOCAL"}
  185. args = append(args, "-m", "comment", "--comment", "handle service NodePorts; NOTE: this must be the last rule in the chain")
  186. if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostNodePortChain))...); err != nil {
  187. if !iptables.IsNotFoundError(err) {
  188. glog.Errorf("Error removing userspace rule: %v", err)
  189. encounteredError = true
  190. }
  191. }
  192. if err := ipt.DeleteRule(iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerNodePortChain))...); err != nil {
  193. if !iptables.IsNotFoundError(err) {
  194. glog.Errorf("Error removing userspace rule: %v", err)
  195. encounteredError = true
  196. }
  197. }
  198. args = []string{"-m", "comment", "--comment", "Ensure that non-local NodePort traffic can flow"}
  199. if err := ipt.DeleteRule(iptables.TableFilter, iptables.ChainInput, append(args, "-j", string(iptablesNonLocalNodePortChain))...); err != nil {
  200. if !iptables.IsNotFoundError(err) {
  201. glog.Errorf("Error removing userspace rule: %v", err)
  202. encounteredError = true
  203. }
  204. }
  205. // flush and delete chains.
  206. tableChains := map[iptables.Table][]iptables.Chain{
  207. iptables.TableNAT: {iptablesContainerPortalChain, iptablesHostPortalChain, iptablesHostNodePortChain, iptablesContainerNodePortChain},
  208. iptables.TableFilter: {iptablesNonLocalNodePortChain},
  209. }
  210. for table, chains := range tableChains {
  211. for _, c := range chains {
  212. // flush chain, then if successful delete, delete will fail if flush fails.
  213. if err := ipt.FlushChain(table, c); err != nil {
  214. if !iptables.IsNotFoundError(err) {
  215. glog.Errorf("Error flushing userspace chain: %v", err)
  216. encounteredError = true
  217. }
  218. } else {
  219. if err = ipt.DeleteChain(table, c); err != nil {
  220. if !iptables.IsNotFoundError(err) {
  221. glog.Errorf("Error deleting userspace chain: %v", err)
  222. encounteredError = true
  223. }
  224. }
  225. }
  226. }
  227. }
  228. return encounteredError
  229. }
  230. // Sync is called to immediately synchronize the proxier state to iptables
  231. func (proxier *Proxier) Sync() {
  232. if err := iptablesInit(proxier.iptables); err != nil {
  233. glog.Errorf("Failed to ensure iptables: %v", err)
  234. }
  235. proxier.ensurePortals()
  236. proxier.cleanupStaleStickySessions()
  237. }
  238. // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
  239. func (proxier *Proxier) SyncLoop() {
  240. t := time.NewTicker(proxier.syncPeriod)
  241. defer t.Stop()
  242. for {
  243. <-t.C
  244. glog.V(6).Infof("Periodic sync")
  245. proxier.Sync()
  246. }
  247. }
  248. // Ensure that portals exist for all services.
  249. func (proxier *Proxier) ensurePortals() {
  250. proxier.mu.Lock()
  251. defer proxier.mu.Unlock()
  252. // NB: This does not remove rules that should not be present.
  253. for name, info := range proxier.serviceMap {
  254. err := proxier.openPortal(name, info)
  255. if err != nil {
  256. glog.Errorf("Failed to ensure portal for %q: %v", name, err)
  257. }
  258. }
  259. }
  260. // clean up any stale sticky session records in the hash map.
  261. func (proxier *Proxier) cleanupStaleStickySessions() {
  262. proxier.mu.Lock()
  263. defer proxier.mu.Unlock()
  264. for name := range proxier.serviceMap {
  265. proxier.loadBalancer.CleanupStaleStickySessions(name)
  266. }
  267. }
  268. // This assumes proxier.mu is not locked.
  269. func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceInfo) error {
  270. proxier.mu.Lock()
  271. defer proxier.mu.Unlock()
  272. return proxier.stopProxyInternal(service, info)
  273. }
  274. // This assumes proxier.mu is locked.
  275. func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error {
  276. delete(proxier.serviceMap, service)
  277. info.setAlive(false)
  278. err := info.socket.Close()
  279. port := info.socket.ListenPort()
  280. proxier.proxyPorts.Release(port)
  281. return err
  282. }
  283. func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*serviceInfo, bool) {
  284. proxier.mu.Lock()
  285. defer proxier.mu.Unlock()
  286. info, ok := proxier.serviceMap[service]
  287. return info, ok
  288. }
  289. func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *serviceInfo) {
  290. proxier.mu.Lock()
  291. defer proxier.mu.Unlock()
  292. proxier.serviceMap[service] = info
  293. }
  294. // addServiceOnPort starts listening for a new service, returning the serviceInfo.
  295. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
  296. // connections, for now.
  297. func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
  298. sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
  299. if err != nil {
  300. return nil, err
  301. }
  302. _, portStr, err := net.SplitHostPort(sock.Addr().String())
  303. if err != nil {
  304. sock.Close()
  305. return nil, err
  306. }
  307. portNum, err := strconv.Atoi(portStr)
  308. if err != nil {
  309. sock.Close()
  310. return nil, err
  311. }
  312. si := &serviceInfo{
  313. isAliveAtomic: 1,
  314. proxyPort: portNum,
  315. protocol: protocol,
  316. socket: sock,
  317. timeout: timeout,
  318. activeClients: newClientCache(),
  319. sessionAffinityType: api.ServiceAffinityNone, // default
  320. stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API.
  321. }
  322. proxier.setServiceInfo(service, si)
  323. glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
  324. go func(service proxy.ServicePortName, proxier *Proxier) {
  325. defer runtime.HandleCrash()
  326. atomic.AddInt32(&proxier.numProxyLoops, 1)
  327. sock.ProxyLoop(service, si, proxier)
  328. atomic.AddInt32(&proxier.numProxyLoops, -1)
  329. }(service, proxier)
  330. return si, nil
  331. }
  332. // OnServiceUpdate manages the active set of service proxies.
  333. // Active service proxies are reinitialized if found in the update set or
  334. // shutdown if missing from the update set.
  335. func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
  336. glog.V(4).Infof("Received update notice: %+v", services)
  337. activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
  338. for i := range services {
  339. service := &services[i]
  340. // if ClusterIP is "None" or empty, skip proxying
  341. if !api.IsServiceIPSet(service) {
  342. glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
  343. continue
  344. }
  345. for i := range service.Spec.Ports {
  346. servicePort := &service.Spec.Ports[i]
  347. serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
  348. activeServices[serviceName] = true
  349. serviceIP := net.ParseIP(service.Spec.ClusterIP)
  350. info, exists := proxier.getServiceInfo(serviceName)
  351. // TODO: check health of the socket? What if ProxyLoop exited?
  352. if exists && sameConfig(info, service, servicePort) {
  353. // Nothing changed.
  354. continue
  355. }
  356. if exists {
  357. glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
  358. err := proxier.closePortal(serviceName, info)
  359. if err != nil {
  360. glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
  361. }
  362. err = proxier.stopProxy(serviceName, info)
  363. if err != nil {
  364. glog.Errorf("Failed to stop service %q: %v", serviceName, err)
  365. }
  366. }
  367. proxyPort, err := proxier.proxyPorts.AllocateNext()
  368. if err != nil {
  369. glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
  370. continue
  371. }
  372. glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
  373. info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
  374. if err != nil {
  375. glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
  376. continue
  377. }
  378. info.portal.ip = serviceIP
  379. info.portal.port = int(servicePort.Port)
  380. info.externalIPs = service.Spec.ExternalIPs
  381. // Deep-copy in case the service instance changes
  382. info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
  383. info.nodePort = int(servicePort.NodePort)
  384. info.sessionAffinityType = service.Spec.SessionAffinity
  385. glog.V(4).Infof("info: %#v", info)
  386. err = proxier.openPortal(serviceName, info)
  387. if err != nil {
  388. glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
  389. }
  390. proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
  391. }
  392. }
  393. proxier.mu.Lock()
  394. defer proxier.mu.Unlock()
  395. for name, info := range proxier.serviceMap {
  396. if !activeServices[name] {
  397. glog.V(1).Infof("Stopping service %q", name)
  398. err := proxier.closePortal(name, info)
  399. if err != nil {
  400. glog.Errorf("Failed to close portal for %q: %v", name, err)
  401. }
  402. err = proxier.stopProxyInternal(name, info)
  403. if err != nil {
  404. glog.Errorf("Failed to stop service %q: %v", name, err)
  405. }
  406. proxier.loadBalancer.DeleteService(name)
  407. }
  408. }
  409. }
  410. func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
  411. if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) {
  412. return false
  413. }
  414. if !info.portal.ip.Equal(net.ParseIP(service.Spec.ClusterIP)) {
  415. return false
  416. }
  417. if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) {
  418. return false
  419. }
  420. if !api.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) {
  421. return false
  422. }
  423. if info.sessionAffinityType != service.Spec.SessionAffinity {
  424. return false
  425. }
  426. return true
  427. }
  428. func ipsEqual(lhs, rhs []string) bool {
  429. if len(lhs) != len(rhs) {
  430. return false
  431. }
  432. for i := range lhs {
  433. if lhs[i] != rhs[i] {
  434. return false
  435. }
  436. }
  437. return true
  438. }
  439. func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error {
  440. err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
  441. if err != nil {
  442. return err
  443. }
  444. for _, publicIP := range info.externalIPs {
  445. err = proxier.openOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)
  446. if err != nil {
  447. return err
  448. }
  449. }
  450. for _, ingress := range info.loadBalancerStatus.Ingress {
  451. if ingress.IP != "" {
  452. err = proxier.openOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)
  453. if err != nil {
  454. return err
  455. }
  456. }
  457. }
  458. if info.nodePort != 0 {
  459. err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)
  460. if err != nil {
  461. return err
  462. }
  463. }
  464. return nil
  465. }
  466. func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
  467. if local, err := isLocalIP(portal.ip); err != nil {
  468. return fmt.Errorf("can't determine if IP is local, assuming not: %v", err)
  469. } else if local {
  470. err := proxier.claimNodePort(portal.ip, portal.port, protocol, name)
  471. if err != nil {
  472. return err
  473. }
  474. }
  475. // Handle traffic from containers.
  476. args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
  477. existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
  478. if err != nil {
  479. glog.Errorf("Failed to install iptables %s rule for service %q, args:%v", iptablesContainerPortalChain, name, args)
  480. return err
  481. }
  482. if !existed {
  483. glog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
  484. }
  485. if portal.isExternal {
  486. args := proxier.iptablesContainerPortalArgs(portal.ip, false, true, portal.port, protocol, proxyIP, proxyPort, name)
  487. existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
  488. if err != nil {
  489. glog.Errorf("Failed to install iptables %s rule that opens service %q for local traffic, args:%v", iptablesContainerPortalChain, name, args)
  490. return err
  491. }
  492. if !existed {
  493. glog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s:%d for local traffic", name, protocol, portal.ip, portal.port)
  494. }
  495. args = proxier.iptablesHostPortalArgs(portal.ip, true, portal.port, protocol, proxyIP, proxyPort, name)
  496. existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
  497. if err != nil {
  498. glog.Errorf("Failed to install iptables %s rule for service %q for dst-local traffic", iptablesHostPortalChain, name)
  499. return err
  500. }
  501. if !existed {
  502. glog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s:%d for dst-local traffic", name, protocol, portal.ip, portal.port)
  503. }
  504. return nil
  505. }
  506. // Handle traffic from the host.
  507. args = proxier.iptablesHostPortalArgs(portal.ip, false, portal.port, protocol, proxyIP, proxyPort, name)
  508. existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostPortalChain, args...)
  509. if err != nil {
  510. glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostPortalChain, name)
  511. return err
  512. }
  513. if !existed {
  514. glog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
  515. }
  516. return nil
  517. }
  518. // Marks a port as being owned by a particular service, or returns error if already claimed.
  519. // Idempotent: reclaiming with the same owner is not an error
  520. func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
  521. proxier.portMapMutex.Lock()
  522. defer proxier.portMapMutex.Unlock()
  523. // TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
  524. key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
  525. existing, found := proxier.portMap[key]
  526. if !found {
  527. // Hold the actual port open, even though we use iptables to redirect
  528. // it. This ensures that a) it's safe to take and b) that stays true.
  529. // NOTE: We should not need to have a real listen()ing socket - bind()
  530. // should be enough, but I can't figure out a way to e2e test without
  531. // it. Tools like 'ss' and 'netstat' do not show sockets that are
  532. // bind()ed but not listen()ed, and at least the default debian netcat
  533. // has no way to avoid about 10 seconds of retries.
  534. socket, err := newProxySocket(protocol, ip, port)
  535. if err != nil {
  536. return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
  537. }
  538. proxier.portMap[key] = &portMapValue{owner: owner, socket: socket}
  539. glog.V(2).Infof("Claimed local port %s", key.String())
  540. return nil
  541. }
  542. if existing.owner == owner {
  543. // We are idempotent
  544. return nil
  545. }
  546. return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing)
  547. }
  548. // Release a claim on a port. Returns an error if the owner does not match the claim.
  549. // Tolerates release on an unclaimed port, to simplify .
  550. func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
  551. proxier.portMapMutex.Lock()
  552. defer proxier.portMapMutex.Unlock()
  553. key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
  554. existing, found := proxier.portMap[key]
  555. if !found {
  556. // We tolerate this, it happens if we are cleaning up a failed allocation
  557. glog.Infof("Ignoring release on unowned port: %v", key)
  558. return nil
  559. }
  560. if existing.owner != owner {
  561. return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing)
  562. }
  563. delete(proxier.portMap, key)
  564. existing.socket.Close()
  565. return nil
  566. }
  567. func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
  568. // TODO: Do we want to allow containers to access public services? Probably yes.
  569. // TODO: We could refactor this to be the same code as portal, but with IP == nil
  570. err := proxier.claimNodePort(nil, nodePort, protocol, name)
  571. if err != nil {
  572. return err
  573. }
  574. // Handle traffic from containers.
  575. args := proxier.iptablesContainerNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  576. existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerNodePortChain, args...)
  577. if err != nil {
  578. glog.Errorf("Failed to install iptables %s rule for service %q", iptablesContainerNodePortChain, name)
  579. return err
  580. }
  581. if !existed {
  582. glog.Infof("Opened iptables from-containers public port for service %q on %s port %d", name, protocol, nodePort)
  583. }
  584. // Handle traffic from the host.
  585. args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  586. existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesHostNodePortChain, args...)
  587. if err != nil {
  588. glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostNodePortChain, name)
  589. return err
  590. }
  591. if !existed {
  592. glog.Infof("Opened iptables from-host public port for service %q on %s port %d", name, protocol, nodePort)
  593. }
  594. args = proxier.iptablesNonLocalNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  595. existed, err = proxier.iptables.EnsureRule(iptables.Append, iptables.TableFilter, iptablesNonLocalNodePortChain, args...)
  596. if err != nil {
  597. glog.Errorf("Failed to install iptables %s rule for service %q", iptablesNonLocalNodePortChain, name)
  598. return err
  599. }
  600. if !existed {
  601. glog.Infof("Opened iptables from-non-local public port for service %q on %s port %d", name, protocol, nodePort)
  602. }
  603. return nil
  604. }
  605. func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error {
  606. // Collect errors and report them all at the end.
  607. el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
  608. for _, publicIP := range info.externalIPs {
  609. el = append(el, proxier.closeOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)...)
  610. }
  611. for _, ingress := range info.loadBalancerStatus.Ingress {
  612. if ingress.IP != "" {
  613. el = append(el, proxier.closeOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)...)
  614. }
  615. }
  616. if info.nodePort != 0 {
  617. el = append(el, proxier.closeNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
  618. }
  619. if len(el) == 0 {
  620. glog.V(3).Infof("Closed iptables portals for service %q", service)
  621. } else {
  622. glog.Errorf("Some errors closing iptables portals for service %q", service)
  623. }
  624. return utilerrors.NewAggregate(el)
  625. }
  626. func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
  627. el := []error{}
  628. if local, err := isLocalIP(portal.ip); err != nil {
  629. el = append(el, fmt.Errorf("can't determine if IP is local, assuming not: %v", err))
  630. } else if local {
  631. if err := proxier.releaseNodePort(portal.ip, portal.port, protocol, name); err != nil {
  632. el = append(el, err)
  633. }
  634. }
  635. // Handle traffic from containers.
  636. args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
  637. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerPortalChain, args...); err != nil {
  638. glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerPortalChain, name)
  639. el = append(el, err)
  640. }
  641. if portal.isExternal {
  642. args := proxier.iptablesContainerPortalArgs(portal.ip, false, true, portal.port, protocol, proxyIP, proxyPort, name)
  643. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerPortalChain, args...); err != nil {
  644. glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerPortalChain, name)
  645. el = append(el, err)
  646. }
  647. args = proxier.iptablesHostPortalArgs(portal.ip, true, portal.port, protocol, proxyIP, proxyPort, name)
  648. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostPortalChain, args...); err != nil {
  649. glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostPortalChain, name)
  650. el = append(el, err)
  651. }
  652. return el
  653. }
  654. // Handle traffic from the host (portalIP is not external).
  655. args = proxier.iptablesHostPortalArgs(portal.ip, false, portal.port, protocol, proxyIP, proxyPort, name)
  656. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostPortalChain, args...); err != nil {
  657. glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostPortalChain, name)
  658. el = append(el, err)
  659. }
  660. return el
  661. }
  662. func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
  663. el := []error{}
  664. // Handle traffic from containers.
  665. args := proxier.iptablesContainerNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  666. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerNodePortChain, args...); err != nil {
  667. glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerNodePortChain, name)
  668. el = append(el, err)
  669. }
  670. // Handle traffic from the host.
  671. args = proxier.iptablesHostNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  672. if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostNodePortChain, args...); err != nil {
  673. glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostNodePortChain, name)
  674. el = append(el, err)
  675. }
  676. // Handle traffic not local to the host
  677. args = proxier.iptablesNonLocalNodePortArgs(nodePort, protocol, proxyIP, proxyPort, name)
  678. if err := proxier.iptables.DeleteRule(iptables.TableFilter, iptablesNonLocalNodePortChain, args...); err != nil {
  679. glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesNonLocalNodePortChain, name)
  680. el = append(el, err)
  681. }
  682. if err := proxier.releaseNodePort(nil, nodePort, protocol, name); err != nil {
  683. el = append(el, err)
  684. }
  685. return el
  686. }
  687. func isLocalIP(ip net.IP) (bool, error) {
  688. addrs, err := net.InterfaceAddrs()
  689. if err != nil {
  690. return false, err
  691. }
  692. for i := range addrs {
  693. intf, _, err := net.ParseCIDR(addrs[i].String())
  694. if err != nil {
  695. return false, err
  696. }
  697. if ip.Equal(intf) {
  698. return true, nil
  699. }
  700. }
  701. return false, nil
  702. }
  703. // See comments in the *PortalArgs() functions for some details about why we
  704. // use two chains for portals.
  705. var iptablesContainerPortalChain iptables.Chain = "KUBE-PORTALS-CONTAINER"
  706. var iptablesHostPortalChain iptables.Chain = "KUBE-PORTALS-HOST"
  707. // Chains for NodePort services
  708. var iptablesContainerNodePortChain iptables.Chain = "KUBE-NODEPORT-CONTAINER"
  709. var iptablesHostNodePortChain iptables.Chain = "KUBE-NODEPORT-HOST"
  710. var iptablesNonLocalNodePortChain iptables.Chain = "KUBE-NODEPORT-NON-LOCAL"
  711. // Ensure that the iptables infrastructure we use is set up. This can safely be called periodically.
  712. func iptablesInit(ipt iptables.Interface) error {
  713. // TODO: There is almost certainly room for optimization here. E.g. If
  714. // we knew the service-cluster-ip-range CIDR we could fast-track outbound packets not
  715. // destined for a service. There's probably more, help wanted.
  716. // Danger - order of these rules matters here:
  717. //
  718. // We match portal rules first, then NodePort rules. For NodePort rules, we filter primarily on --dst-type LOCAL,
  719. // because we want to listen on all local addresses, but don't match internet traffic with the same dst port number.
  720. //
  721. // There is one complication (per thockin):
  722. // -m addrtype --dst-type LOCAL is what we want except that it is broken (by intent without foresight to our usecase)
  723. // on at least GCE. Specifically, GCE machines have a daemon which learns what external IPs are forwarded to that
  724. // machine, and configure a local route for that IP, making a match for --dst-type LOCAL when we don't want it to.
  725. // Removing the route gives correct behavior until the daemon recreates it.
  726. // Killing the daemon is an option, but means that any non-kubernetes use of the machine with external IP will be broken.
  727. //
  728. // This applies to IPs on GCE that are actually from a load-balancer; they will be categorized as LOCAL.
  729. // _If_ the chains were in the wrong order, and the LB traffic had dst-port == a NodePort on some other service,
  730. // the NodePort would take priority (incorrectly).
  731. // This is unlikely (and would only affect outgoing traffic from the cluster to the load balancer, which seems
  732. // doubly-unlikely), but we need to be careful to keep the rules in the right order.
  733. args := []string{ /* service-cluster-ip-range matching could go here */ }
  734. args = append(args, "-m", "comment", "--comment", "handle ClusterIPs; NOTE: this must be before the NodePort rules")
  735. if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerPortalChain); err != nil {
  736. return err
  737. }
  738. if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerPortalChain))...); err != nil {
  739. return err
  740. }
  741. if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
  742. return err
  743. }
  744. if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostPortalChain))...); err != nil {
  745. return err
  746. }
  747. // This set of rules matches broadly (addrtype & destination port), and therefore must come after the portal rules
  748. args = []string{"-m", "addrtype", "--dst-type", "LOCAL"}
  749. args = append(args, "-m", "comment", "--comment", "handle service NodePorts; NOTE: this must be the last rule in the chain")
  750. if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil {
  751. return err
  752. }
  753. if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainPrerouting, append(args, "-j", string(iptablesContainerNodePortChain))...); err != nil {
  754. return err
  755. }
  756. if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil {
  757. return err
  758. }
  759. if _, err := ipt.EnsureRule(iptables.Append, iptables.TableNAT, iptables.ChainOutput, append(args, "-j", string(iptablesHostNodePortChain))...); err != nil {
  760. return err
  761. }
  762. // Create a chain intended to explicitly allow non-local NodePort
  763. // traffic to work around default-deny iptables configurations
  764. // that would otherwise reject such traffic.
  765. args = []string{"-m", "comment", "--comment", "Ensure that non-local NodePort traffic can flow"}
  766. if _, err := ipt.EnsureChain(iptables.TableFilter, iptablesNonLocalNodePortChain); err != nil {
  767. return err
  768. }
  769. if _, err := ipt.EnsureRule(iptables.Prepend, iptables.TableFilter, iptables.ChainInput, append(args, "-j", string(iptablesNonLocalNodePortChain))...); err != nil {
  770. return err
  771. }
  772. // TODO: Verify order of rules.
  773. return nil
  774. }
  775. // Flush all of our custom iptables rules.
  776. func iptablesFlush(ipt iptables.Interface) error {
  777. el := []error{}
  778. if err := ipt.FlushChain(iptables.TableNAT, iptablesContainerPortalChain); err != nil {
  779. el = append(el, err)
  780. }
  781. if err := ipt.FlushChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
  782. el = append(el, err)
  783. }
  784. if err := ipt.FlushChain(iptables.TableNAT, iptablesContainerNodePortChain); err != nil {
  785. el = append(el, err)
  786. }
  787. if err := ipt.FlushChain(iptables.TableNAT, iptablesHostNodePortChain); err != nil {
  788. el = append(el, err)
  789. }
  790. if err := ipt.FlushChain(iptables.TableFilter, iptablesNonLocalNodePortChain); err != nil {
  791. el = append(el, err)
  792. }
  793. if len(el) != 0 {
  794. glog.Errorf("Some errors flushing old iptables portals: %v", el)
  795. }
  796. return utilerrors.NewAggregate(el)
  797. }
  798. // Used below.
  799. var zeroIPv4 = net.ParseIP("0.0.0.0")
  800. var localhostIPv4 = net.ParseIP("127.0.0.1")
  801. var zeroIPv6 = net.ParseIP("::0")
  802. var localhostIPv6 = net.ParseIP("::1")
  803. // Build a slice of iptables args that are common to from-container and from-host portal rules.
  804. func iptablesCommonPortalArgs(destIP net.IP, addPhysicalInterfaceMatch bool, addDstLocalMatch bool, destPort int, protocol api.Protocol, service proxy.ServicePortName) []string {
  805. // This list needs to include all fields as they are eventually spit out
  806. // by iptables-save. This is because some systems do not support the
  807. // 'iptables -C' arg, and so fall back on parsing iptables-save output.
  808. // If this does not match, it will not pass the check. For example:
  809. // adding the /32 on the destination IP arg is not strictly required,
  810. // but causes this list to not match the final iptables-save output.
  811. // This is fragile and I hope one day we can stop supporting such old
  812. // iptables versions.
  813. args := []string{
  814. "-m", "comment",
  815. "--comment", service.String(),
  816. "-p", strings.ToLower(string(protocol)),
  817. "-m", strings.ToLower(string(protocol)),
  818. "--dport", fmt.Sprintf("%d", destPort),
  819. }
  820. if destIP != nil {
  821. args = append(args, "-d", fmt.Sprintf("%s/32", destIP.String()))
  822. }
  823. if addPhysicalInterfaceMatch {
  824. args = append(args, "-m", "physdev", "!", "--physdev-is-in")
  825. }
  826. if addDstLocalMatch {
  827. args = append(args, "-m", "addrtype", "--dst-type", "LOCAL")
  828. }
  829. return args
  830. }
  831. // Build a slice of iptables args for a from-container portal rule.
  832. func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, addPhysicalInterfaceMatch bool, addDstLocalMatch bool, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
  833. args := iptablesCommonPortalArgs(destIP, addPhysicalInterfaceMatch, addDstLocalMatch, destPort, protocol, service)
  834. // This is tricky.
  835. //
  836. // If the proxy is bound (see Proxier.listenIP) to 0.0.0.0 ("any
  837. // interface") we want to use REDIRECT, which sends traffic to the
  838. // "primary address of the incoming interface" which means the container
  839. // bridge, if there is one. When the response comes, it comes from that
  840. // same interface, so the NAT matches and the response packet is
  841. // correct. This matters for UDP, since there is no per-connection port
  842. // number.
  843. //
  844. // The alternative would be to use DNAT, except that it doesn't work
  845. // (empirically):
  846. // * DNAT to 127.0.0.1 = Packets just disappear - this seems to be a
  847. // well-known limitation of iptables.
  848. // * DNAT to eth0's IP = Response packets come from the bridge, which
  849. // breaks the NAT, and makes things like DNS not accept them. If
  850. // this could be resolved, it would simplify all of this code.
  851. //
  852. // If the proxy is bound to a specific IP, then we have to use DNAT to
  853. // that IP. Unlike the previous case, this works because the proxy is
  854. // ONLY listening on that IP, not the bridge.
  855. //
  856. // Why would anyone bind to an address that is not inclusive of
  857. // localhost? Apparently some cloud environments have their public IP
  858. // exposed as a real network interface AND do not have firewalling. We
  859. // don't want to expose everything out to the world.
  860. //
  861. // Unfortunately, I don't know of any way to listen on some (N > 1)
  862. // interfaces but not ALL interfaces, short of doing it manually, and
  863. // this is simpler than that.
  864. //
  865. // If the proxy is bound to localhost only, all of this is broken. Not
  866. // allowed.
  867. if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
  868. // TODO: Can we REDIRECT with IPv6?
  869. args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort))
  870. } else {
  871. // TODO: Can we DNAT with IPv6?
  872. args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
  873. }
  874. return args
  875. }
  876. // Build a slice of iptables args for a from-host portal rule.
  877. func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, addDstLocalMatch bool, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
  878. args := iptablesCommonPortalArgs(destIP, false, addDstLocalMatch, destPort, protocol, service)
  879. // This is tricky.
  880. //
  881. // If the proxy is bound (see Proxier.listenIP) to 0.0.0.0 ("any
  882. // interface") we want to do the same as from-container traffic and use
  883. // REDIRECT. Except that it doesn't work (empirically). REDIRECT on
  884. // local packets sends the traffic to localhost (special case, but it is
  885. // documented) but the response comes from the eth0 IP (not sure why,
  886. // truthfully), which makes DNS unhappy.
  887. //
  888. // So we have to use DNAT. DNAT to 127.0.0.1 can't work for the same
  889. // reason.
  890. //
  891. // So we do our best to find an interface that is not a loopback and
  892. // DNAT to that. This works (again, empirically).
  893. //
  894. // If the proxy is bound to a specific IP, then we have to use DNAT to
  895. // that IP. Unlike the previous case, this works because the proxy is
  896. // ONLY listening on that IP, not the bridge.
  897. //
  898. // If the proxy is bound to localhost only, this should work, but we
  899. // don't allow it for now.
  900. if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
  901. proxyIP = proxier.hostIP
  902. }
  903. // TODO: Can we DNAT with IPv6?
  904. args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
  905. return args
  906. }
  907. // Build a slice of iptables args for a from-container public-port rule.
  908. // See iptablesContainerPortalArgs
  909. // TODO: Should we just reuse iptablesContainerPortalArgs?
  910. func (proxier *Proxier) iptablesContainerNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
  911. args := iptablesCommonPortalArgs(nil, false, false, nodePort, protocol, service)
  912. if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
  913. // TODO: Can we REDIRECT with IPv6?
  914. args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort))
  915. } else {
  916. // TODO: Can we DNAT with IPv6?
  917. args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
  918. }
  919. return args
  920. }
  921. // Build a slice of iptables args for a from-host public-port rule.
  922. // See iptablesHostPortalArgs
  923. // TODO: Should we just reuse iptablesHostPortalArgs?
  924. func (proxier *Proxier) iptablesHostNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
  925. args := iptablesCommonPortalArgs(nil, false, false, nodePort, protocol, service)
  926. if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
  927. proxyIP = proxier.hostIP
  928. }
  929. // TODO: Can we DNAT with IPv6?
  930. args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
  931. return args
  932. }
  933. // Build a slice of iptables args for an from-non-local public-port rule.
  934. func (proxier *Proxier) iptablesNonLocalNodePortArgs(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
  935. args := iptablesCommonPortalArgs(nil, false, false, proxyPort, protocol, service)
  936. args = append(args, "-m", "comment", "--comment", service.String(), "-m", "state", "--state", "NEW", "-j", "ACCEPT")
  937. return args
  938. }
  939. func isTooManyFDsError(err error) bool {
  940. return strings.Contains(err.Error(), "too many open files")
  941. }
  942. func isClosedError(err error) bool {
  943. // A brief discussion about handling closed error here:
  944. // https://code.google.com/p/go/issues/detail?id=4373#c14
  945. // TODO: maybe create a stoppable TCP listener that returns a StoppedError
  946. return strings.HasSuffix(err.Error(), "use of closed network connection")
  947. }