kube.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. // Copyright 2016 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package kube
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "net"
  21. "os"
  22. "time"
  23. "github.com/flannel-io/flannel/pkg/ip"
  24. "github.com/flannel-io/flannel/subnet"
  25. "golang.org/x/net/context"
  26. v1 "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/runtime"
  29. "k8s.io/apimachinery/pkg/types"
  30. "k8s.io/apimachinery/pkg/util/strategicpatch"
  31. "k8s.io/apimachinery/pkg/util/wait"
  32. "k8s.io/apimachinery/pkg/watch"
  33. clientset "k8s.io/client-go/kubernetes"
  34. listers "k8s.io/client-go/listers/core/v1"
  35. "k8s.io/client-go/rest"
  36. "k8s.io/client-go/tools/cache"
  37. "k8s.io/client-go/tools/clientcmd"
  38. log "k8s.io/klog"
  39. )
  40. var (
  41. ErrUnimplemented = errors.New("unimplemented")
  42. )
  43. const (
  44. resyncPeriod = 5 * time.Minute
  45. nodeControllerSyncTimeout = 10 * time.Minute
  46. )
// kubeSubnetManager implements subnet.Manager on top of the Kubernetes API:
// lease state is stored as annotations on Node objects and observed through
// a node informer.
type kubeSubnetManager struct {
	enableIPv4                bool
	enableIPv6                bool
	annotations               annotations // prefixed annotation keys (built by newAnnotations)
	client                    clientset.Interface
	nodeName                  string // name of the node this manager runs on
	nodeStore                 listers.NodeLister
	nodeController            cache.Controller
	subnetConf                *subnet.Config
	events                    chan subnet.Event // buffered (5000); filled by informer handlers, drained by WatchLeases
	setNodeNetworkUnavailable bool              // when true, AcquireLease clears the NodeNetworkUnavailable condition
}
  59. func NewSubnetManager(ctx context.Context, apiUrl, kubeconfig, prefix, netConfPath string, setNodeNetworkUnavailable bool) (subnet.Manager, error) {
  60. var cfg *rest.Config
  61. var err error
  62. // Try to build kubernetes config from a master url or a kubeconfig filepath. If neither masterUrl
  63. // or kubeconfigPath are passed in we fall back to inClusterConfig. If inClusterConfig fails,
  64. // we fallback to the default config.
  65. cfg, err = clientcmd.BuildConfigFromFlags(apiUrl, kubeconfig)
  66. if err != nil {
  67. return nil, fmt.Errorf("fail to create kubernetes config: %v", err)
  68. }
  69. c, err := clientset.NewForConfig(cfg)
  70. if err != nil {
  71. return nil, fmt.Errorf("unable to initialize client: %v", err)
  72. }
  73. // The kube subnet mgr needs to know the k8s node name that it's running on so it can annotate it.
  74. // If we're running as a pod then the POD_NAME and POD_NAMESPACE will be populated and can be used to find the node
  75. // name. Otherwise, the environment variable NODE_NAME can be passed in.
  76. nodeName := os.Getenv("NODE_NAME")
  77. if nodeName == "" {
  78. podName := os.Getenv("POD_NAME")
  79. podNamespace := os.Getenv("POD_NAMESPACE")
  80. if podName == "" || podNamespace == "" {
  81. return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
  82. }
  83. pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
  84. if err != nil {
  85. return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
  86. }
  87. nodeName = pod.Spec.NodeName
  88. if nodeName == "" {
  89. return nil, fmt.Errorf("node name not present in pod spec '%s/%s'", podNamespace, podName)
  90. }
  91. }
  92. netConf, err := ioutil.ReadFile(netConfPath)
  93. if err != nil {
  94. return nil, fmt.Errorf("failed to read net conf: %v", err)
  95. }
  96. sc, err := subnet.ParseConfig(string(netConf))
  97. if err != nil {
  98. return nil, fmt.Errorf("error parsing subnet config: %s", err)
  99. }
  100. sm, err := newKubeSubnetManager(ctx, c, sc, nodeName, prefix)
  101. if err != nil {
  102. return nil, fmt.Errorf("error creating network manager: %s", err)
  103. }
  104. sm.setNodeNetworkUnavailable = setNodeNetworkUnavailable
  105. go sm.Run(context.Background())
  106. log.Infof("Waiting %s for node controller to sync", nodeControllerSyncTimeout)
  107. err = wait.Poll(time.Second, nodeControllerSyncTimeout, func() (bool, error) {
  108. return sm.nodeController.HasSynced(), nil
  109. })
  110. if err != nil {
  111. return nil, fmt.Errorf("error waiting for nodeController to sync state: %v", err)
  112. }
  113. log.Infof("Node controller sync successful")
  114. return sm, nil
  115. }
  116. func newKubeSubnetManager(ctx context.Context, c clientset.Interface, sc *subnet.Config, nodeName, prefix string) (*kubeSubnetManager, error) {
  117. var err error
  118. var ksm kubeSubnetManager
  119. ksm.annotations, err = newAnnotations(prefix)
  120. if err != nil {
  121. return nil, err
  122. }
  123. ksm.enableIPv4 = sc.EnableIPv4
  124. ksm.enableIPv6 = sc.EnableIPv6
  125. ksm.client = c
  126. ksm.nodeName = nodeName
  127. ksm.subnetConf = sc
  128. ksm.events = make(chan subnet.Event, 5000)
  129. indexer, controller := cache.NewIndexerInformer(
  130. &cache.ListWatch{
  131. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  132. return ksm.client.CoreV1().Nodes().List(ctx, options)
  133. },
  134. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  135. return ksm.client.CoreV1().Nodes().Watch(ctx, options)
  136. },
  137. },
  138. &v1.Node{},
  139. resyncPeriod,
  140. cache.ResourceEventHandlerFuncs{
  141. AddFunc: func(obj interface{}) {
  142. ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
  143. },
  144. UpdateFunc: ksm.handleUpdateLeaseEvent,
  145. DeleteFunc: func(obj interface{}) {
  146. node, isNode := obj.(*v1.Node)
  147. // We can get DeletedFinalStateUnknown instead of *api.Node here and we need to handle that correctly.
  148. if !isNode {
  149. deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
  150. if !ok {
  151. log.Infof("Error received unexpected object: %v", obj)
  152. return
  153. }
  154. node, ok = deletedState.Obj.(*v1.Node)
  155. if !ok {
  156. log.Infof("Error deletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
  157. return
  158. }
  159. obj = node
  160. }
  161. ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
  162. },
  163. },
  164. cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
  165. )
  166. ksm.nodeController = controller
  167. ksm.nodeStore = listers.NewNodeLister(indexer)
  168. return &ksm, nil
  169. }
  170. func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj interface{}) {
  171. n := obj.(*v1.Node)
  172. if s, ok := n.Annotations[ksm.annotations.SubnetKubeManaged]; !ok || s != "true" {
  173. return
  174. }
  175. l, err := ksm.nodeToLease(*n)
  176. if err != nil {
  177. log.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  178. return
  179. }
  180. ksm.events <- subnet.Event{et, l}
  181. }
  182. func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{}) {
  183. o := oldObj.(*v1.Node)
  184. n := newObj.(*v1.Node)
  185. if s, ok := n.Annotations[ksm.annotations.SubnetKubeManaged]; !ok || s != "true" {
  186. return
  187. }
  188. var changed = true
  189. if ksm.enableIPv4 && o.Annotations[ksm.annotations.BackendData] == n.Annotations[ksm.annotations.BackendData] &&
  190. o.Annotations[ksm.annotations.BackendType] == n.Annotations[ksm.annotations.BackendType] &&
  191. o.Annotations[ksm.annotations.BackendPublicIP] == n.Annotations[ksm.annotations.BackendPublicIP] {
  192. changed = false
  193. }
  194. if ksm.enableIPv6 && o.Annotations[ksm.annotations.BackendV6Data] == n.Annotations[ksm.annotations.BackendV6Data] &&
  195. o.Annotations[ksm.annotations.BackendType] == n.Annotations[ksm.annotations.BackendType] &&
  196. o.Annotations[ksm.annotations.BackendPublicIPv6] == n.Annotations[ksm.annotations.BackendPublicIPv6] {
  197. changed = false
  198. }
  199. if !changed {
  200. return // No change to lease
  201. }
  202. l, err := ksm.nodeToLease(*n)
  203. if err != nil {
  204. log.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  205. return
  206. }
  207. ksm.events <- subnet.Event{subnet.EventAdded, l}
  208. }
  209. func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context) (*subnet.Config, error) {
  210. return ksm.subnetConf, nil
  211. }
