master.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package master
  14. import (
  15. "fmt"
  16. "io"
  17. "net"
  18. "net/http"
  19. "net/url"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. "k8s.io/kubernetes/pkg/api"
  25. "k8s.io/kubernetes/pkg/api/meta"
  26. "k8s.io/kubernetes/pkg/api/rest"
  27. "k8s.io/kubernetes/pkg/api/unversioned"
  28. apiv1 "k8s.io/kubernetes/pkg/api/v1"
  29. "k8s.io/kubernetes/pkg/apimachinery/registered"
  30. appsapi "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
  31. authenticationv1beta1 "k8s.io/kubernetes/pkg/apis/authentication/v1beta1"
  32. "k8s.io/kubernetes/pkg/apis/authorization"
  33. authorizationapiv1beta1 "k8s.io/kubernetes/pkg/apis/authorization/v1beta1"
  34. "k8s.io/kubernetes/pkg/apis/autoscaling"
  35. autoscalingapiv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
  36. "k8s.io/kubernetes/pkg/apis/batch"
  37. batchapiv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
  38. "k8s.io/kubernetes/pkg/apis/certificates"
  39. certificatesapiv1alpha1 "k8s.io/kubernetes/pkg/apis/certificates/v1alpha1"
  40. "k8s.io/kubernetes/pkg/apis/extensions"
  41. extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
  42. "k8s.io/kubernetes/pkg/apis/policy"
  43. policyapiv1alpha1 "k8s.io/kubernetes/pkg/apis/policy/v1alpha1"
  44. "k8s.io/kubernetes/pkg/apis/rbac"
  45. rbacapi "k8s.io/kubernetes/pkg/apis/rbac/v1alpha1"
  46. "k8s.io/kubernetes/pkg/apiserver"
  47. apiservermetrics "k8s.io/kubernetes/pkg/apiserver/metrics"
  48. "k8s.io/kubernetes/pkg/genericapiserver"
  49. "k8s.io/kubernetes/pkg/healthz"
  50. kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
  51. "k8s.io/kubernetes/pkg/master/ports"
  52. "k8s.io/kubernetes/pkg/registry/componentstatus"
  53. configmapetcd "k8s.io/kubernetes/pkg/registry/configmap/etcd"
  54. controlleretcd "k8s.io/kubernetes/pkg/registry/controller/etcd"
  55. "k8s.io/kubernetes/pkg/registry/endpoint"
  56. endpointsetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd"
  57. eventetcd "k8s.io/kubernetes/pkg/registry/event/etcd"
  58. "k8s.io/kubernetes/pkg/registry/generic"
  59. limitrangeetcd "k8s.io/kubernetes/pkg/registry/limitrange/etcd"
  60. "k8s.io/kubernetes/pkg/registry/namespace"
  61. namespaceetcd "k8s.io/kubernetes/pkg/registry/namespace/etcd"
  62. "k8s.io/kubernetes/pkg/registry/node"
  63. nodeetcd "k8s.io/kubernetes/pkg/registry/node/etcd"
  64. pvetcd "k8s.io/kubernetes/pkg/registry/persistentvolume/etcd"
  65. pvcetcd "k8s.io/kubernetes/pkg/registry/persistentvolumeclaim/etcd"
  66. podetcd "k8s.io/kubernetes/pkg/registry/pod/etcd"
  67. podtemplateetcd "k8s.io/kubernetes/pkg/registry/podtemplate/etcd"
  68. "k8s.io/kubernetes/pkg/registry/rangeallocation"
  69. resourcequotaetcd "k8s.io/kubernetes/pkg/registry/resourcequota/etcd"
  70. secretetcd "k8s.io/kubernetes/pkg/registry/secret/etcd"
  71. "k8s.io/kubernetes/pkg/registry/service"
  72. etcdallocator "k8s.io/kubernetes/pkg/registry/service/allocator/etcd"
  73. serviceetcd "k8s.io/kubernetes/pkg/registry/service/etcd"
  74. ipallocator "k8s.io/kubernetes/pkg/registry/service/ipallocator"
  75. serviceaccountetcd "k8s.io/kubernetes/pkg/registry/serviceaccount/etcd"
  76. "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata"
  77. thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd"
  78. "k8s.io/kubernetes/pkg/runtime"
  79. etcdmetrics "k8s.io/kubernetes/pkg/storage/etcd/metrics"
  80. etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
  81. "k8s.io/kubernetes/pkg/storage/storagebackend"
  82. "k8s.io/kubernetes/pkg/util/sets"
  83. "github.com/golang/glog"
  84. "github.com/prometheus/client_golang/prometheus"
  85. "k8s.io/kubernetes/pkg/registry/service/allocator"
  86. "k8s.io/kubernetes/pkg/registry/service/portallocator"
  87. )
