server.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package app does all of the work necessary to configure and run a
  14. // Kubernetes app process.
  15. package app
  16. import (
  17. "errors"
  18. "fmt"
  19. "net"
  20. "net/http"
  21. _ "net/http/pprof"
  22. "runtime"
  23. "strconv"
  24. "time"
  25. "k8s.io/kubernetes/cmd/kube-proxy/app/options"
  26. "k8s.io/kubernetes/pkg/api"
  27. "k8s.io/kubernetes/pkg/client/record"
  28. kubeclient "k8s.io/kubernetes/pkg/client/unversioned"
  29. "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
  30. clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
  31. "k8s.io/kubernetes/pkg/proxy"
  32. proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
  33. "k8s.io/kubernetes/pkg/proxy/iptables"
  34. "k8s.io/kubernetes/pkg/proxy/userspace"
  35. "k8s.io/kubernetes/pkg/types"
  36. "k8s.io/kubernetes/pkg/util/configz"
  37. utildbus "k8s.io/kubernetes/pkg/util/dbus"
  38. "k8s.io/kubernetes/pkg/util/exec"
  39. utiliptables "k8s.io/kubernetes/pkg/util/iptables"
  40. utilnet "k8s.io/kubernetes/pkg/util/net"
  41. nodeutil "k8s.io/kubernetes/pkg/util/node"
  42. "k8s.io/kubernetes/pkg/util/oom"
  43. "k8s.io/kubernetes/pkg/util/resourcecontainer"
  44. utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
  45. "k8s.io/kubernetes/pkg/util/wait"
  46. "github.com/golang/glog"
  47. "github.com/spf13/cobra"
  48. "github.com/spf13/pflag"
  49. )
  50. type ProxyServer struct {
  51. Client *kubeclient.Client
  52. Config *options.ProxyServerConfig
  53. IptInterface utiliptables.Interface
  54. Proxier proxy.ProxyProvider
  55. Broadcaster record.EventBroadcaster
  56. Recorder record.EventRecorder
  57. Conntracker Conntracker // if nil, ignored
  58. ProxyMode string
  59. }
  60. const (
  61. proxyModeUserspace = "userspace"
  62. proxyModeIptables = "iptables"
  63. experimentalProxyModeAnnotation = options.ExperimentalProxyModeAnnotation
  64. betaProxyModeAnnotation = "net.beta.kubernetes.io/proxy-mode"
  65. )
  66. func checkKnownProxyMode(proxyMode string) bool {
  67. switch proxyMode {
  68. case "", proxyModeUserspace, proxyModeIptables:
  69. return true
  70. }
  71. return false
  72. }
  73. func NewProxyServer(
  74. client *kubeclient.Client,
  75. config *options.ProxyServerConfig,
  76. iptInterface utiliptables.Interface,
  77. proxier proxy.ProxyProvider,
  78. broadcaster record.EventBroadcaster,
  79. recorder record.EventRecorder,
  80. conntracker Conntracker,
  81. proxyMode string,
  82. ) (*ProxyServer, error) {
  83. return &ProxyServer{
  84. Client: client,
  85. Config: config,
  86. IptInterface: iptInterface,
  87. Proxier: proxier,
  88. Broadcaster: broadcaster,
  89. Recorder: recorder,
  90. Conntracker: conntracker,
  91. ProxyMode: proxyMode,
  92. }, nil
  93. }
  94. // NewProxyCommand creates a *cobra.Command object with default parameters
  95. func NewProxyCommand() *cobra.Command {
  96. s := options.NewProxyConfig()
  97. s.AddFlags(pflag.CommandLine)
  98. cmd := &cobra.Command{
  99. Use: "kube-proxy",
  100. Long: `The Kubernetes network proxy runs on each node. This
  101. reflects services as defined in the Kubernetes API on each node and can do simple
  102. TCP,UDP stream forwarding or round robin TCP,UDP forwarding across a set of backends.
  103. Service cluster ips and ports are currently found through Docker-links-compatible
  104. environment variables specifying ports opened by the service proxy. There is an optional
  105. addon that provides cluster DNS for these cluster IPs. The user must create a service
  106. with the apiserver API to configure the proxy.`,
  107. Run: func(cmd *cobra.Command, args []string) {
  108. },
  109. }
  110. return cmd
  111. }
  112. // NewProxyServerDefault creates a new ProxyServer object with default parameters.
  113. func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, error) {
  114. if c, err := configz.New("componentconfig"); err == nil {
  115. c.Set(config.KubeProxyConfiguration)
  116. } else {
  117. glog.Errorf("unable to register configz: %s", err)
  118. }
  119. protocol := utiliptables.ProtocolIpv4
  120. if net.ParseIP(config.BindAddress).To4() == nil {
  121. protocol = utiliptables.ProtocolIpv6
  122. }
  123. // Create a iptables utils.
  124. execer := exec.New()
  125. dbus := utildbus.New()
  126. iptInterface := utiliptables.New(execer, dbus, protocol)
  127. // We omit creation of pretty much everything if we run in cleanup mode
  128. if config.CleanupAndExit {
  129. return &ProxyServer{
  130. Config: config,
  131. IptInterface: iptInterface,
  132. }, nil
  133. }
  134. // TODO(vmarmol): Use container config for this.
  135. var oomAdjuster *oom.OOMAdjuster
  136. if config.OOMScoreAdj != nil {
  137. oomAdjuster = oom.NewOOMAdjuster()
  138. if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*config.OOMScoreAdj)); err != nil {
  139. glog.V(2).Info(err)
  140. }
  141. }
  142. if config.ResourceContainer != "" {
  143. // Run in its own container.
  144. if err := resourcecontainer.RunInResourceContainer(config.ResourceContainer); err != nil {
  145. glog.Warningf("Failed to start in resource-only container %q: %v", config.ResourceContainer, err)
  146. } else {
  147. glog.V(2).Infof("Running in resource-only container %q", config.ResourceContainer)
  148. }
  149. }
  150. // Create a Kube Client
  151. // define api config source
  152. if config.Kubeconfig == "" && config.Master == "" {
  153. glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
  154. }
  155. // This creates a client, first loading any specified kubeconfig
  156. // file, and then overriding the Master flag, if non-empty.
  157. kubeconfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  158. &clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
  159. &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: config.Master}}).ClientConfig()
  160. if err != nil {
  161. return nil, err
  162. }
  163. kubeconfig.ContentType = config.ContentType
  164. // Override kubeconfig qps/burst settings from flags
  165. kubeconfig.QPS = config.KubeAPIQPS
  166. kubeconfig.Burst = int(config.KubeAPIBurst)
  167. client, err := kubeclient.New(kubeconfig)
  168. if err != nil {
  169. glog.Fatalf("Invalid API configuration: %v", err)
  170. }
  171. // Create event recorder
  172. hostname := nodeutil.GetHostname(config.HostnameOverride)
  173. eventBroadcaster := record.NewBroadcaster()
  174. recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "kube-proxy", Host: hostname})
  175. var proxier proxy.ProxyProvider
  176. var endpointsHandler proxyconfig.EndpointsConfigHandler
  177. proxyMode := getProxyMode(string(config.Mode), client.Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
  178. if proxyMode == proxyModeIptables {
  179. glog.V(0).Info("Using iptables Proxier.")
  180. if config.IPTablesMasqueradeBit == nil {
  181. // IPTablesMasqueradeBit must be specified or defaulted.
  182. return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config")
  183. }
  184. proxierIptables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname))
  185. if err != nil {
  186. glog.Fatalf("Unable to create proxier: %v", err)
  187. }
  188. proxier = proxierIptables
  189. endpointsHandler = proxierIptables
  190. // No turning back. Remove artifacts that might still exist from the userspace Proxier.
  191. glog.V(0).Info("Tearing down userspace rules.")
  192. userspace.CleanupLeftovers(iptInterface)
  193. } else {
  194. glog.V(0).Info("Using userspace Proxier.")
  195. // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
  196. // our config.EndpointsConfigHandler.
  197. loadBalancer := userspace.NewLoadBalancerRR()
  198. // set EndpointsConfigHandler to our loadBalancer
  199. endpointsHandler = loadBalancer
  200. proxierUserspace, err := userspace.NewProxier(
  201. loadBalancer,
  202. net.ParseIP(config.BindAddress),
  203. iptInterface,
  204. *utilnet.ParsePortRangeOrDie(config.PortRange),
  205. config.IPTablesSyncPeriod.Duration,
  206. config.UDPIdleTimeout.Duration,
  207. )
  208. if err != nil {
  209. glog.Fatalf("Unable to create proxier: %v", err)
  210. }
  211. proxier = proxierUserspace
  212. // Remove artifacts from the pure-iptables Proxier.
  213. glog.V(0).Info("Tearing down pure-iptables proxy rules.")
  214. iptables.CleanupLeftovers(iptInterface)
  215. }
  216. iptInterface.AddReloadFunc(proxier.Sync)
  217. // Create configs (i.e. Watches for Services and Endpoints)
  218. // Note: RegisterHandler() calls need to happen before creation of Sources because sources
  219. // only notify on changes, and the initial update (on process start) may be lost if no handlers
  220. // are registered yet.
  221. serviceConfig := proxyconfig.NewServiceConfig()
  222. serviceConfig.RegisterHandler(proxier)
  223. endpointsConfig := proxyconfig.NewEndpointsConfig()
  224. endpointsConfig.RegisterHandler(endpointsHandler)
  225. proxyconfig.NewSourceAPI(
  226. client,
  227. config.ConfigSyncPeriod,
  228. serviceConfig.Channel("api"),
  229. endpointsConfig.Channel("api"),
  230. )
  231. config.NodeRef = &api.ObjectReference{
  232. Kind: "Node",
  233. Name: hostname,
  234. UID: types.UID(hostname),
  235. Namespace: "",
  236. }
  237. conntracker := realConntracker{}
  238. return NewProxyServer(client, config, iptInterface, proxier, eventBroadcaster, recorder, conntracker, proxyMode)
  239. }
  240. // Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
  241. func (s *ProxyServer) Run() error {
  242. // remove iptables rules and exit
  243. if s.Config.CleanupAndExit {
  244. encounteredError := userspace.CleanupLeftovers(s.IptInterface)
  245. encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
  246. if encounteredError {
  247. return errors.New("Encountered an error while tearing down rules.")
  248. }
  249. return nil
  250. }
  251. s.Broadcaster.StartRecordingToSink(s.Client.Events(""))
  252. // Start up a webserver if requested
  253. if s.Config.HealthzPort > 0 {
  254. http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
  255. fmt.Fprintf(w, "%s", s.ProxyMode)
  256. })
  257. configz.InstallHandler(http.DefaultServeMux)
  258. go wait.Until(func() {
  259. err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(int(s.Config.HealthzPort)), nil)
  260. if err != nil {
  261. glog.Errorf("Starting health server failed: %v", err)
  262. }
  263. }, 5*time.Second, wait.NeverStop)
  264. }
  265. // Tune conntrack, if requested
  266. if s.Conntracker != nil {
  267. max, err := getConntrackMax(s.Config)
  268. if err != nil {
  269. return err
  270. }
  271. if max > 0 {
  272. err := s.Conntracker.SetMax(max)
  273. if err != nil {
  274. if err != readOnlySysFSError {
  275. return err
  276. }
  277. // readOnlySysFSError is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
  278. // the only remediation we know is to restart the docker daemon.
  279. // Here we'll send an node event with specific reason and message, the
  280. // administrator should decide whether and how to handle this issue,
  281. // whether to drain the node and restart docker.
  282. // TODO(random-liu): Remove this when the docker bug is fixed.
  283. const message = "DOCKER RESTART NEEDED (docker issue #24000): /sys is read-only: can't raise conntrack limits, problems may arise later."
  284. s.Recorder.Eventf(s.Config.NodeRef, api.EventTypeWarning, err.Error(), message)
  285. }
  286. }
  287. if s.Config.ConntrackTCPEstablishedTimeout.Duration > 0 {
  288. if err := s.Conntracker.SetTCPEstablishedTimeout(int(s.Config.ConntrackTCPEstablishedTimeout.Duration / time.Second)); err != nil {
  289. return err
  290. }
  291. }
  292. }
  293. // Birth Cry after the birth is successful
  294. s.birthCry()
  295. // Just loop forever for now...
  296. s.Proxier.SyncLoop()
  297. return nil
  298. }
  299. func getConntrackMax(config *options.ProxyServerConfig) (int, error) {
  300. if config.ConntrackMax > 0 && config.ConntrackMaxPerCore > 0 {
  301. return -1, fmt.Errorf("invalid config: ConntrackMax and ConntrackMaxPerCore are mutually exclusive")
  302. }
  303. if config.ConntrackMax > 0 {
  304. return int(config.ConntrackMax), nil
  305. } else if config.ConntrackMaxPerCore > 0 {
  306. return (int(config.ConntrackMaxPerCore) * runtime.NumCPU()), nil
  307. }
  308. return 0, nil
  309. }
  310. type nodeGetter interface {
  311. Get(hostname string) (*api.Node, error)
  312. }
  313. func getProxyMode(proxyMode string, client nodeGetter, hostname string, iptver iptables.IptablesVersioner, kcompat iptables.KernelCompatTester) string {
  314. if proxyMode == proxyModeUserspace {
  315. return proxyModeUserspace
  316. } else if proxyMode == proxyModeIptables {
  317. return tryIptablesProxy(iptver, kcompat)
  318. } else if proxyMode != "" {
  319. glog.Warningf("Flag proxy-mode=%q unknown, assuming iptables proxy", proxyMode)
  320. return tryIptablesProxy(iptver, kcompat)
  321. }
  322. // proxyMode == "" - choose the best option.
  323. if client == nil {
  324. glog.Errorf("nodeGetter is nil: assuming iptables proxy")
  325. return tryIptablesProxy(iptver, kcompat)
  326. }
  327. node, err := client.Get(hostname)
  328. if err != nil {
  329. glog.Errorf("Can't get Node %q, assuming iptables proxy, err: %v", hostname, err)
  330. return tryIptablesProxy(iptver, kcompat)
  331. }
  332. if node == nil {
  333. glog.Errorf("Got nil Node %q, assuming iptables proxy", hostname)
  334. return tryIptablesProxy(iptver, kcompat)
  335. }
  336. proxyMode, found := node.Annotations[betaProxyModeAnnotation]
  337. if found {
  338. glog.V(1).Infof("Found beta annotation %q = %q", betaProxyModeAnnotation, proxyMode)
  339. } else {
  340. // We already published some information about this annotation with the "experimental" name, so we will respect it.
  341. proxyMode, found = node.Annotations[experimentalProxyModeAnnotation]
  342. if found {
  343. glog.V(1).Infof("Found experimental annotation %q = %q", experimentalProxyModeAnnotation, proxyMode)
  344. }
  345. }
  346. if proxyMode == proxyModeUserspace {
  347. glog.V(1).Infof("Annotation demands userspace proxy")
  348. return proxyModeUserspace
  349. }
  350. return tryIptablesProxy(iptver, kcompat)
  351. }
  352. func tryIptablesProxy(iptver iptables.IptablesVersioner, kcompat iptables.KernelCompatTester) string {
  353. var err error
  354. // guaranteed false on error, error only necessary for debugging
  355. useIptablesProxy, err := iptables.CanUseIptablesProxier(iptver, kcompat)
  356. if err != nil {
  357. glog.Errorf("Can't determine whether to use iptables proxy, using userspace proxier: %v", err)
  358. return proxyModeUserspace
  359. }
  360. if useIptablesProxy {
  361. return proxyModeIptables
  362. }
  363. // Fallback.
  364. glog.V(1).Infof("Can't use iptables proxy, using userspace proxier: %v", err)
  365. return proxyModeUserspace
  366. }
  367. func (s *ProxyServer) birthCry() {
  368. s.Recorder.Eventf(s.Config.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")
  369. }
  370. func getNodeIP(client *kubeclient.Client, hostname string) net.IP {
  371. var nodeIP net.IP
  372. node, err := client.Nodes().Get(hostname)
  373. if err != nil {
  374. glog.Warningf("Failed to retrieve node info: %v", err)
  375. return nil
  376. }
  377. nodeIP, err = nodeutil.GetNodeHostIP(node)
  378. if err != nil {
  379. glog.Warningf("Failed to retrieve node IP: %v", err)
  380. return nil
  381. }
  382. return nodeIP
  383. }