12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package service
- import (
- "fmt"
- "sort"
- "sync"
- "time"
- "reflect"
- "github.com/golang/glog"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/errors"
- "k8s.io/kubernetes/pkg/client/cache"
- clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
- unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
- "k8s.io/kubernetes/pkg/client/record"
- "k8s.io/kubernetes/pkg/cloudprovider"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/controller/framework"
- "k8s.io/kubernetes/pkg/fields"
- pkg_runtime "k8s.io/kubernetes/pkg/runtime"
- "k8s.io/kubernetes/pkg/util/metrics"
- "k8s.io/kubernetes/pkg/util/runtime"
- "k8s.io/kubernetes/pkg/util/wait"
- "k8s.io/kubernetes/pkg/util/workqueue"
- "k8s.io/kubernetes/pkg/watch"
- )
const (
	// Interval of synchronizing service status from apiserver
	serviceSyncPeriod = 30 * time.Second
	// Interval of synchronizing node status from apiserver
	nodeSyncPeriod = 100 * time.Second

	// How long to wait before retrying the processing of a service change.
	// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
	// should be changed appropriately.
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second

	// Retry budget for persisting a service status update to the apiserver.
	clientRetryCount    = 5
	clientRetryInterval = 5 * time.Second

	// Readable aliases for the bool/Duration results returned by the
	// processing helpers below.
	retryable    = true
	notRetryable = false
	doNotRetry   = time.Duration(0)
)
// cachedService holds the last successfully processed state of a service,
// together with per-service retry back-off bookkeeping.
type cachedService struct {
	// The cached state of the service
	state *api.Service
	// Controls error back-off; grown by nextRetryDelay, cleared by resetRetryDelay.
	lastRetryDelay time.Duration
}
// serviceCache is a mutex-guarded map from service key to cachedService.
// All access goes through the methods below, each of which takes mu.
type serviceCache struct {
	mu         sync.Mutex // protects serviceMap
	serviceMap map[string]*cachedService
}
// ServiceController keeps cloud provider service resources (such as load
// balancers) in sync with the registry of Service objects.
type ServiceController struct {
	cloud       cloudprovider.Interface
	// knownHosts is the host list used on the last successful sync; compared
	// against the current node set in nodeSyncLoop.
	knownHosts  []string
	// servicesToUpdate carries services whose LB-host update failed last
	// round and should be retried.
	servicesToUpdate []*api.Service
	kubeClient  clientset.Interface
	clusterName string
	balancer    cloudprovider.LoadBalancer
	zone        cloudprovider.Zone
	cache       *serviceCache
	// A store of services, populated by the serviceController
	serviceStore cache.StoreToServiceLister
	// Watches changes to all services
	serviceController *framework.Controller
	eventBroadcaster  record.EventBroadcaster
	eventRecorder     record.EventRecorder
	nodeLister        cache.StoreToNodeLister
	// services that need to be synced
	workingQueue workqueue.DelayingInterface
}
// New returns a new service controller to keep cloud provider service resources
// (like load balancers) in sync with the registry.
//
// It wires an event broadcaster/recorder, registers rate-limiter metrics,
// and installs an informer over all services; it fails if the cloud
// provider cannot supply a LoadBalancer or Zones implementation (see init).
func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) (*ServiceController, error) {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"})
	if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
	}
	s := &ServiceController{
		cloud:            cloud,
		knownHosts:       []string{},
		kubeClient:       kubeClient,
		clusterName:      clusterName,
		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
		eventBroadcaster: broadcaster,
		eventRecorder:    recorder,
		nodeLister: cache.StoreToNodeLister{
			Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
		},
		workingQueue: workqueue.NewDelayingQueue(),
	}
	// Watch all services; enqueue on add/delete unconditionally, and on
	// update only when needsUpdate says the change is LB-relevant.
	s.serviceStore.Store, s.serviceController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
				return s.kubeClient.Core().Services(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return s.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
			},
		},
		&api.Service{},
		serviceSyncPeriod,
		framework.ResourceEventHandlerFuncs{
			AddFunc: s.enqueueService,
			UpdateFunc: func(old, cur interface{}) {
				oldSvc, ok1 := old.(*api.Service)
				curSvc, ok2 := cur.(*api.Service)
				if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
					s.enqueueService(cur)
				}
			},
			DeleteFunc: s.enqueueService,
		},
	)
	if err := s.init(); err != nil {
		return nil, err
	}
	return s, nil
}
- // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
- func (s *ServiceController) enqueueService(obj interface{}) {
- key, err := controller.KeyFunc(obj)
- if err != nil {
- glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
- return
- }
- s.workingQueue.Add(key)
- }
// Run starts a background goroutine that watches for changes to services that
// have (or had) LoadBalancers=true and ensures that they have
// load balancers created and deleted appropriately.
// serviceSyncPeriod controls how often we check the cluster's services to
// ensure that the correct load balancers exist.
// nodeSyncPeriod controls how often we check the cluster's nodes to determine
// if load balancers need to be updated to point to a new set.
//
// It's an error to call Run() more than once for a given ServiceController
// object.
func (s *ServiceController) Run(workers int) {
	defer runtime.HandleCrash()
	// Start the informer, then `workers` queue-draining goroutines.
	go s.serviceController.Run(wait.NeverStop)
	for i := 0; i < workers; i++ {
		go wait.Until(s.worker, time.Second, wait.NeverStop)
	}
	// Reflect nodes into nodeLister and periodically resync LB host lists.
	nodeLW := cache.NewListWatchFromClient(s.kubeClient.(*clientset.Clientset).CoreClient, "nodes", api.NamespaceAll, fields.Everything())
	cache.NewReflector(nodeLW, &api.Node{}, s.nodeLister.Store, 0).Run()
	go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, wait.NeverStop)
}
- // worker runs a worker thread that just dequeues items, processes them, and marks them done.
- // It enforces that the syncHandler is never invoked concurrently with the same key.
- func (s *ServiceController) worker() {
- for {
- func() {
- key, quit := s.workingQueue.Get()
- if quit {
- return
- }
- defer s.workingQueue.Done(key)
- err := s.syncService(key.(string))
- if err != nil {
- glog.Errorf("Error syncing service: %v", err)
- }
- }()
- }
- }
- func (s *ServiceController) init() error {
- if s.cloud == nil {
- return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail.")
- }
- balancer, ok := s.cloud.LoadBalancer()
- if !ok {
- return fmt.Errorf("the cloud provider does not support external load balancers.")
- }
- s.balancer = balancer
- zones, ok := s.cloud.Zones()
- if !ok {
- return fmt.Errorf("the cloud provider does not support zone enumeration, which is required for creating load balancers.")
- }
- zone, err := zones.GetZone()
- if err != nil {
- return fmt.Errorf("failed to get zone from cloud provider, will not be able to create load balancers: %v", err)
- }
- s.zone = zone
- return nil
- }
// Returns an error if processing the service update failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry in that Duration.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) {
	// cache the service, we need the info for service deletion
	cachedService.state = service
	err, retry := s.createLoadBalancerIfNeeded(key, service)
	if err != nil {
		// Record a warning event whose text says whether we'll retry, then
		// hand back the next exponential back-off delay for this service.
		message := "Error creating load balancer"
		if retry {
			message += " (will retry): "
		} else {
			message += " (will not retry): "
		}
		message += err.Error()
		s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)
		return err, cachedService.nextRetryDelay()
	}
	// Always update the cache upon success.
	// NOTE: Since we update the cached service if and only if we successfully
	// processed it, a cached service being nil implies that it hasn't yet
	// been successfully processed.
	s.cache.set(key, cachedService)
	cachedService.resetRetryDelay()
	return nil, doNotRetry
}
// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
//
// If the service no longer wants a load balancer, any existing one is torn
// down; otherwise EnsureLoadBalancer is invoked. In either case the
// LoadBalancer status is persisted to the apiserver only when it changed.
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *api.Service) (error, bool) {
	// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
	// which may involve service interruption. Also, we would like user-friendly events.

	// Save the state so we can avoid a write if it doesn't change
	previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)

	if !wantsLoadBalancer(service) {
		// Service no longer wants an LB; delete the cloud LB only if the
		// provider says one actually exists.
		needDelete := true
		_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
		if err != nil {
			return fmt.Errorf("Error getting LB for service %s: %v", key, err), retryable
		}
		if !exists {
			needDelete = false
		}

		if needDelete {
			glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
			s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
			if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
				return err, retryable
			}
			s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
		}

		service.Status.LoadBalancer = api.LoadBalancerStatus{}
	} else {
		glog.V(2).Infof("Ensuring LB for service %s", key)
		// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart

		// The load balancer doesn't exist yet, so create it.
		s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
		err := s.createLoadBalancer(service)
		if err != nil {
			return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
		}
		s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
	}

	// Write the state if changed
	// TODO: Be careful here ... what if there were other changes to the service?
	if !api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
		if err := s.persistUpdate(service); err != nil {
			return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
		}
	} else {
		glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
	}

	return nil, notRetryable
}
// persistUpdate writes the service's status back to the apiserver, retrying
// up to clientRetryCount times. NotFound and Conflict errors are treated as
// terminal-but-benign (returns nil); any other error is retried after
// clientRetryInterval, and the last error is returned if all retries fail.
func (s *ServiceController) persistUpdate(service *api.Service) error {
	var err error
	for i := 0; i < clientRetryCount; i++ {
		_, err = s.kubeClient.Core().Services(service.Namespace).UpdateStatus(service)
		if err == nil {
			return nil
		}
		// If the object no longer exists, we don't want to recreate it. Just bail
		// out so that we can process the delete, which we should soon be receiving
		// if we haven't already.
		if errors.IsNotFound(err) {
			glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
				service.Namespace, service.Name, err)
			return nil
		}
		// TODO: Try to resolve the conflict if the change was unrelated to load
		// balancer status. For now, just rely on the fact that we'll
		// also process the update that caused the resource version to change.
		if errors.IsConflict(err) {
			glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
				service.Namespace, service.Name, err)
			return nil
		}
		glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
			service.Namespace, service.Name, err)
		time.Sleep(clientRetryInterval)
	}
	return err
}
- func (s *ServiceController) createLoadBalancer(service *api.Service) error {
- nodes, err := s.nodeLister.List()
- if err != nil {
- return err
- }
- // - Only one protocol supported per service
- // - Not all cloud providers support all protocols and the next step is expected to return
- // an error for unsupported protocols
- status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, hostsFromNodeList(&nodes))
- if err != nil {
- return err
- } else {
- service.Status.LoadBalancer = *status
- }
- return nil
- }
- // ListKeys implements the interface required by DeltaFIFO to list the keys we
- // already know about.
- func (s *serviceCache) ListKeys() []string {
- s.mu.Lock()
- defer s.mu.Unlock()
- keys := make([]string, 0, len(s.serviceMap))
- for k := range s.serviceMap {
- keys = append(keys, k)
- }
- return keys
- }
- // GetByKey returns the value stored in the serviceMap under the given key
- func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if v, ok := s.serviceMap[key]; ok {
- return v, true, nil
- }
- return nil, false, nil
- }
// allServices returns a snapshot of the cached service states. The comment
// previously here was a copy-paste of ListKeys' comment and was wrong.
// Note: only the slice is fresh; the *api.Service pointers are shared with
// the cache.
func (s *serviceCache) allServices() []*api.Service {
	s.mu.Lock()
	defer s.mu.Unlock()
	services := make([]*api.Service, 0, len(s.serviceMap))
	for _, v := range s.serviceMap {
		services = append(services, v.state)
	}
	return services
}
- func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
- s.mu.Lock()
- defer s.mu.Unlock()
- service, ok := s.serviceMap[serviceName]
- return service, ok
- }
- func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
- s.mu.Lock()
- defer s.mu.Unlock()
- service, ok := s.serviceMap[serviceName]
- if !ok {
- service = &cachedService{}
- s.serviceMap[serviceName] = service
- }
- return service
- }
- func (s *serviceCache) set(serviceName string, service *cachedService) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.serviceMap[serviceName] = service
- }
- func (s *serviceCache) delete(serviceName string) {
- s.mu.Lock()
- defer s.mu.Unlock()
- delete(s.serviceMap, serviceName)
- }
// needsUpdate reports whether the change from oldService to newService is
// relevant to load balancing and therefore requires re-processing. It also
// emits an event describing the first relevant difference found.
func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api.Service) bool {
	// If neither side wants an LB, nothing here concerns us.
	if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
		return false
	}
	if wantsLoadBalancer(oldService) != wantsLoadBalancer(newService) {
		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v",
			oldService.Spec.Type, newService.Spec.Type)
		return true
	}
	if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
		return true
	}
	if !loadBalancerIPsAreEqual(oldService, newService) {
		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
			oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
		return true
	}
	if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
			len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
		return true
	}
	// Same length: positional comparison detects any replaced entry.
	for i := range oldService.Spec.ExternalIPs {
		if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
			s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v",
				newService.Spec.ExternalIPs[i])
			return true
		}
	}
	// Annotations may carry provider-specific LB configuration.
	if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
		return true
	}
	// A changed UID means delete-and-recreate of the same name.
	if oldService.UID != newService.UID {
		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v",
			oldService.UID, newService.UID)
		return true
	}
	return false
}
// loadBalancerName returns the cloud provider's name for the service's
// load balancer.
func (s *ServiceController) loadBalancerName(service *api.Service) string {
	return cloudprovider.GetLoadBalancerName(service)
}
- func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
- var protocol api.Protocol
- ports := []*api.ServicePort{}
- for i := range service.Spec.Ports {
- sp := &service.Spec.Ports[i]
- // The check on protocol was removed here. The cloud provider itself is now responsible for all protocol validation
- ports = append(ports, sp)
- if protocol == "" {
- protocol = sp.Protocol
- } else if protocol != sp.Protocol && wantsLoadBalancer(service) {
- // TODO: Convert error messages to use event recorder
- return nil, fmt.Errorf("mixed protocol external load balancers are not supported.")
- }
- }
- return ports, nil
- }
- func portsEqualForLB(x, y *api.Service) bool {
- xPorts, err := getPortsForLB(x)
- if err != nil {
- return false
- }
- yPorts, err := getPortsForLB(y)
- if err != nil {
- return false
- }
- return portSlicesEqualForLB(xPorts, yPorts)
- }
- func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
- if len(x) != len(y) {
- return false
- }
- for i := range x {
- if !portEqualForLB(x[i], y[i]) {
- return false
- }
- }
- return true
- }
- func portEqualForLB(x, y *api.ServicePort) bool {
- // TODO: Should we check name? (In theory, an LB could expose it)
- if x.Name != y.Name {
- return false
- }
- if x.Protocol != y.Protocol {
- return false
- }
- if x.Port != y.Port {
- return false
- }
- if x.NodePort != y.NodePort {
- return false
- }
- // We don't check TargetPort; that is not relevant for load balancing
- // TODO: Should we blank it out? Or just check it anyway?
- return true
- }
// intSlicesEqual reports whether x and y contain the same integers,
// ignoring order.
//
// Fix: the previous version sorted x and y in place, silently reordering the
// caller's slices as a side effect of a comparison. Sorting is now done on
// copies, so the inputs are never mutated.
func intSlicesEqual(x, y []int) bool {
	if len(x) != len(y) {
		return false
	}
	xs := append([]int(nil), x...)
	ys := append([]int(nil), y...)
	sort.Ints(xs)
	sort.Ints(ys)
	for i := range xs {
		if xs[i] != ys[i] {
			return false
		}
	}
	return true
}
// stringSlicesEqual reports whether x and y contain the same strings,
// ignoring order.
//
// Fix: the previous version sorted x and y in place, so merely comparing host
// lists in nodeSyncLoop reordered both newHosts and s.knownHosts as a side
// effect. Sorting is now done on copies, leaving the inputs untouched.
func stringSlicesEqual(x, y []string) bool {
	if len(x) != len(y) {
		return false
	}
	xs := append([]string(nil), x...)
	ys := append([]string(nil), y...)
	sort.Strings(xs)
	sort.Strings(ys)
	for i := range xs {
		if xs[i] != ys[i] {
			return false
		}
	}
	return true
}
// includeNodeFromNodeList reports whether a node should receive LB traffic:
// schedulable nodes only.
func includeNodeFromNodeList(node *api.Node) bool {
	return !node.Spec.Unschedulable
}
- func hostsFromNodeList(list *api.NodeList) []string {
- result := []string{}
- for ix := range list.Items {
- if includeNodeFromNodeList(&list.Items[ix]) {
- result = append(result, list.Items[ix].Name)
- }
- }
- return result
- }
- func hostsFromNodeSlice(nodes []*api.Node) []string {
- result := []string{}
- for _, node := range nodes {
- if includeNodeFromNodeList(node) {
- result = append(result, node.Name)
- }
- }
- return result
- }
// getNodeConditionPredicate returns a predicate selecting nodes eligible for
// load balancing: schedulable, with at least one condition reported, and
// with no NodeReady condition that is other than ConditionTrue.
func getNodeConditionPredicate() cache.NodeConditionPredicate {
	return func(node *api.Node) bool {
		// We add the master to the node list, but its unschedulable. So we use this to filter
		// the master.
		// TODO: Use a node annotation to indicate the master
		if node.Spec.Unschedulable {
			return false
		}
		// If we have no info, don't accept
		if len(node.Status.Conditions) == 0 {
			return false
		}
		for _, cond := range node.Status.Conditions {
			// We consider the node for load balancing only when its NodeReady condition status
			// is ConditionTrue
			if cond.Type == api.NodeReady && cond.Status != api.ConditionTrue {
				glog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
				return false
			}
		}
		return true
	}
}
// nodeSyncLoop handles updating the hosts pointed to by all load
// balancers whenever the set of nodes in the cluster changes.
func (s *ServiceController) nodeSyncLoop() {
	nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
	if err != nil {
		glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
		return
	}
	newHosts := hostsFromNodeSlice(nodes)
	if stringSlicesEqual(newHosts, s.knownHosts) {
		// The set of nodes in the cluster hasn't changed, but we can retry
		// updating any services that we failed to update last time around.
		s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
		return
	}
	glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)

	// Try updating all services, and save the ones that fail to try again next
	// round.
	s.servicesToUpdate = s.cache.allServices()
	numServices := len(s.servicesToUpdate)
	s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
	glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
		numServices-len(s.servicesToUpdate), numServices)

	// Remember this host set only after attempting the updates, so a changed
	// set is re-detected next round if needed.
	s.knownHosts = newHosts
}
- // updateLoadBalancerHosts updates all existing load balancers so that
- // they will match the list of hosts provided.
- // Returns the list of services that couldn't be updated.
- func (s *ServiceController) updateLoadBalancerHosts(services []*api.Service, hosts []string) (servicesToRetry []*api.Service) {
- for _, service := range services {
- func() {
- if service == nil {
- return
- }
- if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
- glog.Errorf("External error while updating load balancer: %v.", err)
- servicesToRetry = append(servicesToRetry, service)
- }
- }()
- }
- return servicesToRetry
- }
// Updates the load balancer of a service, assuming we hold the mutex
// associated with the service.
func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, hosts []string) error {
	if !wantsLoadBalancer(service) {
		return nil
	}

	// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
	err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
	if err == nil {
		s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
		return nil
	}

	// It's only an actual error if the load balancer still exists.
	// NOTE: the `err` declared in this `if` deliberately shadows the update
	// error above; the outer err is what gets returned below.
	if _, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service); err != nil {
		glog.Errorf("External error while checking if load balancer %q exists: name, %v", cloudprovider.GetLoadBalancerName(service), err)
	} else if !exists {
		return nil
	}

	s.eventRecorder.Eventf(service, api.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", hosts, err)
	return err
}
// wantsLoadBalancer reports whether the service's type requests a cloud
// load balancer.
func wantsLoadBalancer(service *api.Service) bool {
	return service.Spec.Type == api.ServiceTypeLoadBalancer
}
// loadBalancerIPsAreEqual reports whether the two services request the same
// explicit load balancer IP.
func loadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
}
- // Computes the next retry, using exponential backoff
- // mutex must be held.
- func (s *cachedService) nextRetryDelay() time.Duration {
- s.lastRetryDelay = s.lastRetryDelay * 2
- if s.lastRetryDelay < minRetryDelay {
- s.lastRetryDelay = minRetryDelay
- }
- if s.lastRetryDelay > maxRetryDelay {
- s.lastRetryDelay = maxRetryDelay
- }
- return s.lastRetryDelay
- }
- // Resets the retry exponential backoff. mutex must be held.
- func (s *cachedService) resetRetryDelay() {
- s.lastRetryDelay = time.Duration(0)
- }
// syncService syncs the Service with the given key: it looks the service up
// in the local store, creates/updates its load balancer if present, or
// cleans up the load balancer if the service has been deleted. This function
// is not meant to be invoked concurrently with the same key.
// (The previous comment here mentioned pod expectations; that text was
// copy-pasted from another controller and did not apply.)
func (s *ServiceController) syncService(key string) error {
	startTime := time.Now()
	var cachedService *cachedService
	var retryDelay time.Duration
	defer func() {
		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
	}()
	// obj holds the latest service info from apiserver
	obj, exists, err := s.serviceStore.Store.GetByKey(key)
	if err != nil {
		glog.Infof("Unable to retrieve service %v from store: %v", key, err)
		s.workingQueue.Add(key)
		return err
	}
	if !exists {
		// service absence in store means watcher caught the deletion, ensure LB info is cleaned
		glog.Infof("Service has been deleted %v", key)
		err, retryDelay = s.processServiceDeletion(key)
	} else {
		service, ok := obj.(*api.Service)
		if ok {
			cachedService = s.cache.getOrCreate(key)
			err, retryDelay = s.processServiceUpdate(cachedService, service, key)
		} else {
			// Store returned something other than a Service; the only other
			// legitimate shape is a deletion tombstone.
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				return fmt.Errorf("object contained wasn't a service or a deleted key: %#v", obj)
			}
			glog.Infof("Found tombstone for %v", key)
			err, retryDelay = s.processServiceDeletion(tombstone.Key)
		}
	}

	if retryDelay != 0 {
		// Add the failed service back to the queue so we'll retry it.
		glog.Errorf("Failed to process service. Retrying in %s: %v", retryDelay, err)
		go func(obj interface{}, delay time.Duration) {
			// put back the service key to working queue, it is possible that more entries of the service
			// were added into the queue during the delay, but it does not mess as when handling the retry,
			// it always get the last service info from service store
			s.workingQueue.AddAfter(obj, delay)
		}(key, retryDelay)
	} else if err != nil {
		runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
	}
	// Errors are handled above (requeue or HandleError); callers see success.
	return nil
}
// Returns an error if processing the service deletion failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry after that Duration.
func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
	// The service object is gone from the apiserver, so the last cached
	// state is our only record of whether it had a load balancer.
	cachedService, ok := s.cache.get(key)
	if !ok {
		return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
	}
	service := cachedService.state
	// delete load balancer info only if the service type is LoadBalancer
	if !wantsLoadBalancer(service) {
		return nil, doNotRetry
	}
	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
	err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
	if err != nil {
		message := "Error deleting load balancer (will retry): " + err.Error()
		s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
		return err, cachedService.nextRetryDelay()
	}
	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
	// Only drop the cache entry once the cloud resource is confirmed gone.
	s.cache.delete(key)

	cachedService.resetRetryDelay()
	return nil, doNotRetry
}
|