const (
	// DefaultEndpointReconcilerInterval is the default amount of time for how often the endpoints for
	// the kubernetes Service are reconciled. NewBootstrapController falls back to it when the
	// configured interval is zero.
	DefaultEndpointReconcilerInterval = 10 * time.Second
)
// Config holds the settings New uses to construct a Master. It embeds the
// generic API server configuration and adds master-specific options.
type Config struct {
	*genericapiserver.Config

	// EnableCoreControllers makes New start the bootstrap controller after the
	// APIs have been installed.
	EnableCoreControllers bool
	// EndpointReconcilerConfig is handed to NewBootstrapController when the
	// core controllers are enabled.
	EndpointReconcilerConfig EndpointReconcilerConfig
	// DeleteCollectionWorkers is copied onto the Master and forwarded in the
	// RESTOptions used for third party resource storage.
	DeleteCollectionWorkers int
	// EventTTL is converted to whole seconds and passed to the event storage
	// constructor.
	EventTTL time.Duration
	// KubeletClient is required: New returns an error when it is nil.
	KubeletClient kubeletclient.KubeletClient
	// RESTStorageProviders provides RESTStorage building methods keyed by groupName
	RESTStorageProviders map[string]RESTStorageProvider
	// Used to start and monitor tunneling
	Tunneler genericapiserver.Tunneler

	// disableThirdPartyControllerForTesting is copied onto the Master and
	// forwarded to the extensions storage provider by New.
	disableThirdPartyControllerForTesting bool
}
// EndpointReconcilerConfig holds the endpoint reconciler and endpoint reconciliation interval to be
// used by the master.
type EndpointReconcilerConfig struct {
	// Reconciler is the reconciler to use; NewBootstrapController substitutes
	// a master-count based reconciler when it is nil.
	Reconciler EndpointReconciler
	// Interval is the reconciliation period; a zero value is replaced by
	// DefaultEndpointReconcilerInterval in NewBootstrapController.
	Interval time.Duration
}
// Master contains state for a Kubernetes cluster master/api server.
type Master struct {
	*genericapiserver.GenericAPIServer

	// Map of v1 resources to their REST storages.
	// Populated by initV1ResourcesStorage.
	v1ResourcesStorage map[string]rest.Storage

	// Copied from the corresponding Config fields by New.
	enableCoreControllers   bool
	deleteCollectionWorkers int

	// registries are internal client APIs for accessing the storage layer
	// TODO: define the internal typed interface in a way that clients can
	// also be replaced
	// All of the registries below are assigned in initV1ResourcesStorage.
	nodeRegistry      node.Registry
	namespaceRegistry namespace.Registry
	serviceRegistry   service.Registry
	endpointRegistry  endpoint.Registry
	// Range registries captured while the service cluster IP and node port
	// allocators are built; handed to the bootstrap controller.
	serviceClusterIPAllocator rangeallocation.RangeRegistry
	serviceNodePortAllocator  rangeallocation.RangeRegistry

	// storage for third party objects
	// Set by InstallAPIs only when the thirdpartyresources resource is enabled.
	thirdPartyStorageConfig *storagebackend.Config
	// map from api path to a tuple of (storage for the objects, APIGroup)
	thirdPartyResources map[string]*thirdPartyEntry
	// protects the map
	thirdPartyResourcesLock sync.RWMutex
	// Useful for reliable testing.  Shouldn't be used otherwise.
	disableThirdPartyControllerForTesting bool

	// Used to start and monitor tunneling
	tunneler genericapiserver.Tunneler
}
// thirdPartyEntry combines objects storage and API group into one struct
// for easy lookup.
type thirdPartyEntry struct {
	// Map from plural resource name to entry
	storage map[string]*thirdpartyresourcedataetcd.REST
	// group is the API group value supplied when the entry was first created.
	group unversioned.APIGroup
}
// RESTOptionsGetter returns the generic.RESTOptions to use for the given
// group resource.
type RESTOptionsGetter func(resource unversioned.GroupResource) generic.RESTOptions

