kube.go 9.9 KB

  1. // Copyright 2016 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package kube
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "net"
  21. "os"
  22. "time"
  23. "github.com/coreos/flannel/pkg/ip"
  24. "github.com/coreos/flannel/subnet"
  25. "github.com/golang/glog"
  26. "golang.org/x/net/context"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/runtime"
  29. "k8s.io/apimachinery/pkg/types"
  30. "k8s.io/apimachinery/pkg/util/strategicpatch"
  31. "k8s.io/apimachinery/pkg/util/wait"
  32. "k8s.io/apimachinery/pkg/watch"
  33. clientset "k8s.io/client-go/kubernetes"
  34. listers "k8s.io/client-go/listers/core/v1"
  35. "k8s.io/client-go/pkg/api"
  36. "k8s.io/client-go/pkg/api/v1"
  37. "k8s.io/client-go/rest"
  38. "k8s.io/client-go/tools/cache"
  39. "k8s.io/client-go/tools/clientcmd"
  40. )
  41. var (
  42. ErrUnimplemented = errors.New("unimplemented")
  43. )
  44. const (
  45. resyncPeriod = 5 * time.Minute
  46. nodeControllerSyncTimeout = 10 * time.Minute
  47. subnetKubeManagedAnnotation = "flannel.alpha.coreos.com/kube-subnet-manager"
  48. backendDataAnnotation = "flannel.alpha.coreos.com/backend-data"
  49. backendTypeAnnotation = "flannel.alpha.coreos.com/backend-type"
  50. backendPublicIPAnnotation = "flannel.alpha.coreos.com/public-ip"
  51. backendPublicIPOverwriteAnnotation = "flannel.alpha.coreos.com/public-ip-overwrite"
  52. netConfPath = "/etc/kube-flannel/net-conf.json"
  53. )
  54. type kubeSubnetManager struct {
  55. client clientset.Interface
  56. nodeName string
  57. nodeStore listers.NodeLister
  58. nodeController cache.Controller
  59. subnetConf *subnet.Config
  60. events chan subnet.Event
  61. }
  62. func NewSubnetManager(apiUrl, kubeconfig string) (subnet.Manager, error) {
  63. var cfg *rest.Config
  64. var err error
  65. // Use out of cluster config if the URL or kubeconfig have been specified. Otherwise use incluster config.
  66. if apiUrl != "" || kubeconfig != "" {
  67. cfg, err = clientcmd.BuildConfigFromFlags(apiUrl, kubeconfig)
  68. if err != nil {
  69. return nil, fmt.Errorf("unable to create k8s config: %v", err)
  70. }
  71. } else {
  72. cfg, err = rest.InClusterConfig()
  73. if err != nil {
  74. return nil, fmt.Errorf("unable to initialize inclusterconfig: %v", err)
  75. }
  76. }
  77. c, err := clientset.NewForConfig(cfg)
  78. if err != nil {
  79. return nil, fmt.Errorf("unable to initialize client: %v", err)
  80. }
  81. // The kube subnet mgr needs to know the k8s node name that it's running on so it can annotate it.
  82. // If we're running as a pod then the POD_NAME and POD_NAMESPACE will be populated and can be used to find the node
  83. // name. Otherwise, the environment variable NODE_NAME can be passed in.
  84. nodeName := os.Getenv("NODE_NAME")
  85. if nodeName == "" {
  86. podName := os.Getenv("POD_NAME")
  87. podNamespace := os.Getenv("POD_NAMESPACE")
  88. if podName == "" || podNamespace == "" {
  89. return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
  90. }
  91. pod, err := c.Pods(podNamespace).Get(podName, metav1.GetOptions{})
  92. if err != nil {
  93. return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
  94. }
  95. nodeName = pod.Spec.NodeName
  96. if nodeName == "" {
  97. return nil, fmt.Errorf("node name not present in pod spec '%s/%s'", podNamespace, podName)
  98. }
  99. }
  100. netConf, err := ioutil.ReadFile(netConfPath)
  101. if err != nil {
  102. return nil, fmt.Errorf("failed to read net conf: %v", err)
  103. }
  104. sc, err := subnet.ParseConfig(string(netConf))
  105. if err != nil {
  106. return nil, fmt.Errorf("error parsing subnet config: %s", err)
  107. }
  108. sm, err := newKubeSubnetManager(c, sc, nodeName)
  109. if err != nil {
  110. return nil, fmt.Errorf("error creating network manager: %s", err)
  111. }
  112. go sm.Run(context.Background())
  113. glog.Infof("Waiting %s for node controller to sync", nodeControllerSyncTimeout)
  114. err = wait.Poll(time.Second, nodeControllerSyncTimeout, func() (bool, error) {
  115. return sm.nodeController.HasSynced(), nil
  116. })
  117. if err != nil {
  118. return nil, fmt.Errorf("error waiting for nodeController to sync state: %v", err)
  119. }
  120. glog.Infof("Node controller sync successful")
  121. return sm, nil
  122. }
  123. func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName string) (*kubeSubnetManager, error) {
  124. var ksm kubeSubnetManager
  125. ksm.client = c
  126. ksm.nodeName = nodeName
  127. ksm.subnetConf = sc
  128. ksm.events = make(chan subnet.Event, 5000)
  129. indexer, controller := cache.NewIndexerInformer(
  130. &cache.ListWatch{
  131. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  132. return ksm.client.CoreV1().Nodes().List(options)
  133. },
  134. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  135. return ksm.client.CoreV1().Nodes().Watch(options)
  136. },
  137. },
  138. &v1.Node{},
  139. resyncPeriod,
  140. cache.ResourceEventHandlerFuncs{
  141. AddFunc: func(obj interface{}) {
  142. ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
  143. },
  144. UpdateFunc: ksm.handleUpdateLeaseEvent,
  145. DeleteFunc: func(obj interface{}) {
  146. ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
  147. },
  148. },
  149. cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
  150. )
  151. ksm.nodeController = controller
  152. ksm.nodeStore = listers.NewNodeLister(indexer)
  153. return &ksm, nil
  154. }
  155. func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj interface{}) {
  156. n := obj.(*v1.Node)
  157. if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
  158. return
  159. }
  160. l, err := nodeToLease(*n)
  161. if err != nil {
  162. glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  163. return
  164. }
  165. ksm.events <- subnet.Event{et, l}
  166. }
  167. func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{}) {
  168. o := oldObj.(*v1.Node)
  169. n := newObj.(*v1.Node)
  170. if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
  171. return
  172. }
  173. if o.Annotations[backendDataAnnotation] == n.Annotations[backendDataAnnotation] &&
  174. o.Annotations[backendTypeAnnotation] == n.Annotations[backendTypeAnnotation] &&
  175. o.Annotations[backendPublicIPAnnotation] == n.Annotations[backendPublicIPAnnotation] {
  176. return // No change to lease
  177. }
  178. l, err := nodeToLease(*n)
  179. if err != nil {
  180. glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  181. return
  182. }
  183. ksm.events <- subnet.Event{subnet.EventAdded, l}
  184. }
  185. func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context) (*subnet.Config, error) {
  186. return ksm.subnetConf, nil
  187. }
  188. func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
  189. cachedNode, err := ksm.nodeStore.Get(ksm.nodeName)
  190. if err != nil {
  191. return nil, err
  192. }
  193. nobj, err := api.Scheme.DeepCopy(cachedNode)
  194. if err != nil {
  195. return nil, err
  196. }
  197. n := nobj.(*v1.Node)
  198. if n.Spec.PodCIDR == "" {
  199. return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
  200. }
  201. bd, err := attrs.BackendData.MarshalJSON()
  202. if err != nil {
  203. return nil, err
  204. }
  205. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  206. if err != nil {
  207. return nil, err
  208. }
  209. if n.Annotations[backendDataAnnotation] != string(bd) ||
  210. n.Annotations[backendTypeAnnotation] != attrs.BackendType ||
  211. n.Annotations[backendPublicIPAnnotation] != attrs.PublicIP.String() ||
  212. n.Annotations[subnetKubeManagedAnnotation] != "true" ||
  213. (n.Annotations[backendPublicIPOverwriteAnnotation] != "" && n.Annotations[backendPublicIPOverwriteAnnotation] != attrs.PublicIP.String()) {
  214. n.Annotations[backendTypeAnnotation] = attrs.BackendType
  215. n.Annotations[backendDataAnnotation] = string(bd)
  216. if n.Annotations[backendPublicIPOverwriteAnnotation] != "" {
  217. n.Annotations[backendPublicIPAnnotation] = n.Annotations[backendPublicIPOverwriteAnnotation]
  218. } else {
  219. n.Annotations[backendPublicIPAnnotation] = attrs.PublicIP.String()
  220. }
  221. n.Annotations[subnetKubeManagedAnnotation] = "true"
  222. oldData, err := json.Marshal(cachedNode)
  223. if err != nil {
  224. return nil, err
  225. }
  226. newData, err := json.Marshal(n)
  227. if err != nil {
  228. return nil, err
  229. }
  230. patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
  231. if err != nil {
  232. return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
  233. }
  234. _, err = ksm.client.CoreV1().Nodes().Patch(ksm.nodeName, types.StrategicMergePatchType, patchBytes, "status")
  235. if err != nil {
  236. return nil, err
  237. }
  238. }
  239. return &subnet.Lease{
  240. Subnet: ip.FromIPNet(cidr),
  241. Attrs: *attrs,
  242. Expiration: time.Now().Add(24 * time.Hour),
  243. }, nil
  244. }
  245. func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{}) (subnet.LeaseWatchResult, error) {
  246. select {
  247. case event := <-ksm.events:
  248. return subnet.LeaseWatchResult{
  249. Events: []subnet.Event{event},
  250. }, nil
  251. case <-ctx.Done():
  252. return subnet.LeaseWatchResult{}, nil
  253. }
  254. }
  255. func (ksm *kubeSubnetManager) Run(ctx context.Context) {
  256. glog.Infof("Starting kube subnet manager")
  257. ksm.nodeController.Run(ctx.Done())
  258. }
  259. func nodeToLease(n v1.Node) (l subnet.Lease, err error) {
  260. l.Attrs.PublicIP, err = ip.ParseIP4(n.Annotations[backendPublicIPAnnotation])
  261. if err != nil {
  262. return l, err
  263. }
  264. l.Attrs.BackendType = n.Annotations[backendTypeAnnotation]
  265. l.Attrs.BackendData = json.RawMessage(n.Annotations[backendDataAnnotation])
  266. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  267. if err != nil {
  268. return l, err
  269. }
  270. l.Subnet = ip.FromIPNet(cidr)
  271. return l, nil
  272. }
  273. // unimplemented
  274. func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, lease *subnet.Lease) error {
  275. return ErrUnimplemented
  276. }
  277. func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
  278. return subnet.LeaseWatchResult{}, ErrUnimplemented
  279. }
  280. func (ksm *kubeSubnetManager) Name() string {
  281. return fmt.Sprintf("Kubernetes Subnet Manager - %s", ksm.nodeName)
  282. }