// AcquireLease exposes this node's pod CIDR(s) as a flannel lease. It syncs
// the desired backend annotations onto the Node object via a strategic-merge
// patch when they are missing or stale, optionally clears the
// NodeNetworkUnavailable condition, and returns a lease built from the
// node's Spec.PodCIDR / Spec.PodCIDRs.
func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
	cachedNode, err := ksm.nodeStore.Get(ksm.nodeName)
	if err != nil {
		return nil, err
	}
	// Deep-copy so the informer's shared cache object is never mutated.
	n := cachedNode.DeepCopy()
	if n.Spec.PodCIDR == "" {
		return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
	}
	// Serialized backend data for the v4/v6 annotations; "null" when unset.
	var bd, v6Bd []byte
	bd, err = attrs.BackendData.MarshalJSON()
	if err != nil {
		return nil, err
	}
	v6Bd, err = attrs.BackendV6Data.MarshalJSON()
	if err != nil {
		return nil, err
	}
	var cidr, ipv6Cidr *net.IPNet
	_, cidr, err = net.ParseCIDR(n.Spec.PodCIDR)
	if err != nil {
		return nil, err
	}
	// Pick the first IPv6 entry from PodCIDRs (net.ParseCIDR yields a
	// 16-byte IP only for IPv6 CIDRs); ipv6Cidr stays nil if none exists.
	for _, podCidr := range n.Spec.PodCIDRs {
		_, parseCidr, err := net.ParseCIDR(podCidr)
		if err != nil {
			return nil, err
		}
		if len(parseCidr.IP) == net.IPv6len {
			ipv6Cidr = parseCidr
			break
		}
	}
	// Patch the node only when some annotation in either the v4 or the v6
	// group disagrees with the desired state derived from attrs.
	if (n.Annotations[ksm.annotations.BackendData] != string(bd) ||
		n.Annotations[ksm.annotations.BackendType] != attrs.BackendType ||
		n.Annotations[ksm.annotations.BackendPublicIP] != attrs.PublicIP.String() ||
		n.Annotations[ksm.annotations.SubnetKubeManaged] != "true" ||
		(n.Annotations[ksm.annotations.BackendPublicIPOverwrite] != "" && n.Annotations[ksm.annotations.BackendPublicIPOverwrite] != attrs.PublicIP.String())) ||
		(n.Annotations[ksm.annotations.BackendV6Data] != string(v6Bd) ||
			n.Annotations[ksm.annotations.BackendType] != attrs.BackendType ||
			n.Annotations[ksm.annotations.BackendPublicIPv6] != attrs.PublicIPv6.String() ||
			n.Annotations[ksm.annotations.SubnetKubeManaged] != "true" ||
			(n.Annotations[ksm.annotations.BackendPublicIPv6Overwrite] != "" && n.Annotations[ksm.annotations.BackendPublicIPv6Overwrite] != attrs.PublicIPv6.String())) {
		n.Annotations[ksm.annotations.BackendType] = attrs.BackendType
		//TODO - temporarily compatible with dual stack,
		// only vxlan backend support dual stack now.
		if (attrs.BackendType == "vxlan" && string(bd) != "null") || attrs.BackendType != "vxlan" {
			n.Annotations[ksm.annotations.BackendData] = string(bd)
			// An explicit overwrite annotation on the node wins over the
			// public IP flannel detected.
			if n.Annotations[ksm.annotations.BackendPublicIPOverwrite] != "" {
				if n.Annotations[ksm.annotations.BackendPublicIP] != n.Annotations[ksm.annotations.BackendPublicIPOverwrite] {
					log.Infof("Overriding public ip with '%s' from node annotation '%s'",
						n.Annotations[ksm.annotations.BackendPublicIPOverwrite],
						ksm.annotations.BackendPublicIPOverwrite)
					n.Annotations[ksm.annotations.BackendPublicIP] = n.Annotations[ksm.annotations.BackendPublicIPOverwrite]
				}
			} else {
				n.Annotations[ksm.annotations.BackendPublicIP] = attrs.PublicIP.String()
			}
		}
		// Same overwrite logic for the IPv6 public address.
		if string(v6Bd) != "null" {
			n.Annotations[ksm.annotations.BackendV6Data] = string(v6Bd)
			if n.Annotations[ksm.annotations.BackendPublicIPv6Overwrite] != "" {
				if n.Annotations[ksm.annotations.BackendPublicIPv6] != n.Annotations[ksm.annotations.BackendPublicIPv6Overwrite] {
					log.Infof("Overriding public ipv6 with '%s' from node annotation '%s'",
						n.Annotations[ksm.annotations.BackendPublicIPv6Overwrite],
						ksm.annotations.BackendPublicIPv6Overwrite)
					n.Annotations[ksm.annotations.BackendPublicIPv6] = n.Annotations[ksm.annotations.BackendPublicIPv6Overwrite]
				}
			} else {
				n.Annotations[ksm.annotations.BackendPublicIPv6] = attrs.PublicIPv6.String()
			}
		}
		n.Annotations[ksm.annotations.SubnetKubeManaged] = "true"
		// Build a strategic-merge patch from the cached node to the modified
		// copy so only the annotation delta is sent to the API server.
		oldData, err := json.Marshal(cachedNode)
		if err != nil {
			return nil, err
		}
		newData, err := json.Marshal(n)
		if err != nil {
			return nil, err
		}
		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
		if err != nil {
			return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
		}
		// NOTE(review): the patch targets the "status" subresource even though
		// only metadata.annotations are modified — confirm this is intended.
		_, err = ksm.client.CoreV1().Nodes().Patch(ctx, ksm.nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
		if err != nil {
			return nil, err
		}
	}
	// Failure to clear the condition is logged but does not fail the lease.
	if ksm.setNodeNetworkUnavailable {
		log.Infoln("Setting NodeNetworkUnavailable")
		err = ksm.setNodeNetworkUnavailableFalse(ctx)
		if err != nil {
			log.Errorf("Unable to set NodeNetworkUnavailable to False for %q: %v", ksm.nodeName, err)
		}
	} else {
		log.Infoln("Skip setting NodeNetworkUnavailable")
	}
	lease := &subnet.Lease{
		Attrs:      *attrs,
		Expiration: time.Now().Add(24 * time.Hour),
	}
	if cidr != nil {
		lease.Subnet = ip.FromIPNet(cidr)
	}
	if ipv6Cidr != nil {
		lease.IPv6Subnet = ip.FromIP6Net(ipv6Cidr)
	}
	//TODO - temporarily compatible with dual stack,
	// only vxlan backend support dual stack now.
	if attrs.BackendType != "vxlan" {
		lease.EnableIPv4 = true
		lease.EnableIPv6 = false
	}
	return lease, nil
}
  329. func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{}) (subnet.LeaseWatchResult, error) {
  330. select {
  331. case event := <-ksm.events:
  332. return subnet.LeaseWatchResult{
  333. Events: []subnet.Event{event},
  334. }, nil
  335. case <-ctx.Done():
  336. return subnet.LeaseWatchResult{}, context.Canceled
  337. }
  338. }
  339. func (ksm *kubeSubnetManager) Run(ctx context.Context) {
  340. log.Infof("Starting kube subnet manager")
  341. ksm.nodeController.Run(ctx.Done())
  342. }
  343. func (ksm *kubeSubnetManager) nodeToLease(n v1.Node) (l subnet.Lease, err error) {
  344. if ksm.enableIPv4 {
  345. l.Attrs.PublicIP, err = ip.ParseIP4(n.Annotations[ksm.annotations.BackendPublicIP])
  346. if err != nil {
  347. return l, err
  348. }
  349. l.Attrs.BackendData = json.RawMessage(n.Annotations[ksm.annotations.BackendData])
  350. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  351. if err != nil {
  352. return l, err
  353. }
  354. l.Subnet = ip.FromIPNet(cidr)
  355. l.EnableIPv4 = ksm.enableIPv4
  356. }
  357. if ksm.enableIPv6 {
  358. l.Attrs.PublicIPv6, err = ip.ParseIP6(n.Annotations[ksm.annotations.BackendPublicIPv6])
  359. if err != nil {
  360. return l, err
  361. }
  362. l.Attrs.BackendV6Data = json.RawMessage(n.Annotations[ksm.annotations.BackendV6Data])
  363. ipv6Cidr := new(net.IPNet)
  364. for _, podCidr := range n.Spec.PodCIDRs {
  365. _, parseCidr, err := net.ParseCIDR(podCidr)
  366. if err != nil {
  367. return l, err
  368. }
  369. if len(parseCidr.IP) == net.IPv6len {
  370. ipv6Cidr = parseCidr
  371. break
  372. }
  373. }
  374. l.IPv6Subnet = ip.FromIP6Net(ipv6Cidr)
  375. l.EnableIPv6 = ksm.enableIPv6
  376. }
  377. l.Attrs.BackendType = n.Annotations[ksm.annotations.BackendType]
  378. return l, nil
  379. }
  380. // RenewLease: unimplemented
  381. func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, lease *subnet.Lease) error {
  382. return ErrUnimplemented
  383. }
  384. func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
  385. return subnet.LeaseWatchResult{}, ErrUnimplemented
  386. }
  387. func (ksm *kubeSubnetManager) Name() string {
  388. return fmt.Sprintf("Kubernetes Subnet Manager - %s", ksm.nodeName)
  389. }
  390. // Set Kubernetes NodeNetworkUnavailable to false when starting
  391. // https://kubernetes.io/docs/concepts/architecture/nodes/#condition
  392. func (ksm *kubeSubnetManager) setNodeNetworkUnavailableFalse(ctx context.Context) error {
  393. condition := v1.NodeCondition{
  394. Type: v1.NodeNetworkUnavailable,
  395. Status: v1.ConditionFalse,
  396. Reason: "FlannelIsUp",
  397. Message: "Flannel is running on this node",
  398. LastTransitionTime: metav1.Now(),
  399. LastHeartbeatTime: metav1.Now(),
  400. }
  401. raw, err := json.Marshal(&[]v1.NodeCondition{condition})
  402. if err != nil {
  403. return err
  404. }
  405. patch := []byte(fmt.Sprintf(`{"status":{"conditions":%s}}`, raw))
  406. _, err = ksm.client.CoreV1().Nodes().PatchStatus(ctx, ksm.nodeName, patch)
  407. return err
  408. }