// RESTStorageProvider builds the REST storage for a single API group.
type RESTStorageProvider interface {
	// NewRESTStorage returns the APIGroupInfo for the group together with a
	// bool; InstallAPIs skips the group when that bool is false.
	NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool)
}
  150. // New returns a new instance of Master from the given config.
  151. // Certain config fields will be set to a default value if unset.
  152. // Certain config fields must be specified, including:
  153. // KubeletClient
  154. func New(c *Config) (*Master, error) {
  155. if c.KubeletClient == nil {
  156. return nil, fmt.Errorf("Master.New() called with config.KubeletClient == nil")
  157. }
  158. s, err := genericapiserver.New(c.Config)
  159. if err != nil {
  160. return nil, err
  161. }
  162. m := &Master{
  163. GenericAPIServer: s,
  164. enableCoreControllers: c.EnableCoreControllers,
  165. deleteCollectionWorkers: c.DeleteCollectionWorkers,
  166. tunneler: c.Tunneler,
  167. disableThirdPartyControllerForTesting: c.disableThirdPartyControllerForTesting,
  168. }
  169. // Add some hardcoded storage for now. Append to the map.
  170. if c.RESTStorageProviders == nil {
  171. c.RESTStorageProviders = map[string]RESTStorageProvider{}
  172. }
  173. c.RESTStorageProviders[appsapi.GroupName] = AppsRESTStorageProvider{}
  174. c.RESTStorageProviders[autoscaling.GroupName] = AutoscalingRESTStorageProvider{}
  175. c.RESTStorageProviders[batch.GroupName] = BatchRESTStorageProvider{}
  176. c.RESTStorageProviders[certificates.GroupName] = CertificatesRESTStorageProvider{}
  177. c.RESTStorageProviders[extensions.GroupName] = ExtensionsRESTStorageProvider{
  178. ResourceInterface: m,
  179. DisableThirdPartyControllerForTesting: m.disableThirdPartyControllerForTesting,
  180. }
  181. c.RESTStorageProviders[policy.GroupName] = PolicyRESTStorageProvider{}
  182. c.RESTStorageProviders[rbac.GroupName] = RBACRESTStorageProvider{AuthorizerRBACSuperUser: c.AuthorizerRBACSuperUser}
  183. c.RESTStorageProviders[authenticationv1beta1.GroupName] = AuthenticationRESTStorageProvider{Authenticator: c.Authenticator}
  184. c.RESTStorageProviders[authorization.GroupName] = AuthorizationRESTStorageProvider{Authorizer: c.Authorizer}
  185. m.InstallAPIs(c)
  186. // TODO: Attempt clean shutdown?
  187. if m.enableCoreControllers {
  188. m.NewBootstrapController(c.EndpointReconcilerConfig).Start()
  189. }
  190. return m, nil
  191. }
