123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package app does all of the work necessary to create a Kubernetes
- // APIServer by binding together the API, master and APIServer infrastructure.
- // It can be configured and called directly or via the hyperkube framework.
- package app
- import (
- "crypto/tls"
- "net"
- "net/url"
- "strconv"
- "strings"
- "time"
- "github.com/golang/glog"
- "github.com/spf13/cobra"
- "github.com/spf13/pflag"
- "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
- "k8s.io/kubernetes/pkg/admission"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/unversioned"
- "k8s.io/kubernetes/pkg/apis/autoscaling"
- "k8s.io/kubernetes/pkg/apis/batch"
- "k8s.io/kubernetes/pkg/apis/extensions"
- "k8s.io/kubernetes/pkg/apis/rbac"
- "k8s.io/kubernetes/pkg/apiserver"
- "k8s.io/kubernetes/pkg/apiserver/authenticator"
- "k8s.io/kubernetes/pkg/capabilities"
- "k8s.io/kubernetes/pkg/cloudprovider"
- "k8s.io/kubernetes/pkg/controller/framework/informers"
- serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
- "k8s.io/kubernetes/pkg/genericapiserver"
- "k8s.io/kubernetes/pkg/genericapiserver/authorizer"
- genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
- genericvalidation "k8s.io/kubernetes/pkg/genericapiserver/validation"
- kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
- "k8s.io/kubernetes/pkg/master"
- "k8s.io/kubernetes/pkg/registry/cachesize"
- "k8s.io/kubernetes/pkg/registry/clusterrole"
- clusterroleetcd "k8s.io/kubernetes/pkg/registry/clusterrole/etcd"
- "k8s.io/kubernetes/pkg/registry/clusterrolebinding"
- clusterrolebindingetcd "k8s.io/kubernetes/pkg/registry/clusterrolebinding/etcd"
- "k8s.io/kubernetes/pkg/registry/generic"
- "k8s.io/kubernetes/pkg/registry/role"
- roleetcd "k8s.io/kubernetes/pkg/registry/role/etcd"
- "k8s.io/kubernetes/pkg/registry/rolebinding"
- rolebindingetcd "k8s.io/kubernetes/pkg/registry/rolebinding/etcd"
- "k8s.io/kubernetes/pkg/serviceaccount"
- "k8s.io/kubernetes/pkg/util/wait"
- )
- // NewAPIServerCommand creates a *cobra.Command object with default parameters
- func NewAPIServerCommand() *cobra.Command {
- s := options.NewAPIServer()
- s.AddFlags(pflag.CommandLine)
- cmd := &cobra.Command{
- Use: "kube-apiserver",
- Long: `The Kubernetes API server validates and configures data
- for the api objects which include pods, services, replicationcontrollers, and
- others. The API Server services REST operations and provides the frontend to the
- cluster's shared state through which all other components interact.`,
- Run: func(cmd *cobra.Command, args []string) {
- },
- }
- return cmd
- }
- // Run runs the specified APIServer. This should never exit.
- func Run(s *options.APIServer) error {
- genericvalidation.VerifyEtcdServersList(s.ServerRunOptions)
- genericapiserver.DefaultAndValidateRunOptions(s.ServerRunOptions)
- capabilities.Initialize(capabilities.Capabilities{
- AllowPrivileged: s.AllowPrivileged,
- // TODO(vmarmol): Implement support for HostNetworkSources.
- PrivilegedSources: capabilities.PrivilegedSources{
- HostNetworkSources: []string{},
- HostPIDSources: []string{},
- HostIPCSources: []string{},
- },
- PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
- })
- // Setup tunneler if needed
- var tunneler genericapiserver.Tunneler
- var proxyDialerFn apiserver.ProxyDialerFunc
- if len(s.SSHUser) > 0 {
- // Get ssh key distribution func, if supported
- var installSSH genericapiserver.InstallSSHKey
- cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
- if err != nil {
- glog.Fatalf("Cloud provider could not be initialized: %v", err)
- }
- if cloud != nil {
- if instances, supported := cloud.Instances(); supported {
- installSSH = instances.AddSSHKeyToAllInstances
- }
- }
- if s.KubeletConfig.Port == 0 {
- glog.Fatalf("Must enable kubelet port if proxy ssh-tunneling is specified.")
- }
- // Set up the tunneler
- // TODO(cjcullen): If we want this to handle per-kubelet ports or other
- // kubelet listen-addresses, we need to plumb through options.
- healthCheckPath := &url.URL{
- Scheme: "https",
- Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.Port), 10)),
- Path: "healthz",
- }
- tunneler = genericapiserver.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSH)
- // Use the tunneler's dialer to connect to the kubelet
- s.KubeletConfig.Dial = tunneler.Dial
- // Use the tunneler's dialer when proxying to pods, services, and nodes
- proxyDialerFn = tunneler.Dial
- }
- // Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
- proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
- kubeletClient, err := kubeletclient.NewStaticKubeletClient(&s.KubeletConfig)
- if err != nil {
- glog.Fatalf("Failed to start kubelet client: %v", err)
- }
- storageGroupsToEncodingVersion, err := s.StorageGroupsToEncodingVersion()
- if err != nil {
- glog.Fatalf("error generating storage version map: %s", err)
- }
- storageFactory, err := genericapiserver.BuildDefaultStorageFactory(
- s.StorageConfig, s.DefaultStorageMediaType, api.Codecs,
- genericapiserver.NewDefaultResourceEncodingConfig(), storageGroupsToEncodingVersion,
- // FIXME: this GroupVersionResource override should be configurable
- []unversioned.GroupVersionResource{batch.Resource("scheduledjobs").WithVersion("v2alpha1")},
- master.DefaultAPIResourceConfigSource(), s.RuntimeConfig)
- if err != nil {
- glog.Fatalf("error in initializing storage factory: %s", err)
- }
- storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
- storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
- for _, override := range s.EtcdServersOverrides {
- tokens := strings.Split(override, "#")
- if len(tokens) != 2 {
- glog.Errorf("invalid value of etcd server overrides: %s", override)
- continue
- }
- apiresource := strings.Split(tokens[0], "/")
- if len(apiresource) != 2 {
- glog.Errorf("invalid resource definition: %s", tokens[0])
- continue
- }
- group := apiresource[0]
- resource := apiresource[1]
- groupResource := unversioned.GroupResource{Group: group, Resource: resource}
- servers := strings.Split(tokens[1], ";")
- storageFactory.SetEtcdLocation(groupResource, servers)
- }
- // Default to the private server key for service account token signing
- if s.ServiceAccountKeyFile == "" && s.TLSPrivateKeyFile != "" {
- if authenticator.IsValidServiceAccountKeyFile(s.TLSPrivateKeyFile) {
- s.ServiceAccountKeyFile = s.TLSPrivateKeyFile
- } else {
- glog.Warning("No RSA key provided, service account token authentication disabled")
- }
- }
- var serviceAccountGetter serviceaccount.ServiceAccountTokenGetter
- if s.ServiceAccountLookup {
- // If we need to look up service accounts and tokens,
- // go directly to etcd to avoid recursive auth insanity
- storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
- if err != nil {
- glog.Fatalf("Unable to get serviceaccounts storage: %v", err)
- }
- serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
- }
- authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{
- BasicAuthFile: s.BasicAuthFile,
- ClientCAFile: s.ClientCAFile,
- TokenAuthFile: s.TokenAuthFile,
- OIDCIssuerURL: s.OIDCIssuerURL,
- OIDCClientID: s.OIDCClientID,
- OIDCCAFile: s.OIDCCAFile,
- OIDCUsernameClaim: s.OIDCUsernameClaim,
- OIDCGroupsClaim: s.OIDCGroupsClaim,
- ServiceAccountKeyFile: s.ServiceAccountKeyFile,
- ServiceAccountLookup: s.ServiceAccountLookup,
- ServiceAccountTokenGetter: serviceAccountGetter,
- KeystoneURL: s.KeystoneURL,
- WebhookTokenAuthnConfigFile: s.WebhookTokenAuthnConfigFile,
- WebhookTokenAuthnCacheTTL: s.WebhookTokenAuthnCacheTTL,
- })
- if err != nil {
- glog.Fatalf("Invalid Authentication Config: %v", err)
- }
- authorizationModeNames := strings.Split(s.AuthorizationMode, ",")
- modeEnabled := func(mode string) bool {
- for _, m := range authorizationModeNames {
- if m == mode {
- return true
- }
- }
- return false
- }
- authorizationConfig := authorizer.AuthorizationConfig{
- PolicyFile: s.AuthorizationPolicyFile,
- WebhookConfigFile: s.AuthorizationWebhookConfigFile,
- WebhookCacheAuthorizedTTL: s.AuthorizationWebhookCacheAuthorizedTTL,
- WebhookCacheUnauthorizedTTL: s.AuthorizationWebhookCacheUnauthorizedTTL,
- RBACSuperUser: s.AuthorizationRBACSuperUser,
- }
- if modeEnabled(genericoptions.ModeRBAC) {
- mustGetRESTOptions := func(resource string) generic.RESTOptions {
- config, err := storageFactory.NewConfig(rbac.Resource(resource))
- if err != nil {
- glog.Fatalf("Unable to get %s storage: %v", resource, err)
- }
- return generic.RESTOptions{StorageConfig: config, Decorator: generic.UndecoratedStorage, ResourcePrefix: storageFactory.ResourcePrefix(rbac.Resource(resource))}
- }
- // For initial bootstrapping go directly to etcd to avoid privillege escalation check.
- authorizationConfig.RBACRoleRegistry = role.NewRegistry(roleetcd.NewREST(mustGetRESTOptions("roles")))
- authorizationConfig.RBACRoleBindingRegistry = rolebinding.NewRegistry(rolebindingetcd.NewREST(mustGetRESTOptions("rolebindings")))
- authorizationConfig.RBACClusterRoleRegistry = clusterrole.NewRegistry(clusterroleetcd.NewREST(mustGetRESTOptions("clusterroles")))
- authorizationConfig.RBACClusterRoleBindingRegistry = clusterrolebinding.NewRegistry(clusterrolebindingetcd.NewREST(mustGetRESTOptions("clusterrolebindings")))
- }
- authorizer, err := authorizer.NewAuthorizerFromAuthorizationConfig(authorizationModeNames, authorizationConfig)
- if err != nil {
- glog.Fatalf("Invalid Authorization Config: %v", err)
- }
- admissionControlPluginNames := strings.Split(s.AdmissionControl, ",")
- client, err := s.NewSelfClient()
- if err != nil {
- glog.Errorf("Failed to create clientset: %v", err)
- }
- sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
- pluginInitializer := admission.NewPluginInitializer(sharedInformers)
- admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer)
- if err != nil {
- glog.Fatalf("Failed to initialize plugins: %v", err)
- }
- genericConfig := genericapiserver.NewConfig(s.ServerRunOptions)
- // TODO: Move the following to generic api server as well.
- genericConfig.StorageFactory = storageFactory
- genericConfig.Authenticator = authenticator
- genericConfig.SupportsBasicAuth = len(s.BasicAuthFile) > 0
- genericConfig.Authorizer = authorizer
- genericConfig.AuthorizerRBACSuperUser = s.AuthorizationRBACSuperUser
- genericConfig.AdmissionControl = admissionController
- genericConfig.APIResourceConfigSource = storageFactory.APIResourceConfigSource
- genericConfig.MasterServiceNamespace = s.MasterServiceNamespace
- genericConfig.ProxyDialer = proxyDialerFn
- genericConfig.ProxyTLSClientConfig = proxyTLSClientConfig
- genericConfig.Serializer = api.Codecs
- genericConfig.OpenAPIInfo.Title = "Kubernetes"
- config := &master.Config{
- Config: genericConfig,
- EnableCoreControllers: true,
- DeleteCollectionWorkers: s.DeleteCollectionWorkers,
- EventTTL: s.EventTTL,
- KubeletClient: kubeletClient,
- Tunneler: tunneler,
- }
- if s.EnableWatchCache {
- glog.V(2).Infof("Initalizing cache sizes based on %dMB limit", s.TargetRAMMB)
- cachesize.InitializeWatchCacheSizes(s.TargetRAMMB)
- cachesize.SetWatchCacheSizes(s.WatchCacheSizes)
- }
- m, err := master.New(config)
- if err != nil {
- return err
- }
- sharedInformers.Start(wait.NeverStop)
- m.Run(s.ServerRunOptions)
- return nil
- }
|