endpoints_controller.go 18 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // CAUTION: If you update code in this file, you may need to also update code
  14. // in contrib/mesos/pkg/service/endpoints_controller.go
  15. package endpoint
  16. import (
  17. "reflect"
  18. "strconv"
  19. "time"
  20. "encoding/json"
  21. "github.com/golang/glog"
  22. "k8s.io/kubernetes/pkg/api"
  23. "k8s.io/kubernetes/pkg/api/endpoints"
  24. "k8s.io/kubernetes/pkg/api/errors"
  25. podutil "k8s.io/kubernetes/pkg/api/pod"
  26. utilpod "k8s.io/kubernetes/pkg/api/pod"
  27. "k8s.io/kubernetes/pkg/client/cache"
  28. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  29. "k8s.io/kubernetes/pkg/controller"
  30. "k8s.io/kubernetes/pkg/controller/framework"
  31. "k8s.io/kubernetes/pkg/controller/framework/informers"
  32. "k8s.io/kubernetes/pkg/labels"
  33. "k8s.io/kubernetes/pkg/runtime"
  34. "k8s.io/kubernetes/pkg/util/metrics"
  35. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  36. "k8s.io/kubernetes/pkg/util/sets"
  37. "k8s.io/kubernetes/pkg/util/wait"
  38. "k8s.io/kubernetes/pkg/util/workqueue"
  39. "k8s.io/kubernetes/pkg/watch"
  40. )
  const (
  	// FullServiceResyncPeriod is the resync interval handed to the service
  	// informer, so EVERY service's endpoints are recomputed at least this
  	// often. Larger values mean less CPU/network load; smaller values mean
  	// a mistaken endpoint is corrected sooner.
  	FullServiceResyncPeriod = 30 * time.Second

  	// PodStoreSyncedPollPeriod is how long a sync attempt sleeps before
  	// requeuing its service when the pod store has not synced yet. Services
  	// must not be synced before that point, and the pause avoids a hot loop.
  	PodStoreSyncedPollPeriod = 100 * time.Millisecond

  	// TolerateUnreadyEndpointsAnnotation is a Service annotation telling the
  	// endpoints controller to publish unready pods as ready addresses. It is
  	// currently only used by PetSets, where a pet has to be DNS resolvable
  	// while it initializes. A headless service is created just for the
  	// PetSet and clients are not expected to use it for anything else, so
  	// unready endpoints do no harm there.
  	TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
  )
  57. var (
  58. keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
  59. )
  60. // NewEndpointController returns a new *EndpointController.
  61. func NewEndpointController(podInformer framework.SharedIndexInformer, client *clientset.Clientset) *EndpointController {
  62. if client != nil && client.Core().GetRESTClient().GetRateLimiter() != nil {
  63. metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().GetRESTClient().GetRateLimiter())
  64. }
  65. e := &EndpointController{
  66. client: client,
  67. queue: workqueue.NewNamed("endpoint"),
  68. }
  69. e.serviceStore.Store, e.serviceController = framework.NewInformer(
  70. &cache.ListWatch{
  71. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  72. return e.client.Core().Services(api.NamespaceAll).List(options)
  73. },
  74. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  75. return e.client.Core().Services(api.NamespaceAll).Watch(options)
  76. },
  77. },
  78. &api.Service{},
  79. // TODO: Can we have much longer period here?
  80. FullServiceResyncPeriod,
  81. framework.ResourceEventHandlerFuncs{
  82. AddFunc: e.enqueueService,
  83. UpdateFunc: func(old, cur interface{}) {
  84. e.enqueueService(cur)
  85. },
  86. DeleteFunc: e.enqueueService,
  87. },
  88. )
  89. podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
  90. AddFunc: e.addPod,
  91. UpdateFunc: e.updatePod,
  92. DeleteFunc: e.deletePod,
  93. })
  94. e.podStore.Indexer = podInformer.GetIndexer()
  95. e.podController = podInformer.GetController()
  96. e.podStoreSynced = podInformer.HasSynced
  97. return e
  98. }
  99. // NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
  100. func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
  101. podInformer := informers.NewPodInformer(client, resyncPeriod())
  102. e := NewEndpointController(podInformer, client)
  103. e.internalPodInformer = podInformer
  104. return e
  105. }
  106. // EndpointController manages selector-based service endpoints.
  107. type EndpointController struct {
  108. client *clientset.Clientset
  109. serviceStore cache.StoreToServiceLister
  110. podStore cache.StoreToPodLister
  111. // internalPodInformer is used to hold a personal informer. If we're using
  112. // a normal shared informer, then the informer will be started for us. If
  113. // we have a personal informer, we must start it ourselves. If you start
  114. // the controller using NewEndpointController(passing SharedInformer), this
  115. // will be null
  116. internalPodInformer framework.SharedIndexInformer
  117. // Services that need to be updated. A channel is inappropriate here,
  118. // because it allows services with lots of pods to be serviced much
  119. // more often than services with few pods; it also would cause a
  120. // service that's inserted multiple times to be processed more than
  121. // necessary.
  122. queue *workqueue.Type
  123. // Since we join two objects, we'll watch both of them with
  124. // controllers.
  125. serviceController *framework.Controller
  126. podController framework.ControllerInterface
  127. // podStoreSynced returns true if the pod store has been synced at least once.
  128. // Added as a member to the struct to allow injection for testing.
  129. podStoreSynced func() bool
  130. }
  131. // Runs e; will not return until stopCh is closed. workers determines how many
  132. // endpoints will be handled in parallel.
  133. func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
  134. defer utilruntime.HandleCrash()
  135. go e.serviceController.Run(stopCh)
  136. go e.podController.Run(stopCh)
  137. for i := 0; i < workers; i++ {
  138. go wait.Until(e.worker, time.Second, stopCh)
  139. }
  140. go func() {
  141. defer utilruntime.HandleCrash()
  142. time.Sleep(5 * time.Minute) // give time for our cache to fill
  143. e.checkLeftoverEndpoints()
  144. }()
  145. if e.internalPodInformer != nil {
  146. go e.internalPodInformer.Run(stopCh)
  147. }
  148. <-stopCh
  149. e.queue.ShutDown()
  150. }
  151. func (e *EndpointController) getPodServiceMemberships(pod *api.Pod) (sets.String, error) {
  152. set := sets.String{}
  153. services, err := e.serviceStore.GetPodServices(pod)
  154. if err != nil {
  155. // don't log this error because this function makes pointless
  156. // errors when no services match.
  157. return set, nil
  158. }
  159. for i := range services {
  160. key, err := keyFunc(&services[i])
  161. if err != nil {
  162. return nil, err
  163. }
  164. set.Insert(key)
  165. }
  166. return set, nil
  167. }
  168. // When a pod is added, figure out what services it will be a member of and
  169. // enqueue them. obj must have *api.Pod type.
  170. func (e *EndpointController) addPod(obj interface{}) {
  171. pod := obj.(*api.Pod)
  172. services, err := e.getPodServiceMemberships(pod)
  173. if err != nil {
  174. glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
  175. return
  176. }
  177. for key := range services {
  178. e.queue.Add(key)
  179. }
  180. }
  181. // When a pod is updated, figure out what services it used to be a member of
  182. // and what services it will be a member of, and enqueue the union of these.
  183. // old and cur must be *api.Pod types.
  184. func (e *EndpointController) updatePod(old, cur interface{}) {
  185. newPod := cur.(*api.Pod)
  186. oldPod := old.(*api.Pod)
  187. if newPod.ResourceVersion == oldPod.ResourceVersion {
  188. // Periodic resync will send update events for all known pods.
  189. // Two different versions of the same pod will always have different RVs.
  190. return
  191. }
  192. services, err := e.getPodServiceMemberships(newPod)
  193. if err != nil {
  194. glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
  195. return
  196. }
  197. // Only need to get the old services if the labels changed.
  198. if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
  199. !hostNameAndDomainAreEqual(newPod, oldPod) {
  200. oldServices, err := e.getPodServiceMemberships(oldPod)
  201. if err != nil {
  202. glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
  203. return
  204. }
  205. services = services.Union(oldServices)
  206. }
  207. for key := range services {
  208. e.queue.Add(key)
  209. }
  210. }
  211. func hostNameAndDomainAreEqual(pod1, pod2 *api.Pod) bool {
  212. return getHostname(pod1) == getHostname(pod2) &&
  213. getSubdomain(pod1) == getSubdomain(pod2)
  214. }
  215. func getHostname(pod *api.Pod) string {
  216. if len(pod.Spec.Hostname) > 0 {
  217. return pod.Spec.Hostname
  218. }
  219. if pod.Annotations != nil {
  220. return pod.Annotations[utilpod.PodHostnameAnnotation]
  221. }
  222. return ""
  223. }
  224. func getSubdomain(pod *api.Pod) string {
  225. if len(pod.Spec.Subdomain) > 0 {
  226. return pod.Spec.Subdomain
  227. }
  228. if pod.Annotations != nil {
  229. return pod.Annotations[utilpod.PodSubdomainAnnotation]
  230. }
  231. return ""
  232. }
  233. // When a pod is deleted, enqueue the services the pod used to be a member of.
  234. // obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
  235. func (e *EndpointController) deletePod(obj interface{}) {
  236. if _, ok := obj.(*api.Pod); ok {
  237. // Enqueue all the services that the pod used to be a member
  238. // of. This happens to be exactly the same thing we do when a
  239. // pod is added.
  240. e.addPod(obj)
  241. return
  242. }
  243. podKey, err := keyFunc(obj)
  244. if err != nil {
  245. glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
  246. return
  247. }
  248. glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)
  249. // TODO: keep a map of pods to services to handle this condition.
  250. }
  251. // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
  252. func (e *EndpointController) enqueueService(obj interface{}) {
  253. key, err := keyFunc(obj)
  254. if err != nil {
  255. glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
  256. return
  257. }
  258. e.queue.Add(key)
  259. }
  260. // worker runs a worker thread that just dequeues items, processes them, and
  261. // marks them done. You may run as many of these in parallel as you wish; the
  262. // workqueue guarantees that they will not end up processing the same service
  263. // at the same time.
  264. func (e *EndpointController) worker() {
  265. for {
  266. func() {
  267. key, quit := e.queue.Get()
  268. if quit {
  269. return
  270. }
  271. // Use defer: in the unlikely event that there's a
  272. // panic, we'd still like this to get marked done--
  273. // otherwise the controller will not be able to sync
  274. // this service again until it is restarted.
  275. defer e.queue.Done(key)
  276. e.syncService(key.(string))
  277. }()
  278. }
  279. }
  280. func (e *EndpointController) syncService(key string) {
  281. startTime := time.Now()
  282. defer func() {
  283. glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
  284. }()
  285. if !e.podStoreSynced() {
  286. // Sleep so we give the pod reflector goroutine a chance to run.
  287. time.Sleep(PodStoreSyncedPollPeriod)
  288. glog.Infof("Waiting for pods controller to sync, requeuing service %v", key)
  289. e.queue.Add(key)
  290. return
  291. }
  292. obj, exists, err := e.serviceStore.Store.GetByKey(key)
  293. if err != nil || !exists {
  294. // Delete the corresponding endpoint, as the service has been deleted.
  295. // TODO: Please note that this will delete an endpoint when a
  296. // service is deleted. However, if we're down at the time when
  297. // the service is deleted, we will miss that deletion, so this
  298. // doesn't completely solve the problem. See #6877.
  299. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  300. if err != nil {
  301. glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
  302. // Don't retry, as the key isn't going to magically become understandable.
  303. return
  304. }
  305. err = e.client.Endpoints(namespace).Delete(name, nil)
  306. if err != nil && !errors.IsNotFound(err) {
  307. glog.Errorf("Error deleting endpoint %q: %v", key, err)
  308. e.queue.Add(key) // Retry
  309. }
  310. return
  311. }
  312. service := obj.(*api.Service)
  313. if service.Spec.Selector == nil {
  314. // services without a selector receive no endpoints from this controller;
  315. // these services will receive the endpoints that are created out-of-band via the REST API.
  316. return
  317. }
  318. glog.V(5).Infof("About to update endpoints for service %q", key)
  319. pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
  320. if err != nil {
  321. // Since we're getting stuff from a local cache, it is
  322. // basically impossible to get this error.
  323. glog.Errorf("Error syncing service %q: %v", key, err)
  324. e.queue.Add(key) // Retry
  325. return
  326. }
  327. subsets := []api.EndpointSubset{}
  328. podHostNames := map[string]endpoints.HostRecord{}
  329. var tolerateUnreadyEndpoints bool
  330. if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
  331. b, err := strconv.ParseBool(v)
  332. if err == nil {
  333. tolerateUnreadyEndpoints = b
  334. } else {
  335. glog.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err)
  336. }
  337. }
  338. for i := range pods {
  339. // TODO: Do we need to copy here?
  340. pod := &(*pods[i])
  341. for i := range service.Spec.Ports {
  342. servicePort := &service.Spec.Ports[i]
  343. portName := servicePort.Name
  344. portProto := servicePort.Protocol
  345. portNum, err := podutil.FindPort(pod, servicePort)
  346. if err != nil {
  347. glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
  348. continue
  349. }
  350. if len(pod.Status.PodIP) == 0 {
  351. glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
  352. continue
  353. }
  354. if pod.DeletionTimestamp != nil {
  355. glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
  356. continue
  357. }
  358. epp := api.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto}
  359. epa := api.EndpointAddress{
  360. IP: pod.Status.PodIP,
  361. NodeName: &pod.Spec.NodeName,
  362. TargetRef: &api.ObjectReference{
  363. Kind: "Pod",
  364. Namespace: pod.ObjectMeta.Namespace,
  365. Name: pod.ObjectMeta.Name,
  366. UID: pod.ObjectMeta.UID,
  367. ResourceVersion: pod.ObjectMeta.ResourceVersion,
  368. }}
  369. hostname := getHostname(pod)
  370. if len(hostname) > 0 &&
  371. getSubdomain(pod) == service.Name &&
  372. service.Namespace == pod.Namespace {
  373. hostRecord := endpoints.HostRecord{
  374. HostName: hostname,
  375. }
  376. // TODO: stop populating podHostNames annotation in 1.4
  377. podHostNames[string(pod.Status.PodIP)] = hostRecord
  378. epa.Hostname = hostname
  379. }
  380. if tolerateUnreadyEndpoints || api.IsPodReady(pod) {
  381. subsets = append(subsets, api.EndpointSubset{
  382. Addresses: []api.EndpointAddress{epa},
  383. Ports: []api.EndpointPort{epp},
  384. })
  385. } else {
  386. glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
  387. subsets = append(subsets, api.EndpointSubset{
  388. NotReadyAddresses: []api.EndpointAddress{epa},
  389. Ports: []api.EndpointPort{epp},
  390. })
  391. }
  392. }
  393. }
  394. subsets = endpoints.RepackSubsets(subsets)
  395. // See if there's actually an update here.
  396. currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
  397. if err != nil {
  398. if errors.IsNotFound(err) {
  399. currentEndpoints = &api.Endpoints{
  400. ObjectMeta: api.ObjectMeta{
  401. Name: service.Name,
  402. Labels: service.Labels,
  403. },
  404. }
  405. } else {
  406. glog.Errorf("Error getting endpoints: %v", err)
  407. e.queue.Add(key) // Retry
  408. return
  409. }
  410. }
  411. serializedPodHostNames := ""
  412. if len(podHostNames) > 0 {
  413. b, err := json.Marshal(podHostNames)
  414. if err != nil {
  415. glog.Errorf("Error updating endpoints. Marshalling of hostnames failed.: %v", err)
  416. e.queue.Add(key) // Retry
  417. return
  418. }
  419. serializedPodHostNames = string(b)
  420. }
  421. newAnnotations := make(map[string]string)
  422. newAnnotations[endpoints.PodHostnamesAnnotation] = serializedPodHostNames
  423. if reflect.DeepEqual(currentEndpoints.Subsets, subsets) &&
  424. reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
  425. glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
  426. return
  427. }
  428. newEndpoints := currentEndpoints
  429. newEndpoints.Subsets = subsets
  430. newEndpoints.Labels = service.Labels
  431. if newEndpoints.Annotations == nil {
  432. newEndpoints.Annotations = make(map[string]string)
  433. }
  434. if len(serializedPodHostNames) == 0 {
  435. delete(newEndpoints.Annotations, endpoints.PodHostnamesAnnotation)
  436. } else {
  437. newEndpoints.Annotations[endpoints.PodHostnamesAnnotation] = serializedPodHostNames
  438. }
  439. createEndpoints := len(currentEndpoints.ResourceVersion) == 0
  440. if createEndpoints {
  441. // No previous endpoints, create them
  442. _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
  443. } else {
  444. // Pre-existing
  445. _, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
  446. }
  447. if err != nil {
  448. if createEndpoints && errors.IsForbidden(err) {
  449. // A request is forbidden primarily for two reasons:
  450. // 1. namespace is terminating, endpoint creation is not allowed by default.
  451. // 2. policy is misconfigured, in which case no service would function anywhere.
  452. // Given the frequency of 1, we log at a lower level.
  453. glog.V(5).Infof("Forbidden from creating endpoints: %v", err)
  454. } else {
  455. utilruntime.HandleError(err)
  456. }
  457. e.queue.Add(key) // Retry
  458. }
  459. }
  460. // checkLeftoverEndpoints lists all currently existing endpoints and adds their
  461. // service to the queue. This will detect endpoints that exist with no
  462. // corresponding service; these endpoints need to be deleted. We only need to
  463. // do this once on startup, because in steady-state these are detected (but
  464. // some stragglers could have been left behind if the endpoint controller
  465. // reboots).
  466. func (e *EndpointController) checkLeftoverEndpoints() {
  467. list, err := e.client.Endpoints(api.NamespaceAll).List(api.ListOptions{})
  468. if err != nil {
  469. glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)
  470. return
  471. }
  472. for i := range list.Items {
  473. ep := &list.Items[i]
  474. key, err := keyFunc(ep)
  475. if err != nil {
  476. glog.Errorf("Unable to get key for endpoint %#v", ep)
  477. continue
  478. }
  479. e.queue.Add(key)
  480. }
  481. }