// defaultMetricsHandler is the prometheus handler's ServeHTTP method. It
// serves /metrics directly when profiling is disabled and is the fallback
// for non-DELETE requests in MetricsWithReset.
var defaultMetricsHandler = prometheus.Handler().ServeHTTP
  193. // MetricsWithReset is a handler that resets metrics when DELETE is passed to the endpoint.
  194. func MetricsWithReset(w http.ResponseWriter, req *http.Request) {
  195. if req.Method == "DELETE" {
  196. apiservermetrics.Reset()
  197. etcdmetrics.Reset()
  198. io.WriteString(w, "metrics reset\n")
  199. return
  200. }
  201. defaultMetricsHandler(w, req)
  202. }
  203. func (m *Master) InstallAPIs(c *Config) {
  204. apiGroupsInfo := []genericapiserver.APIGroupInfo{}
  205. // Install v1 unless disabled.
  206. if c.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
  207. // Install v1 API.
  208. m.initV1ResourcesStorage(c)
  209. apiGroupInfo := genericapiserver.APIGroupInfo{
  210. GroupMeta: *registered.GroupOrDie(api.GroupName),
  211. VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
  212. "v1": m.v1ResourcesStorage,
  213. },
  214. IsLegacyGroup: true,
  215. Scheme: api.Scheme,
  216. ParameterCodec: api.ParameterCodec,
  217. NegotiatedSerializer: api.Codecs,
  218. SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{},
  219. }
  220. if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
  221. apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale")
  222. }
  223. if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) {
  224. apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction")
  225. }
  226. apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
  227. }
  228. // Run the tunneler.
  229. healthzChecks := []healthz.HealthzChecker{}
  230. if m.tunneler != nil {
  231. m.tunneler.Run(m.getNodeAddresses)
  232. healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy))
  233. prometheus.NewGaugeFunc(prometheus.GaugeOpts{
  234. Name: "apiserver_proxy_tunnel_sync_latency_secs",
  235. Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
  236. }, func() float64 { return float64(m.tunneler.SecondsSinceSync()) })
  237. }
  238. healthz.InstallHandler(m.MuxHelper, healthzChecks...)
  239. if c.EnableProfiling {
  240. m.MuxHelper.HandleFunc("/metrics", MetricsWithReset)
  241. } else {
  242. m.MuxHelper.HandleFunc("/metrics", defaultMetricsHandler)
  243. }
  244. // Install third party resource support if requested
  245. // TODO seems like this bit ought to be unconditional and the REST API is controlled by the config
  246. if c.APIResourceConfigSource.ResourceEnabled(extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources")) {
  247. var err error
  248. m.thirdPartyStorageConfig, err = c.StorageFactory.NewConfig(extensions.Resource("thirdpartyresources"))
  249. if err != nil {
  250. glog.Fatalf("Error getting third party storage: %v", err)
  251. }
  252. m.thirdPartyResources = map[string]*thirdPartyEntry{}
  253. }
  254. restOptionsGetter := func(resource unversioned.GroupResource) generic.RESTOptions {
  255. return m.GetRESTOptionsOrDie(c, resource)
  256. }
  257. // stabilize order.
  258. // TODO find a better way to configure priority of groups
  259. for _, group := range sets.StringKeySet(c.RESTStorageProviders).List() {
  260. if !c.APIResourceConfigSource.AnyResourcesForGroupEnabled(group) {
  261. continue
  262. }
  263. restStorageBuilder := c.RESTStorageProviders[group]
  264. apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(c.APIResourceConfigSource, restOptionsGetter)
  265. if !enabled {
  266. continue
  267. }
  268. // This is here so that, if the policy group is present, the eviction
  269. // subresource handler wil be able to find poddisruptionbudgets
  270. // TODO(lavalamp) find a better way for groups to discover and interact
  271. // with each other
  272. if group == "policy" {
  273. storage := apiGroupsInfo[0].VersionedResourcesStorageMap["v1"]["pods/eviction"]
  274. evictionStorage := storage.(*podetcd.EvictionREST)
  275. storage = apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"]["poddisruptionbudgets"]
  276. evictionStorage.PodDisruptionBudgetLister = storage.(rest.Lister)
  277. evictionStorage.PodDisruptionBudgetUpdater = storage.(rest.Updater)
  278. }
  279. apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
  280. }
  281. if err := m.InstallAPIGroups(apiGroupsInfo); err != nil {
  282. glog.Fatalf("Error in registering group versions: %v", err)
  283. }
  284. }
  285. func (m *Master) initV1ResourcesStorage(c *Config) {
  286. restOptions := func(resource string) generic.RESTOptions {
  287. return m.GetRESTOptionsOrDie(c, api.Resource(resource))
  288. }
  289. podTemplateStorage := podtemplateetcd.NewREST(restOptions("podTemplates"))
  290. eventStorage := eventetcd.NewREST(restOptions("events"), uint64(c.EventTTL.Seconds()))
  291. limitRangeStorage := limitrangeetcd.NewREST(restOptions("limitRanges"))
  292. resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptions("resourceQuotas"))
  293. secretStorage := secretetcd.NewREST(restOptions("secrets"))
  294. serviceAccountStorage := serviceaccountetcd.NewREST(restOptions("serviceAccounts"))
  295. persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptions("persistentVolumes"))
  296. persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptions("persistentVolumeClaims"))
  297. configMapStorage := configmapetcd.NewREST(restOptions("configMaps"))
  298. namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptions("namespaces"))
  299. m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
  300. endpointsStorage := endpointsetcd.NewREST(restOptions("endpoints"))
  301. m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
  302. nodeStorage := nodeetcd.NewStorage(restOptions("nodes"), c.KubeletClient, m.ProxyTransport)
  303. m.nodeRegistry = node.NewRegistry(nodeStorage.Node)
  304. podStorage := podetcd.NewStorage(
  305. restOptions("pods"),
  306. kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
  307. m.ProxyTransport,
  308. )
  309. serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services"))
  310. m.serviceRegistry = service.NewRegistry(serviceRESTStorage)
  311. var serviceClusterIPRegistry rangeallocation.RangeRegistry
  312. serviceClusterIPRange := m.ServiceClusterIPRange
  313. if serviceClusterIPRange == nil {
  314. glog.Fatalf("service clusterIPRange is nil")
  315. return
  316. }
  317. serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
  318. if err != nil {
  319. glog.Fatal(err.Error())
  320. }
  321. serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
  322. mem := allocator.NewAllocationMap(max, rangeSpec)
  323. // TODO etcdallocator package to return a storage interface via the storageFactory
  324. etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
  325. serviceClusterIPRegistry = etcd
  326. return etcd
  327. })
  328. m.serviceClusterIPAllocator = serviceClusterIPRegistry
  329. var serviceNodePortRegistry rangeallocation.RangeRegistry
  330. serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
  331. mem := allocator.NewAllocationMap(max, rangeSpec)
  332. // TODO etcdallocator package to return a storage interface via the storageFactory
  333. etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
  334. serviceNodePortRegistry = etcd
  335. return etcd
  336. })
  337. m.serviceNodePortAllocator = serviceNodePortRegistry
  338. controllerStorage := controlleretcd.NewStorage(restOptions("replicationControllers"))
  339. serviceRest := service.NewStorage(m.serviceRegistry, m.endpointRegistry, serviceClusterIPAllocator, serviceNodePortAllocator, m.ProxyTransport)
  340. // TODO: Factor out the core API registration
  341. m.v1ResourcesStorage = map[string]rest.Storage{
  342. "pods": podStorage.Pod,
  343. "pods/attach": podStorage.Attach,
  344. "pods/status": podStorage.Status,
  345. "pods/log": podStorage.Log,
  346. "pods/exec": podStorage.Exec,
  347. "pods/portforward": podStorage.PortForward,
  348. "pods/proxy": podStorage.Proxy,
  349. "pods/binding": podStorage.Binding,
  350. "bindings": podStorage.Binding,
  351. "podTemplates": podTemplateStorage,
  352. "replicationControllers": controllerStorage.Controller,
  353. "replicationControllers/status": controllerStorage.Status,
  354. "services": serviceRest.Service,
  355. "services/proxy": serviceRest.Proxy,
  356. "services/status": serviceStatusStorage,
  357. "endpoints": endpointsStorage,
  358. "nodes": nodeStorage.Node,
  359. "nodes/status": nodeStorage.Status,
  360. "nodes/proxy": nodeStorage.Proxy,
  361. "events": eventStorage,
  362. "limitRanges": limitRangeStorage,
  363. "resourceQuotas": resourceQuotaStorage,
  364. "resourceQuotas/status": resourceQuotaStatusStorage,
  365. "namespaces": namespaceStorage,
  366. "namespaces/status": namespaceStatusStorage,
  367. "namespaces/finalize": namespaceFinalizeStorage,
  368. "secrets": secretStorage,
  369. "serviceAccounts": serviceAccountStorage,
  370. "persistentVolumes": persistentVolumeStorage,
  371. "persistentVolumes/status": persistentVolumeStatusStorage,
  372. "persistentVolumeClaims": persistentVolumeClaimStorage,
  373. "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
  374. "configMaps": configMapStorage,
  375. "componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
  376. }
  377. if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
  378. m.v1ResourcesStorage["replicationControllers/scale"] = controllerStorage.Scale
  379. }
  380. if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}) {
  381. m.v1ResourcesStorage["pods/eviction"] = podStorage.Eviction
  382. }
  383. }
  384. // NewBootstrapController returns a controller for watching the core capabilities of the master. If
  385. // endpointReconcilerConfig.Interval is 0, the default value of DefaultEndpointReconcilerInterval
  386. // will be used instead. If endpointReconcilerConfig.Reconciler is nil, the default
  387. // MasterCountEndpointReconciler will be used.
  388. func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconcilerConfig) *Controller {
  389. if endpointReconcilerConfig.Interval == 0 {
  390. endpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
  391. }
  392. if endpointReconcilerConfig.Reconciler == nil {
  393. // use a default endpoint reconciler if nothing is set
  394. // m.endpointRegistry is set via m.InstallAPIs -> m.initV1ResourcesStorage
  395. endpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(m.MasterCount, m.endpointRegistry)
  396. }
  397. return &Controller{
  398. NamespaceRegistry: m.namespaceRegistry,
  399. ServiceRegistry: m.serviceRegistry,
  400. EndpointReconciler: endpointReconcilerConfig.Reconciler,
  401. EndpointInterval: endpointReconcilerConfig.Interval,
  402. SystemNamespaces: []string{api.NamespaceSystem},
  403. SystemNamespacesInterval: 1 * time.Minute,
  404. ServiceClusterIPRegistry: m.serviceClusterIPAllocator,
  405. ServiceClusterIPRange: m.ServiceClusterIPRange,
  406. ServiceClusterIPInterval: 3 * time.Minute,
  407. ServiceNodePortRegistry: m.serviceNodePortAllocator,
  408. ServiceNodePortRange: m.ServiceNodePortRange,
  409. ServiceNodePortInterval: 3 * time.Minute,
  410. PublicIP: m.ClusterIP,
  411. ServiceIP: m.ServiceReadWriteIP,
  412. ServicePort: m.ServiceReadWritePort,
  413. ExtraServicePorts: m.ExtraServicePorts,
  414. ExtraEndpointPorts: m.ExtraEndpointPorts,
  415. PublicServicePort: m.PublicReadWritePort,
  416. KubernetesServiceNodePort: m.KubernetesServiceNodePort,
  417. }
  418. }
  419. func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
  420. serversToValidate := map[string]apiserver.Server{
  421. "controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
  422. "scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
  423. }
  424. for ix, machine := range c.StorageFactory.Backends() {
  425. etcdUrl, err := url.Parse(machine)
  426. if err != nil {
  427. glog.Errorf("Failed to parse etcd url for validation: %v", err)
  428. continue
  429. }
  430. var port int
  431. var addr string
  432. if strings.Contains(etcdUrl.Host, ":") {
  433. var portString string
  434. addr, portString, err = net.SplitHostPort(etcdUrl.Host)
  435. if err != nil {
  436. glog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
  437. continue
  438. }
  439. port, _ = strconv.Atoi(portString)
  440. } else {
  441. addr = etcdUrl.Host
  442. port = 2379
  443. }
  444. // TODO: etcd health checking should be abstracted in the storage tier
  445. serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{
  446. Addr: addr,
  447. EnableHTTPS: etcdUrl.Scheme == "https",
  448. Port: port,
  449. Path: "/health",
  450. Validate: etcdutil.EtcdHealthCheck,
  451. }
  452. }
  453. return serversToValidate
  454. }
  455. // HasThirdPartyResource returns true if a particular third party resource currently installed.
  456. func (m *Master) HasThirdPartyResource(rsrc *extensions.ThirdPartyResource) (bool, error) {
  457. kind, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(rsrc)
  458. if err != nil {
  459. return false, err
  460. }
  461. path := makeThirdPartyPath(group)
  462. m.thirdPartyResourcesLock.Lock()
  463. defer m.thirdPartyResourcesLock.Unlock()
  464. entry := m.thirdPartyResources[path]
  465. if entry == nil {
  466. return false, nil
  467. }
  468. plural, _ := meta.KindToResource(unversioned.GroupVersionKind{
  469. Group: group,
  470. Version: rsrc.Versions[0].Name,
  471. Kind: kind,
  472. })
  473. _, found := entry.storage[plural.Resource]
  474. return found, nil
  475. }
  476. func (m *Master) removeThirdPartyStorage(path, resource string) error {
  477. m.thirdPartyResourcesLock.Lock()
  478. defer m.thirdPartyResourcesLock.Unlock()
  479. entry, found := m.thirdPartyResources[path]
  480. if !found {
  481. return nil
  482. }
  483. storage, found := entry.storage[resource]
  484. if !found {
  485. return nil
  486. }
  487. if err := m.removeAllThirdPartyResources(storage); err != nil {
  488. return err
  489. }
  490. delete(entry.storage, resource)
  491. if len(entry.storage) == 0 {
  492. delete(m.thirdPartyResources, path)
  493. m.RemoveAPIGroupForDiscovery(getThirdPartyGroupName(path))
  494. } else {
  495. m.thirdPartyResources[path] = entry
  496. }
  497. return nil
  498. }
  499. // RemoveThirdPartyResource removes all resources matching `path`. Also deletes any stored data
  500. func (m *Master) RemoveThirdPartyResource(path string) error {
  501. ix := strings.LastIndex(path, "/")
  502. if ix == -1 {
  503. return fmt.Errorf("expected <api-group>/<resource-plural-name>, saw: %s", path)
  504. }
  505. resource := path[ix+1:]
  506. path = path[0:ix]
  507. if err := m.removeThirdPartyStorage(path, resource); err != nil {
  508. return err
  509. }
  510. services := m.HandlerContainer.RegisteredWebServices()
  511. for ix := range services {
  512. root := services[ix].RootPath()
  513. if root == path || strings.HasPrefix(root, path+"/") {
  514. m.HandlerContainer.Remove(services[ix])
  515. }
  516. }
  517. return nil
  518. }
  519. func (m *Master) removeAllThirdPartyResources(registry *thirdpartyresourcedataetcd.REST) error {
  520. ctx := api.NewDefaultContext()
  521. existingData, err := registry.List(ctx, nil)
  522. if err != nil {
  523. return err
  524. }
  525. list, ok := existingData.(*extensions.ThirdPartyResourceDataList)
  526. if !ok {
  527. return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %#v", list)
  528. }
  529. for ix := range list.Items {
  530. item := &list.Items[ix]
  531. if _, err := registry.Delete(ctx, item.Name, nil); err != nil {
  532. return err
  533. }
  534. }
  535. return nil
  536. }
  537. // ListThirdPartyResources lists all currently installed third party resources
  538. // The format is <path>/<resource-plural-name>
  539. func (m *Master) ListThirdPartyResources() []string {
  540. m.thirdPartyResourcesLock.RLock()
  541. defer m.thirdPartyResourcesLock.RUnlock()
  542. result := []string{}
  543. for key := range m.thirdPartyResources {
  544. for rsrc := range m.thirdPartyResources[key].storage {
  545. result = append(result, key+"/"+rsrc)
  546. }
  547. }
  548. return result
  549. }
  550. func (m *Master) getExistingThirdPartyResources(path string) []unversioned.APIResource {
  551. result := []unversioned.APIResource{}
  552. m.thirdPartyResourcesLock.Lock()
  553. defer m.thirdPartyResourcesLock.Unlock()
  554. entry := m.thirdPartyResources[path]
  555. if entry != nil {
  556. for key, obj := range entry.storage {
  557. result = append(result, unversioned.APIResource{
  558. Name: key,
  559. Namespaced: true,
  560. Kind: obj.Kind(),
  561. })
  562. }
  563. }
  564. return result
  565. }
  566. func (m *Master) hasThirdPartyGroupStorage(path string) bool {
  567. m.thirdPartyResourcesLock.Lock()
  568. defer m.thirdPartyResourcesLock.Unlock()
  569. _, found := m.thirdPartyResources[path]
  570. return found
  571. }
  572. func (m *Master) addThirdPartyResourceStorage(path, resource string, storage *thirdpartyresourcedataetcd.REST, apiGroup unversioned.APIGroup) {
  573. m.thirdPartyResourcesLock.Lock()
  574. defer m.thirdPartyResourcesLock.Unlock()
  575. entry, found := m.thirdPartyResources[path]
  576. if entry == nil {
  577. entry = &thirdPartyEntry{
  578. group: apiGroup,
  579. storage: map[string]*thirdpartyresourcedataetcd.REST{},
  580. }
  581. m.thirdPartyResources[path] = entry
  582. }
  583. entry.storage[resource] = storage
  584. if !found {
  585. m.AddAPIGroupForDiscovery(apiGroup)
  586. }
  587. }
  588. // InstallThirdPartyResource installs a third party resource specified by 'rsrc'. When a resource is
  589. // installed a corresponding RESTful resource is added as a valid path in the web service provided by
  590. // the master.
  591. //
  592. // For example, if you install a resource ThirdPartyResource{ Name: "foo.company.com", Versions: {"v1"} }
  593. // then the following RESTful resource is created on the server:
  594. // http://<host>/apis/company.com/v1/foos/...
  595. func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource) error {
  596. kind, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(rsrc)
  597. if err != nil {
  598. return err
  599. }
  600. plural, _ := meta.KindToResource(unversioned.GroupVersionKind{
  601. Group: group,
  602. Version: rsrc.Versions[0].Name,
  603. Kind: kind,
  604. })
  605. path := makeThirdPartyPath(group)
  606. groupVersion := unversioned.GroupVersionForDiscovery{
  607. GroupVersion: group + "/" + rsrc.Versions[0].Name,
  608. Version: rsrc.Versions[0].Name,
  609. }
  610. apiGroup := unversioned.APIGroup{
  611. Name: group,
  612. Versions: []unversioned.GroupVersionForDiscovery{groupVersion},
  613. PreferredVersion: groupVersion,
  614. }
  615. thirdparty := m.thirdpartyapi(group, kind, rsrc.Versions[0].Name, plural.Resource)
  616. // If storage exists, this group has already been added, just update
  617. // the group with the new API
  618. if m.hasThirdPartyGroupStorage(path) {
  619. m.addThirdPartyResourceStorage(path, plural.Resource, thirdparty.Storage[plural.Resource].(*thirdpartyresourcedataetcd.REST), apiGroup)
  620. return thirdparty.UpdateREST(m.HandlerContainer)
  621. }
  622. if err := thirdparty.InstallREST(m.HandlerContainer); err != nil {
  623. glog.Errorf("Unable to setup thirdparty api: %v", err)
  624. }
  625. apiserver.AddGroupWebService(api.Codecs, m.HandlerContainer, path, apiGroup)
  626. m.addThirdPartyResourceStorage(path, plural.Resource, thirdparty.Storage[plural.Resource].(*thirdpartyresourcedataetcd.REST), apiGroup)
  627. apiserver.InstallServiceErrorHandler(api.Codecs, m.HandlerContainer, m.NewRequestInfoResolver(), []string{thirdparty.GroupVersion.String()})
  628. return nil
  629. }
  630. func (m *Master) thirdpartyapi(group, kind, version, pluralResource string) *apiserver.APIGroupVersion {
  631. resourceStorage := thirdpartyresourcedataetcd.NewREST(
  632. generic.RESTOptions{
  633. StorageConfig: m.thirdPartyStorageConfig,
  634. Decorator: generic.UndecoratedStorage,
  635. DeleteCollectionWorkers: m.deleteCollectionWorkers,
  636. },
  637. group,
  638. kind,
  639. )
  640. storage := map[string]rest.Storage{
  641. pluralResource: resourceStorage,
  642. }
  643. optionsExternalVersion := registered.GroupOrDie(api.GroupName).GroupVersion
  644. internalVersion := unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}
  645. externalVersion := unversioned.GroupVersion{Group: group, Version: version}
  646. apiRoot := makeThirdPartyPath("")
  647. return &apiserver.APIGroupVersion{
  648. Root: apiRoot,
  649. GroupVersion: externalVersion,
  650. RequestInfoResolver: m.NewRequestInfoResolver(),
  651. Creater: thirdpartyresourcedata.NewObjectCreator(group, version, api.Scheme),
  652. Convertor: api.Scheme,
  653. Copier: api.Scheme,
  654. Typer: api.Scheme,
  655. Mapper: thirdpartyresourcedata.NewMapper(registered.GroupOrDie(extensions.GroupName).RESTMapper, kind, version, group),
  656. Linker: registered.GroupOrDie(extensions.GroupName).SelfLinker,
  657. Storage: storage,
  658. OptionsExternalVersion: &optionsExternalVersion,
  659. Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion),
  660. ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec),
  661. Context: m.RequestContextMapper,
  662. MinRequestTimeout: m.MinRequestTimeout,
  663. ResourceLister: dynamicLister{m, makeThirdPartyPath(group)},
  664. }
  665. }
  666. func (m *Master) GetRESTOptionsOrDie(c *Config, resource unversioned.GroupResource) generic.RESTOptions {
  667. storageConfig, err := c.StorageFactory.NewConfig(resource)
  668. if err != nil {
  669. glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
  670. }
  671. return generic.RESTOptions{
  672. StorageConfig: storageConfig,
  673. Decorator: m.StorageDecorator(),
  674. DeleteCollectionWorkers: m.deleteCollectionWorkers,
  675. ResourcePrefix: c.StorageFactory.ResourcePrefix(resource),
  676. }
  677. }
  678. // findExternalAddress returns ExternalIP of provided node with fallback to LegacyHostIP.
  679. func findExternalAddress(node *api.Node) (string, error) {
  680. var fallback string
  681. for ix := range node.Status.Addresses {
  682. addr := &node.Status.Addresses[ix]
  683. if addr.Type == api.NodeExternalIP {
  684. return addr.Address, nil
  685. }
  686. if fallback == "" && addr.Type == api.NodeLegacyHostIP {
  687. fallback = addr.Address
  688. }
  689. }
  690. if fallback != "" {
  691. return fallback, nil
  692. }
  693. return "", fmt.Errorf("Couldn't find external address: %v", node)
  694. }
  695. func (m *Master) getNodeAddresses() ([]string, error) {
  696. nodes, err := m.nodeRegistry.ListNodes(api.NewDefaultContext(), nil)
  697. if err != nil {
  698. return nil, err
  699. }
  700. addrs := []string{}
  701. for ix := range nodes.Items {
  702. node := &nodes.Items[ix]
  703. addr, err := findExternalAddress(node)
  704. if err != nil {
  705. return nil, err
  706. }
  707. addrs = append(addrs, addr)
  708. }
  709. return addrs, nil
  710. }
  711. func (m *Master) IsTunnelSyncHealthy(req *http.Request) error {
  712. if m.tunneler == nil {
  713. return nil
  714. }
  715. lag := m.tunneler.SecondsSinceSync()
  716. if lag > 600 {
  717. return fmt.Errorf("Tunnel sync is taking to long: %d", lag)
  718. }
  719. sshKeyLag := m.tunneler.SecondsSinceSSHKeySync()
  720. if sshKeyLag > 600 {
  721. return fmt.Errorf("SSHKey sync is taking to long: %d", sshKeyLag)
  722. }
  723. return nil
  724. }
  725. func DefaultAPIResourceConfigSource() *genericapiserver.ResourceConfig {
  726. ret := genericapiserver.NewResourceConfig()
  727. ret.EnableVersions(
  728. apiv1.SchemeGroupVersion,
  729. extensionsapiv1beta1.SchemeGroupVersion,
  730. batchapiv1.SchemeGroupVersion,
  731. authenticationv1beta1.SchemeGroupVersion,
  732. autoscalingapiv1.SchemeGroupVersion,
  733. appsapi.SchemeGroupVersion,
  734. policyapiv1alpha1.SchemeGroupVersion,
  735. rbacapi.SchemeGroupVersion,
  736. certificatesapiv1alpha1.SchemeGroupVersion,
  737. authorizationapiv1beta1.SchemeGroupVersion,
  738. )
  739. // all extensions resources except these are disabled by default
  740. ret.EnableResources(
  741. extensionsapiv1beta1.SchemeGroupVersion.WithResource("daemonsets"),
  742. extensionsapiv1beta1.SchemeGroupVersion.WithResource("deployments"),
  743. extensionsapiv1beta1.SchemeGroupVersion.WithResource("horizontalpodautoscalers"),
  744. extensionsapiv1beta1.SchemeGroupVersion.WithResource("ingresses"),
  745. extensionsapiv1beta1.SchemeGroupVersion.WithResource("jobs"),
  746. extensionsapiv1beta1.SchemeGroupVersion.WithResource("networkpolicies"),
  747. extensionsapiv1beta1.SchemeGroupVersion.WithResource("replicasets"),
  748. extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources"),
  749. extensionsapiv1beta1.SchemeGroupVersion.WithResource("storageclasses"),
  750. )
  751. return ret
  752. }