server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package app does all of the work necessary to create a Kubernetes
  14. // APIServer by binding together the API, master and APIServer infrastructure.
  15. // It can be configured and called directly or via the hyperkube framework.
  16. package app
  17. import (
  18. "crypto/tls"
  19. "net"
  20. "net/url"
  21. "strconv"
  22. "strings"
  23. "time"
  24. "github.com/golang/glog"
  25. "github.com/spf13/cobra"
  26. "github.com/spf13/pflag"
  27. "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
  28. "k8s.io/kubernetes/pkg/admission"
  29. "k8s.io/kubernetes/pkg/api"
  30. "k8s.io/kubernetes/pkg/api/unversioned"
  31. "k8s.io/kubernetes/pkg/apis/autoscaling"
  32. "k8s.io/kubernetes/pkg/apis/batch"
  33. "k8s.io/kubernetes/pkg/apis/extensions"
  34. "k8s.io/kubernetes/pkg/apis/rbac"
  35. "k8s.io/kubernetes/pkg/apiserver"
  36. "k8s.io/kubernetes/pkg/apiserver/authenticator"
  37. "k8s.io/kubernetes/pkg/capabilities"
  38. "k8s.io/kubernetes/pkg/cloudprovider"
  39. "k8s.io/kubernetes/pkg/controller/framework/informers"
  40. serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
  41. "k8s.io/kubernetes/pkg/genericapiserver"
  42. "k8s.io/kubernetes/pkg/genericapiserver/authorizer"
  43. genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
  44. genericvalidation "k8s.io/kubernetes/pkg/genericapiserver/validation"
  45. kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
  46. "k8s.io/kubernetes/pkg/master"
  47. "k8s.io/kubernetes/pkg/registry/cachesize"
  48. "k8s.io/kubernetes/pkg/registry/clusterrole"
  49. clusterroleetcd "k8s.io/kubernetes/pkg/registry/clusterrole/etcd"
  50. "k8s.io/kubernetes/pkg/registry/clusterrolebinding"
  51. clusterrolebindingetcd "k8s.io/kubernetes/pkg/registry/clusterrolebinding/etcd"
  52. "k8s.io/kubernetes/pkg/registry/generic"
  53. "k8s.io/kubernetes/pkg/registry/role"
  54. roleetcd "k8s.io/kubernetes/pkg/registry/role/etcd"
  55. "k8s.io/kubernetes/pkg/registry/rolebinding"
  56. rolebindingetcd "k8s.io/kubernetes/pkg/registry/rolebinding/etcd"
  57. "k8s.io/kubernetes/pkg/serviceaccount"
  58. "k8s.io/kubernetes/pkg/util/wait"
  59. )
  60. // NewAPIServerCommand creates a *cobra.Command object with default parameters
  61. func NewAPIServerCommand() *cobra.Command {
  62. s := options.NewAPIServer()
  63. s.AddFlags(pflag.CommandLine)
  64. cmd := &cobra.Command{
  65. Use: "kube-apiserver",
  66. Long: `The Kubernetes API server validates and configures data
  67. for the api objects which include pods, services, replicationcontrollers, and
  68. others. The API Server services REST operations and provides the frontend to the
  69. cluster's shared state through which all other components interact.`,
  70. Run: func(cmd *cobra.Command, args []string) {
  71. },
  72. }
  73. return cmd
  74. }
  75. // Run runs the specified APIServer. This should never exit.
  76. func Run(s *options.APIServer) error {
  77. genericvalidation.VerifyEtcdServersList(s.ServerRunOptions)
  78. genericapiserver.DefaultAndValidateRunOptions(s.ServerRunOptions)
  79. capabilities.Initialize(capabilities.Capabilities{
  80. AllowPrivileged: s.AllowPrivileged,
  81. // TODO(vmarmol): Implement support for HostNetworkSources.
  82. PrivilegedSources: capabilities.PrivilegedSources{
  83. HostNetworkSources: []string{},
  84. HostPIDSources: []string{},
  85. HostIPCSources: []string{},
  86. },
  87. PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
  88. })
  89. // Setup tunneler if needed
  90. var tunneler genericapiserver.Tunneler
  91. var proxyDialerFn apiserver.ProxyDialerFunc
  92. if len(s.SSHUser) > 0 {
  93. // Get ssh key distribution func, if supported
  94. var installSSH genericapiserver.InstallSSHKey
  95. cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
  96. if err != nil {
  97. glog.Fatalf("Cloud provider could not be initialized: %v", err)
  98. }
  99. if cloud != nil {
  100. if instances, supported := cloud.Instances(); supported {
  101. installSSH = instances.AddSSHKeyToAllInstances
  102. }
  103. }
  104. if s.KubeletConfig.Port == 0 {
  105. glog.Fatalf("Must enable kubelet port if proxy ssh-tunneling is specified.")
  106. }
  107. // Set up the tunneler
  108. // TODO(cjcullen): If we want this to handle per-kubelet ports or other
  109. // kubelet listen-addresses, we need to plumb through options.
  110. healthCheckPath := &url.URL{
  111. Scheme: "https",
  112. Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.Port), 10)),
  113. Path: "healthz",
  114. }
  115. tunneler = genericapiserver.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSH)
  116. // Use the tunneler's dialer to connect to the kubelet
  117. s.KubeletConfig.Dial = tunneler.Dial
  118. // Use the tunneler's dialer when proxying to pods, services, and nodes
  119. proxyDialerFn = tunneler.Dial
  120. }
  121. // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
  122. proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
  123. kubeletClient, err := kubeletclient.NewStaticKubeletClient(&s.KubeletConfig)
  124. if err != nil {
  125. glog.Fatalf("Failed to start kubelet client: %v", err)
  126. }
  127. storageGroupsToEncodingVersion, err := s.StorageGroupsToEncodingVersion()
  128. if err != nil {
  129. glog.Fatalf("error generating storage version map: %s", err)
  130. }
  131. storageFactory, err := genericapiserver.BuildDefaultStorageFactory(
  132. s.StorageConfig, s.DefaultStorageMediaType, api.Codecs,
  133. genericapiserver.NewDefaultResourceEncodingConfig(), storageGroupsToEncodingVersion,
  134. // FIXME: this GroupVersionResource override should be configurable
  135. []unversioned.GroupVersionResource{batch.Resource("scheduledjobs").WithVersion("v2alpha1")},
  136. master.DefaultAPIResourceConfigSource(), s.RuntimeConfig)
  137. if err != nil {
  138. glog.Fatalf("error in initializing storage factory: %s", err)
  139. }
  140. storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
  141. storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
  142. for _, override := range s.EtcdServersOverrides {
  143. tokens := strings.Split(override, "#")
  144. if len(tokens) != 2 {
  145. glog.Errorf("invalid value of etcd server overrides: %s", override)
  146. continue
  147. }
  148. apiresource := strings.Split(tokens[0], "/")
  149. if len(apiresource) != 2 {
  150. glog.Errorf("invalid resource definition: %s", tokens[0])
  151. continue
  152. }
  153. group := apiresource[0]
  154. resource := apiresource[1]
  155. groupResource := unversioned.GroupResource{Group: group, Resource: resource}
  156. servers := strings.Split(tokens[1], ";")
  157. storageFactory.SetEtcdLocation(groupResource, servers)
  158. }
  159. // Default to the private server key for service account token signing
  160. if s.ServiceAccountKeyFile == "" && s.TLSPrivateKeyFile != "" {
  161. if authenticator.IsValidServiceAccountKeyFile(s.TLSPrivateKeyFile) {
  162. s.ServiceAccountKeyFile = s.TLSPrivateKeyFile
  163. } else {
  164. glog.Warning("No RSA key provided, service account token authentication disabled")
  165. }
  166. }
  167. var serviceAccountGetter serviceaccount.ServiceAccountTokenGetter
  168. if s.ServiceAccountLookup {
  169. // If we need to look up service accounts and tokens,
  170. // go directly to etcd to avoid recursive auth insanity
  171. storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
  172. if err != nil {
  173. glog.Fatalf("Unable to get serviceaccounts storage: %v", err)
  174. }
  175. serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
  176. }
  177. authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{
  178. BasicAuthFile: s.BasicAuthFile,
  179. ClientCAFile: s.ClientCAFile,
  180. TokenAuthFile: s.TokenAuthFile,
  181. OIDCIssuerURL: s.OIDCIssuerURL,
  182. OIDCClientID: s.OIDCClientID,
  183. OIDCCAFile: s.OIDCCAFile,
  184. OIDCUsernameClaim: s.OIDCUsernameClaim,
  185. OIDCGroupsClaim: s.OIDCGroupsClaim,
  186. ServiceAccountKeyFile: s.ServiceAccountKeyFile,
  187. ServiceAccountLookup: s.ServiceAccountLookup,
  188. ServiceAccountTokenGetter: serviceAccountGetter,
  189. KeystoneURL: s.KeystoneURL,
  190. WebhookTokenAuthnConfigFile: s.WebhookTokenAuthnConfigFile,
  191. WebhookTokenAuthnCacheTTL: s.WebhookTokenAuthnCacheTTL,
  192. })
  193. if err != nil {
  194. glog.Fatalf("Invalid Authentication Config: %v", err)
  195. }
  196. authorizationModeNames := strings.Split(s.AuthorizationMode, ",")
  197. modeEnabled := func(mode string) bool {
  198. for _, m := range authorizationModeNames {
  199. if m == mode {
  200. return true
  201. }
  202. }
  203. return false
  204. }
  205. authorizationConfig := authorizer.AuthorizationConfig{
  206. PolicyFile: s.AuthorizationPolicyFile,
  207. WebhookConfigFile: s.AuthorizationWebhookConfigFile,
  208. WebhookCacheAuthorizedTTL: s.AuthorizationWebhookCacheAuthorizedTTL,
  209. WebhookCacheUnauthorizedTTL: s.AuthorizationWebhookCacheUnauthorizedTTL,
  210. RBACSuperUser: s.AuthorizationRBACSuperUser,
  211. }
  212. if modeEnabled(genericoptions.ModeRBAC) {
  213. mustGetRESTOptions := func(resource string) generic.RESTOptions {
  214. config, err := storageFactory.NewConfig(rbac.Resource(resource))
  215. if err != nil {
  216. glog.Fatalf("Unable to get %s storage: %v", resource, err)
  217. }
  218. return generic.RESTOptions{StorageConfig: config, Decorator: generic.UndecoratedStorage, ResourcePrefix: storageFactory.ResourcePrefix(rbac.Resource(resource))}
  219. }
  220. // For initial bootstrapping go directly to etcd to avoid privillege escalation check.
  221. authorizationConfig.RBACRoleRegistry = role.NewRegistry(roleetcd.NewREST(mustGetRESTOptions("roles")))
  222. authorizationConfig.RBACRoleBindingRegistry = rolebinding.NewRegistry(rolebindingetcd.NewREST(mustGetRESTOptions("rolebindings")))
  223. authorizationConfig.RBACClusterRoleRegistry = clusterrole.NewRegistry(clusterroleetcd.NewREST(mustGetRESTOptions("clusterroles")))
  224. authorizationConfig.RBACClusterRoleBindingRegistry = clusterrolebinding.NewRegistry(clusterrolebindingetcd.NewREST(mustGetRESTOptions("clusterrolebindings")))
  225. }
  226. authorizer, err := authorizer.NewAuthorizerFromAuthorizationConfig(authorizationModeNames, authorizationConfig)
  227. if err != nil {
  228. glog.Fatalf("Invalid Authorization Config: %v", err)
  229. }
  230. admissionControlPluginNames := strings.Split(s.AdmissionControl, ",")
  231. client, err := s.NewSelfClient()
  232. if err != nil {
  233. glog.Errorf("Failed to create clientset: %v", err)
  234. }
  235. sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
  236. pluginInitializer := admission.NewPluginInitializer(sharedInformers)
  237. admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer)
  238. if err != nil {
  239. glog.Fatalf("Failed to initialize plugins: %v", err)
  240. }
  241. genericConfig := genericapiserver.NewConfig(s.ServerRunOptions)
  242. // TODO: Move the following to generic api server as well.
  243. genericConfig.StorageFactory = storageFactory
  244. genericConfig.Authenticator = authenticator
  245. genericConfig.SupportsBasicAuth = len(s.BasicAuthFile) > 0
  246. genericConfig.Authorizer = authorizer
  247. genericConfig.AuthorizerRBACSuperUser = s.AuthorizationRBACSuperUser
  248. genericConfig.AdmissionControl = admissionController
  249. genericConfig.APIResourceConfigSource = storageFactory.APIResourceConfigSource
  250. genericConfig.MasterServiceNamespace = s.MasterServiceNamespace
  251. genericConfig.ProxyDialer = proxyDialerFn
  252. genericConfig.ProxyTLSClientConfig = proxyTLSClientConfig
  253. genericConfig.Serializer = api.Codecs
  254. genericConfig.OpenAPIInfo.Title = "Kubernetes"
  255. config := &master.Config{
  256. Config: genericConfig,
  257. EnableCoreControllers: true,
  258. DeleteCollectionWorkers: s.DeleteCollectionWorkers,
  259. EventTTL: s.EventTTL,
  260. KubeletClient: kubeletClient,
  261. Tunneler: tunneler,
  262. }
  263. if s.EnableWatchCache {
  264. glog.V(2).Infof("Initalizing cache sizes based on %dMB limit", s.TargetRAMMB)
  265. cachesize.InitializeWatchCacheSizes(s.TargetRAMMB)
  266. cachesize.SetWatchCacheSizes(s.WatchCacheSizes)
  267. }
  268. m, err := master.New(config)
  269. if err != nil {
  270. return err
  271. }
  272. sharedInformers.Start(wait.NeverStop)
  273. m.Run(s.ServerRunOptions)
  274. return nil
  275. }