cidr_allocator.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node

import (
	"errors"
	"fmt"
	"net"
	"sync"

	"k8s.io/kubernetes/pkg/api"
	apierrors "k8s.io/kubernetes/pkg/api/errors"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/util/wait"

	"github.com/golang/glog"
)

// TODO: figure out good settings for these constants.
const (
	// cidrUpdateWorkers controls how many Node.Spec updates the node
	// controller can process concurrently.
	cidrUpdateWorkers   = 10
	cidrUpdateQueueSize = 5000
	// podCIDRUpdateRetry controls the number of retries of writing a
	// Node.Spec.PodCIDR update.
	podCIDRUpdateRetry = 5
)

var errCIDRRangeNoCIDRsRemaining = errors.New("CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")

type nodeAndCIDR struct {
	cidr     *net.IPNet
	nodeName string
}

// CIDRAllocator is an interface implemented by things that know how to allocate/occupy/recycle CIDRs for nodes.
type CIDRAllocator interface {
	AllocateOrOccupyCIDR(node *api.Node) error
	ReleaseCIDR(node *api.Node) error
}
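
// A minimal usage sketch (illustrative only; client, clusterCIDR, serviceCIDR,
// subNetMaskSize, nodeList, and node are assumed to come from the node
// controller's setup):
//
//	alloc, err := NewCIDRRangeAllocator(client, clusterCIDR, serviceCIDR, subNetMaskSize, nodeList)
//	if err != nil {
//		glog.Fatalf("Failed to create CIDR range allocator: %v", err)
//	}
//	_ = alloc.AllocateOrOccupyCIDR(node) // on node add/update events
//	_ = alloc.ReleaseCIDR(node)          // on node delete events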

type rangeAllocator struct {
	client      clientset.Interface
	cidrs       *cidrSet
	clusterCIDR *net.IPNet
	maxCIDRs    int
	// Channel used to pass nodes with freshly assigned CIDRs to the background
	// workers. This increases the throughput of CIDR assignment by not
	// blocking on long operations.
	nodeCIDRUpdateChannel chan nodeAndCIDR
	recorder              record.EventRecorder
	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation.
	sync.Mutex
	nodesInProcessing sets.String
}

// NewCIDRRangeAllocator returns a CIDRAllocator that allocates CIDRs for nodes.
// The caller must ensure subNetMaskSize is not less than the cluster CIDR mask size.
// The caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map. nodeList is only nil in testing.
func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *api.NodeList) (CIDRAllocator, error) {
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cidrAllocator"})
	eventBroadcaster.StartLogging(glog.Infof)

	ra := &rangeAllocator{
		client:                client,
		cidrs:                 newCIDRSet(clusterCIDR, subNetMaskSize),
		clusterCIDR:           clusterCIDR,
		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
		recorder:              recorder,
		nodesInProcessing:     sets.NewString(),
	}

	if serviceCIDR != nil {
		ra.filterOutServiceRange(serviceCIDR)
	} else {
		glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
	}

	if nodeList != nil {
		for _, node := range nodeList.Items {
			if node.Spec.PodCIDR == "" {
				glog.Infof("Node %v has no CIDR, ignoring", node.Name)
				continue
			}
			glog.Infof("Node %v has CIDR %s, occupying it in CIDR map", node.Name, node.Spec.PodCIDR)
			if err := ra.occupyCIDR(&node); err != nil {
				// This will happen if:
				// 1. We find garbage in the podCIDR field. Retrying is useless.
				// 2. The CIDR is out of range: this means a node CIDR has changed.
				// This error will keep crashing controller-manager.
				return nil, err
			}
		}
	}
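
	// Start background workers that drain the update channel and write the
	// assigned CIDRs back to the API server.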
	for i := 0; i < cidrUpdateWorkers; i++ {
		go func(stopChan <-chan struct{}) {
			for {
				select {
				case workItem, ok := <-ra.nodeCIDRUpdateChannel:
					if !ok {
						glog.Warning("NodeCIDRUpdateChannel read returned false.")
						return
					}
					ra.updateCIDRAllocation(workItem)
				case <-stopChan:
					return
				}
			}
		}(wait.NeverStop)
	}

	return ra, nil
}
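
// insertNodeToProcessing marks nodeName as currently being processed. It
// returns false if the node is already being processed, so callers can skip
// concurrent CIDR work for the same node.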
func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
	r.Lock()
	defer r.Unlock()
	if r.nodesInProcessing.Has(nodeName) {
		return false
	}
	r.nodesInProcessing.Insert(nodeName)
	return true
}
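
// removeNodeFromProcessing clears the in-processing mark set by
// insertNodeToProcessing.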
func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
	r.Lock()
	defer r.Unlock()
	r.nodesInProcessing.Delete(nodeName)
}
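
// occupyCIDR marks the CIDR already recorded in node.Spec.PodCIDR as used in
// the allocator's CIDR map, so it cannot be handed out again.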
func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
	defer r.removeNodeFromProcessing(node.Name)
	if node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("failed to parse CIDR %s on node %s: %v", node.Spec.PodCIDR, node.Name, err)
	}
	if err := r.cidrs.occupy(podCIDR); err != nil {
		return fmt.Errorf("failed to mark cidr as occupied: %v", err)
	}
	return nil
}

// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR if it
// doesn't currently have one, or marks the CIDR as used if the node already
// has one.
// WARNING: If you add any return paths to or defer any more work from this
// function, you have to handle nodesInProcessing correctly.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
	if node == nil {
		return nil
	}
	if !r.insertNodeToProcessing(node.Name) {
		glog.V(2).Infof("Node %v is already in the process of CIDR assignment.", node.Name)
		return nil
	}
	if node.Spec.PodCIDR != "" {
		return r.occupyCIDR(node)
	}
	podCIDR, err := r.cidrs.allocateNext()
	if err != nil {
		r.removeNodeFromProcessing(node.Name)
		recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
		return fmt.Errorf("failed to allocate cidr: %v", err)
	}
	glog.V(10).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
		nodeName: node.Name,
		cidr:     podCIDR,
	}
	return nil
}

// ReleaseCIDR releases the CIDR of the removed node.
func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error {
	if node == nil || node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("failed to parse CIDR %s on node %v: %v", node.Spec.PodCIDR, node.Name, err)
	}
	glog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR)
	if err = r.cidrs.release(podCIDR); err != nil {
		return fmt.Errorf("error when releasing CIDR %v: %v", node.Spec.PodCIDR, err)
	}
	return err
}

// filterOutServiceRange marks all CIDRs with subNetMaskSize that belong to
// serviceCIDR as used, so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
	// Check whether the service CIDR has a nonempty intersection with the
	// cluster CIDR. That is the case if either clusterCIDR contains
	// serviceCIDR's IP with clusterCIDR's mask applied (meaning clusterCIDR
	// contains serviceCIDR) or vice versa (meaning serviceCIDR contains
	// clusterCIDR).
	if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) {
		return
	}
	if err := r.cidrs.occupy(serviceCIDR); err != nil {
		glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
	}
}

// updateCIDRAllocation assigns the given CIDR to the node and sends the update to the API server.
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
	var err error
	var node *api.Node
	defer r.removeNodeFromProcessing(data.nodeName)
	for rep := 0; rep < podCIDRUpdateRetry; rep++ {
		// TODO: change this to use PATCH instead of full Node updates.
		node, err = r.client.Core().Nodes().Get(data.nodeName)
		if err != nil {
			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
			continue
		}
		if node.Spec.PodCIDR != "" {
			glog.Errorf("Node %v already has allocated CIDR %v. Releasing assigned one if different.", node.Name, node.Spec.PodCIDR)
			if node.Spec.PodCIDR != data.cidr.String() {
				if err := r.cidrs.release(data.cidr); err != nil {
					glog.Errorf("Error when releasing CIDR %v", data.cidr.String())
				}
			}
			return nil
		}
		node.Spec.PodCIDR = data.cidr.String()
		// Assign to the outer err rather than shadowing it, so that an update
		// failure on the final retry is still visible to the error handling below.
		if _, err = r.client.Core().Nodes().Update(node); err == nil {
			break
		}
		glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
	}
	if err != nil {
		recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
		// We accept the fact that we may leak CIDRs here. This is safer than
		// releasing them in cases when we don't know whether the request went
		// through. A NodeController restart will return all falsely allocated
		// CIDRs to the pool.
		if !apierrors.IsServerTimeout(err) {
			glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
			if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
				glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
			}
		}
	}
	return err
}