servicecontroller.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package service
  14. import (
  15. "fmt"
  16. "sort"
  17. "sync"
  18. "time"
  19. "reflect"
  20. "github.com/golang/glog"
  21. "k8s.io/kubernetes/pkg/api"
  22. "k8s.io/kubernetes/pkg/api/errors"
  23. "k8s.io/kubernetes/pkg/client/cache"
  24. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  25. unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
  26. "k8s.io/kubernetes/pkg/client/record"
  27. "k8s.io/kubernetes/pkg/cloudprovider"
  28. "k8s.io/kubernetes/pkg/controller"
  29. "k8s.io/kubernetes/pkg/controller/framework"
  30. "k8s.io/kubernetes/pkg/fields"
  31. pkg_runtime "k8s.io/kubernetes/pkg/runtime"
  32. "k8s.io/kubernetes/pkg/util/metrics"
  33. "k8s.io/kubernetes/pkg/util/runtime"
  34. "k8s.io/kubernetes/pkg/util/wait"
  35. "k8s.io/kubernetes/pkg/util/workqueue"
  36. "k8s.io/kubernetes/pkg/watch"
  37. )
  38. const (
  39. // Interval of synchoronizing service status from apiserver
  40. serviceSyncPeriod = 30 * time.Second
  41. // Interval of synchoronizing node status from apiserver
  42. nodeSyncPeriod = 100 * time.Second
  43. // How long to wait before retrying the processing of a service change.
  44. // If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
  45. // should be changed appropriately.
  46. minRetryDelay = 5 * time.Second
  47. maxRetryDelay = 300 * time.Second
  48. clientRetryCount = 5
  49. clientRetryInterval = 5 * time.Second
  50. retryable = true
  51. notRetryable = false
  52. doNotRetry = time.Duration(0)
  53. )
  54. type cachedService struct {
  55. // The cached state of the service
  56. state *api.Service
  57. // Controls error back-off
  58. lastRetryDelay time.Duration
  59. }
  60. type serviceCache struct {
  61. mu sync.Mutex // protects serviceMap
  62. serviceMap map[string]*cachedService
  63. }
  64. type ServiceController struct {
  65. cloud cloudprovider.Interface
  66. knownHosts []string
  67. servicesToUpdate []*api.Service
  68. kubeClient clientset.Interface
  69. clusterName string
  70. balancer cloudprovider.LoadBalancer
  71. zone cloudprovider.Zone
  72. cache *serviceCache
  73. // A store of services, populated by the serviceController
  74. serviceStore cache.StoreToServiceLister
  75. // Watches changes to all services
  76. serviceController *framework.Controller
  77. eventBroadcaster record.EventBroadcaster
  78. eventRecorder record.EventRecorder
  79. nodeLister cache.StoreToNodeLister
  80. // services that need to be synced
  81. workingQueue workqueue.DelayingInterface
  82. }
  83. // New returns a new service controller to keep cloud provider service resources
  84. // (like load balancers) in sync with the registry.
  85. func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) (*ServiceController, error) {
  86. broadcaster := record.NewBroadcaster()
  87. broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
  88. recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"})
  89. if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
  90. metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
  91. }
  92. s := &ServiceController{
  93. cloud: cloud,
  94. knownHosts: []string{},
  95. kubeClient: kubeClient,
  96. clusterName: clusterName,
  97. cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
  98. eventBroadcaster: broadcaster,
  99. eventRecorder: recorder,
  100. nodeLister: cache.StoreToNodeLister{
  101. Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
  102. },
  103. workingQueue: workqueue.NewDelayingQueue(),
  104. }
  105. s.serviceStore.Store, s.serviceController = framework.NewInformer(
  106. &cache.ListWatch{
  107. ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
  108. return s.kubeClient.Core().Services(api.NamespaceAll).List(options)
  109. },
  110. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  111. return s.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
  112. },
  113. },
  114. &api.Service{},
  115. serviceSyncPeriod,
  116. framework.ResourceEventHandlerFuncs{
  117. AddFunc: s.enqueueService,
  118. UpdateFunc: func(old, cur interface{}) {
  119. oldSvc, ok1 := old.(*api.Service)
  120. curSvc, ok2 := cur.(*api.Service)
  121. if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
  122. s.enqueueService(cur)
  123. }
  124. },
  125. DeleteFunc: s.enqueueService,
  126. },
  127. )
  128. if err := s.init(); err != nil {
  129. return nil, err
  130. }
  131. return s, nil
  132. }
  133. // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
  134. func (s *ServiceController) enqueueService(obj interface{}) {
  135. key, err := controller.KeyFunc(obj)
  136. if err != nil {
  137. glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
  138. return
  139. }
  140. s.workingQueue.Add(key)
  141. }
  142. // Run starts a background goroutine that watches for changes to services that
  143. // have (or had) LoadBalancers=true and ensures that they have
  144. // load balancers created and deleted appropriately.
  145. // serviceSyncPeriod controls how often we check the cluster's services to
  146. // ensure that the correct load balancers exist.
  147. // nodeSyncPeriod controls how often we check the cluster's nodes to determine
  148. // if load balancers need to be updated to point to a new set.
  149. //
  150. // It's an error to call Run() more than once for a given ServiceController
  151. // object.
  152. func (s *ServiceController) Run(workers int) {
  153. defer runtime.HandleCrash()
  154. go s.serviceController.Run(wait.NeverStop)
  155. for i := 0; i < workers; i++ {
  156. go wait.Until(s.worker, time.Second, wait.NeverStop)
  157. }
  158. nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*clientset.Clientset).CoreClient, "nodes", api.NamespaceAll, fields.Everything())
  159. cache.NewReflector(nodeLW, &api.Node{}, s.nodeLister.Store, 0).Run()
  160. go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, wait.NeverStop)
  161. }
  162. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
  163. // It enforces that the syncHandler is never invoked concurrently with the same key.
  164. func (s *ServiceController) worker() {
  165. for {
  166. func() {
  167. key, quit := s.workingQueue.Get()
  168. if quit {
  169. return
  170. }
  171. defer s.workingQueue.Done(key)
  172. err := s.syncService(key.(string))
  173. if err != nil {
  174. glog.Errorf("Error syncing service: %v", err)
  175. }
  176. }()
  177. }
  178. }
  179. func (s *ServiceController) init() error {
  180. if s.cloud == nil {
  181. return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail.")
  182. }
  183. balancer, ok := s.cloud.LoadBalancer()
  184. if !ok {
  185. return fmt.Errorf("the cloud provider does not support external load balancers.")
  186. }
  187. s.balancer = balancer
  188. zones, ok := s.cloud.Zones()
  189. if !ok {
  190. return fmt.Errorf("the cloud provider does not support zone enumeration, which is required for creating load balancers.")
  191. }
  192. zone, err := zones.GetZone()
  193. if err != nil {
  194. return fmt.Errorf("failed to get zone from cloud provider, will not be able to create load balancers: %v", err)
  195. }
  196. s.zone = zone
  197. return nil
  198. }
  199. // Returns an error if processing the service update failed, along with a time.Duration
  200. // indicating whether processing should be retried; zero means no-retry; otherwise
  201. // we should retry in that Duration.
  202. func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) {
  203. // cache the service, we need the info for service deletion
  204. cachedService.state = service
  205. err, retry := s.createLoadBalancerIfNeeded(key, service)
  206. if err != nil {
  207. message := "Error creating load balancer"
  208. if retry {
  209. message += " (will retry): "
  210. } else {
  211. message += " (will not retry): "
  212. }
  213. message += err.Error()
  214. s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)
  215. return err, cachedService.nextRetryDelay()
  216. }
  217. // Always update the cache upon success.
  218. // NOTE: Since we update the cached service if and only if we successfully
  219. // processed it, a cached service being nil implies that it hasn't yet
  220. // been successfully processed.
  221. s.cache.set(key, cachedService)
  222. cachedService.resetRetryDelay()
  223. return nil, doNotRetry
  224. }
  225. // Returns whatever error occurred along with a boolean indicator of whether it
  226. // should be retried.
  227. func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *api.Service) (error, bool) {
  228. // Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
  229. // which may involve service interruption. Also, we would like user-friendly events.
  230. // Save the state so we can avoid a write if it doesn't change
  231. previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
  232. if !wantsLoadBalancer(service) {
  233. needDelete := true
  234. _, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
  235. if err != nil {
  236. return fmt.Errorf("Error getting LB for service %s: %v", key, err), retryable
  237. }
  238. if !exists {
  239. needDelete = false
  240. }
  241. if needDelete {
  242. glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
  243. s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
  244. if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
  245. return err, retryable
  246. }
  247. s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
  248. }
  249. service.Status.LoadBalancer = api.LoadBalancerStatus{}
  250. } else {
  251. glog.V(2).Infof("Ensuring LB for service %s", key)
  252. // TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
  253. // The load balancer doesn't exist yet, so create it.
  254. s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
  255. err := s.createLoadBalancer(service)
  256. if err != nil {
  257. return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
  258. }
  259. s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
  260. }
  261. // Write the state if changed
  262. // TODO: Be careful here ... what if there were other changes to the service?
  263. if !api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
  264. if err := s.persistUpdate(service); err != nil {
  265. return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
  266. }
  267. } else {
  268. glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
  269. }
  270. return nil, notRetryable
  271. }
  272. func (s *ServiceController) persistUpdate(service *api.Service) error {
  273. var err error
  274. for i := 0; i < clientRetryCount; i++ {
  275. _, err = s.kubeClient.Core().Services(service.Namespace).UpdateStatus(service)
  276. if err == nil {
  277. return nil
  278. }
  279. // If the object no longer exists, we don't want to recreate it. Just bail
  280. // out so that we can process the delete, which we should soon be receiving
  281. // if we haven't already.
  282. if errors.IsNotFound(err) {
  283. glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
  284. service.Namespace, service.Name, err)
  285. return nil
  286. }
  287. // TODO: Try to resolve the conflict if the change was unrelated to load
  288. // balancer status. For now, just rely on the fact that we'll
  289. // also process the update that caused the resource version to change.
  290. if errors.IsConflict(err) {
  291. glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
  292. service.Namespace, service.Name, err)
  293. return nil
  294. }
  295. glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
  296. service.Namespace, service.Name, err)
  297. time.Sleep(clientRetryInterval)
  298. }
  299. return err
  300. }
  301. func (s *ServiceController) createLoadBalancer(service *api.Service) error {
  302. nodes, err := s.nodeLister.List()
  303. if err != nil {
  304. return err
  305. }
  306. // - Only one protocol supported per service
  307. // - Not all cloud providers support all protocols and the next step is expected to return
  308. // an error for unsupported protocols
  309. status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, hostsFromNodeList(&nodes))
  310. if err != nil {
  311. return err
  312. } else {
  313. service.Status.LoadBalancer = *status
  314. }
  315. return nil
  316. }
  317. // ListKeys implements the interface required by DeltaFIFO to list the keys we
  318. // already know about.
  319. func (s *serviceCache) ListKeys() []string {
  320. s.mu.Lock()
  321. defer s.mu.Unlock()
  322. keys := make([]string, 0, len(s.serviceMap))
  323. for k := range s.serviceMap {
  324. keys = append(keys, k)
  325. }
  326. return keys
  327. }
  328. // GetByKey returns the value stored in the serviceMap under the given key
  329. func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
  330. s.mu.Lock()
  331. defer s.mu.Unlock()
  332. if v, ok := s.serviceMap[key]; ok {
  333. return v, true, nil
  334. }
  335. return nil, false, nil
  336. }
  337. // ListKeys implements the interface required by DeltaFIFO to list the keys we
  338. // already know about.
  339. func (s *serviceCache) allServices() []*api.Service {
  340. s.mu.Lock()
  341. defer s.mu.Unlock()
  342. services := make([]*api.Service, 0, len(s.serviceMap))
  343. for _, v := range s.serviceMap {
  344. services = append(services, v.state)
  345. }
  346. return services
  347. }
  348. func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
  349. s.mu.Lock()
  350. defer s.mu.Unlock()
  351. service, ok := s.serviceMap[serviceName]
  352. return service, ok
  353. }
  354. func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
  355. s.mu.Lock()
  356. defer s.mu.Unlock()
  357. service, ok := s.serviceMap[serviceName]
  358. if !ok {
  359. service = &cachedService{}
  360. s.serviceMap[serviceName] = service
  361. }
  362. return service
  363. }
  364. func (s *serviceCache) set(serviceName string, service *cachedService) {
  365. s.mu.Lock()
  366. defer s.mu.Unlock()
  367. s.serviceMap[serviceName] = service
  368. }
  369. func (s *serviceCache) delete(serviceName string) {
  370. s.mu.Lock()
  371. defer s.mu.Unlock()
  372. delete(s.serviceMap, serviceName)
  373. }
  374. func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api.Service) bool {
  375. if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
  376. return false
  377. }
  378. if wantsLoadBalancer(oldService) != wantsLoadBalancer(newService) {
  379. s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v",
  380. oldService.Spec.Type, newService.Spec.Type)
  381. return true
  382. }
  383. if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
  384. return true
  385. }
  386. if !loadBalancerIPsAreEqual(oldService, newService) {
  387. s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
  388. oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
  389. return true
  390. }
  391. if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
  392. s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
  393. len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
  394. return true
  395. }
  396. for i := range oldService.Spec.ExternalIPs {
  397. if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
  398. s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v",
  399. newService.Spec.ExternalIPs[i])
  400. return true
  401. }
  402. }
  403. if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
  404. return true
  405. }
  406. if oldService.UID != newService.UID {
  407. s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v",
  408. oldService.UID, newService.UID)
  409. return true
  410. }
  411. return false
  412. }
  413. func (s *ServiceController) loadBalancerName(service *api.Service) string {
  414. return cloudprovider.GetLoadBalancerName(service)
  415. }
  416. func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
  417. var protocol api.Protocol
  418. ports := []*api.ServicePort{}
  419. for i := range service.Spec.Ports {
  420. sp := &service.Spec.Ports[i]
  421. // The check on protocol was removed here. The cloud provider itself is now responsible for all protocol validation
  422. ports = append(ports, sp)
  423. if protocol == "" {
  424. protocol = sp.Protocol
  425. } else if protocol != sp.Protocol && wantsLoadBalancer(service) {
  426. // TODO: Convert error messages to use event recorder
  427. return nil, fmt.Errorf("mixed protocol external load balancers are not supported.")
  428. }
  429. }
  430. return ports, nil
  431. }
  432. func portsEqualForLB(x, y *api.Service) bool {
  433. xPorts, err := getPortsForLB(x)
  434. if err != nil {
  435. return false
  436. }
  437. yPorts, err := getPortsForLB(y)
  438. if err != nil {
  439. return false
  440. }
  441. return portSlicesEqualForLB(xPorts, yPorts)
  442. }
  443. func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
  444. if len(x) != len(y) {
  445. return false
  446. }
  447. for i := range x {
  448. if !portEqualForLB(x[i], y[i]) {
  449. return false
  450. }
  451. }
  452. return true
  453. }
  454. func portEqualForLB(x, y *api.ServicePort) bool {
  455. // TODO: Should we check name? (In theory, an LB could expose it)
  456. if x.Name != y.Name {
  457. return false
  458. }
  459. if x.Protocol != y.Protocol {
  460. return false
  461. }
  462. if x.Port != y.Port {
  463. return false
  464. }
  465. if x.NodePort != y.NodePort {
  466. return false
  467. }
  468. // We don't check TargetPort; that is not relevant for load balancing
  469. // TODO: Should we blank it out? Or just check it anyway?
  470. return true
  471. }
  472. func intSlicesEqual(x, y []int) bool {
  473. if len(x) != len(y) {
  474. return false
  475. }
  476. if !sort.IntsAreSorted(x) {
  477. sort.Ints(x)
  478. }
  479. if !sort.IntsAreSorted(y) {
  480. sort.Ints(y)
  481. }
  482. for i := range x {
  483. if x[i] != y[i] {
  484. return false
  485. }
  486. }
  487. return true
  488. }
  489. func stringSlicesEqual(x, y []string) bool {
  490. if len(x) != len(y) {
  491. return false
  492. }
  493. if !sort.StringsAreSorted(x) {
  494. sort.Strings(x)
  495. }
  496. if !sort.StringsAreSorted(y) {
  497. sort.Strings(y)
  498. }
  499. for i := range x {
  500. if x[i] != y[i] {
  501. return false
  502. }
  503. }
  504. return true
  505. }
  506. func includeNodeFromNodeList(node *api.Node) bool {
  507. return !node.Spec.Unschedulable
  508. }
  509. func hostsFromNodeList(list *api.NodeList) []string {
  510. result := []string{}
  511. for ix := range list.Items {
  512. if includeNodeFromNodeList(&list.Items[ix]) {
  513. result = append(result, list.Items[ix].Name)
  514. }
  515. }
  516. return result
  517. }
  518. func hostsFromNodeSlice(nodes []*api.Node) []string {
  519. result := []string{}
  520. for _, node := range nodes {
  521. if includeNodeFromNodeList(node) {
  522. result = append(result, node.Name)
  523. }
  524. }
  525. return result
  526. }
  527. func getNodeConditionPredicate() cache.NodeConditionPredicate {
  528. return func(node *api.Node) bool {
  529. // We add the master to the node list, but its unschedulable. So we use this to filter
  530. // the master.
  531. // TODO: Use a node annotation to indicate the master
  532. if node.Spec.Unschedulable {
  533. return false
  534. }
  535. // If we have no info, don't accept
  536. if len(node.Status.Conditions) == 0 {
  537. return false
  538. }
  539. for _, cond := range node.Status.Conditions {
  540. // We consider the node for load balancing only when its NodeReady condition status
  541. // is ConditionTrue
  542. if cond.Type == api.NodeReady && cond.Status != api.ConditionTrue {
  543. glog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
  544. return false
  545. }
  546. }
  547. return true
  548. }
  549. }
  550. // nodeSyncLoop handles updating the hosts pointed to by all load
  551. // balancers whenever the set of nodes in the cluster changes.
  552. func (s *ServiceController) nodeSyncLoop() {
  553. nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
  554. if err != nil {
  555. glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
  556. return
  557. }
  558. newHosts := hostsFromNodeSlice(nodes)
  559. if stringSlicesEqual(newHosts, s.knownHosts) {
  560. // The set of nodes in the cluster hasn't changed, but we can retry
  561. // updating any services that we failed to update last time around.
  562. s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
  563. return
  564. }
  565. glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
  566. // Try updating all services, and save the ones that fail to try again next
  567. // round.
  568. s.servicesToUpdate = s.cache.allServices()
  569. numServices := len(s.servicesToUpdate)
  570. s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
  571. glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
  572. numServices-len(s.servicesToUpdate), numServices)
  573. s.knownHosts = newHosts
  574. }
  575. // updateLoadBalancerHosts updates all existing load balancers so that
  576. // they will match the list of hosts provided.
  577. // Returns the list of services that couldn't be updated.
  578. func (s *ServiceController) updateLoadBalancerHosts(services []*api.Service, hosts []string) (servicesToRetry []*api.Service) {
  579. for _, service := range services {
  580. func() {
  581. if service == nil {
  582. return
  583. }
  584. if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
  585. glog.Errorf("External error while updating load balancer: %v.", err)
  586. servicesToRetry = append(servicesToRetry, service)
  587. }
  588. }()
  589. }
  590. return servicesToRetry
  591. }
  592. // Updates the load balancer of a service, assuming we hold the mutex
  593. // associated with the service.
  594. func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, hosts []string) error {
  595. if !wantsLoadBalancer(service) {
  596. return nil
  597. }
  598. // This operation doesn't normally take very long (and happens pretty often), so we only record the final event
  599. err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
  600. if err == nil {
  601. s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
  602. return nil
  603. }
  604. // It's only an actual error if the load balancer still exists.
  605. if _, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service); err != nil {
  606. glog.Errorf("External error while checking if load balancer %q exists: name, %v", cloudprovider.GetLoadBalancerName(service), err)
  607. } else if !exists {
  608. return nil
  609. }
  610. s.eventRecorder.Eventf(service, api.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", hosts, err)
  611. return err
  612. }
  613. func wantsLoadBalancer(service *api.Service) bool {
  614. return service.Spec.Type == api.ServiceTypeLoadBalancer
  615. }
  616. func loadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
  617. return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
  618. }
  619. // Computes the next retry, using exponential backoff
  620. // mutex must be held.
  621. func (s *cachedService) nextRetryDelay() time.Duration {
  622. s.lastRetryDelay = s.lastRetryDelay * 2
  623. if s.lastRetryDelay < minRetryDelay {
  624. s.lastRetryDelay = minRetryDelay
  625. }
  626. if s.lastRetryDelay > maxRetryDelay {
  627. s.lastRetryDelay = maxRetryDelay
  628. }
  629. return s.lastRetryDelay
  630. }
  631. // Resets the retry exponential backoff. mutex must be held.
  632. func (s *cachedService) resetRetryDelay() {
  633. s.lastRetryDelay = time.Duration(0)
  634. }
  635. // syncService will sync the Service with the given key if it has had its expectations fulfilled,
  636. // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
  637. // invoked concurrently with the same key.
  638. func (s *ServiceController) syncService(key string) error {
  639. startTime := time.Now()
  640. var cachedService *cachedService
  641. var retryDelay time.Duration
  642. defer func() {
  643. glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
  644. }()
  645. // obj holds the latest service info from apiserver
  646. obj, exists, err := s.serviceStore.Store.GetByKey(key)
  647. if err != nil {
  648. glog.Infof("Unable to retrieve service %v from store: %v", key, err)
  649. s.workingQueue.Add(key)
  650. return err
  651. }
  652. if !exists {
  653. // service absence in store means watcher caught the deletion, ensure LB info is cleaned
  654. glog.Infof("Service has been deleted %v", key)
  655. err, retryDelay = s.processServiceDeletion(key)
  656. } else {
  657. service, ok := obj.(*api.Service)
  658. if ok {
  659. cachedService = s.cache.getOrCreate(key)
  660. err, retryDelay = s.processServiceUpdate(cachedService, service, key)
  661. } else {
  662. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  663. if !ok {
  664. return fmt.Errorf("object contained wasn't a service or a deleted key: %#v", obj)
  665. }
  666. glog.Infof("Found tombstone for %v", key)
  667. err, retryDelay = s.processServiceDeletion(tombstone.Key)
  668. }
  669. }
  670. if retryDelay != 0 {
  671. // Add the failed service back to the queue so we'll retry it.
  672. glog.Errorf("Failed to process service. Retrying in %s: %v", retryDelay, err)
  673. go func(obj interface{}, delay time.Duration) {
  674. // put back the service key to working queue, it is possible that more entries of the service
  675. // were added into the queue during the delay, but it does not mess as when handling the retry,
  676. // it always get the last service info from service store
  677. s.workingQueue.AddAfter(obj, delay)
  678. }(key, retryDelay)
  679. } else if err != nil {
  680. runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
  681. }
  682. return nil
  683. }
  684. // Returns an error if processing the service deletion failed, along with a time.Duration
  685. // indicating whether processing should be retried; zero means no-retry; otherwise
  686. // we should retry after that Duration.
  687. func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
  688. cachedService, ok := s.cache.get(key)
  689. if !ok {
  690. return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
  691. }
  692. service := cachedService.state
  693. // delete load balancer info only if the service type is LoadBalancer
  694. if !wantsLoadBalancer(service) {
  695. return nil, doNotRetry
  696. }
  697. s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
  698. err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
  699. if err != nil {
  700. message := "Error deleting load balancer (will retry): " + err.Error()
  701. s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
  702. return err, cachedService.nextRetryDelay()
  703. }
  704. s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
  705. s.cache.delete(key)
  706. cachedService.resetRetryDelay()
  707. return nil, doNotRetry
  708. }