kube.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. // Copyright 2016 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package kube
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "net"
  21. "os"
  22. "time"
  23. "github.com/coreos/flannel/pkg/ip"
  24. "github.com/coreos/flannel/subnet"
  25. "github.com/golang/glog"
  26. "golang.org/x/net/context"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/runtime"
  29. "k8s.io/apimachinery/pkg/types"
  30. "k8s.io/apimachinery/pkg/util/strategicpatch"
  31. "k8s.io/apimachinery/pkg/util/wait"
  32. "k8s.io/apimachinery/pkg/watch"
  33. clientset "k8s.io/client-go/kubernetes"
  34. listers "k8s.io/client-go/listers/core/v1"
  35. "k8s.io/client-go/pkg/api"
  36. "k8s.io/client-go/pkg/api/v1"
  37. "k8s.io/client-go/rest"
  38. "k8s.io/client-go/tools/cache"
  39. "k8s.io/client-go/tools/clientcmd"
  40. )
var (
	// ErrUnimplemented is the error returned by the manager methods that this
	// Kubernetes-backed implementation does not provide (RenewLease and
	// WatchLease).
	ErrUnimplemented = errors.New("unimplemented")
)
const (
	// resyncPeriod is the resync interval passed to the node informer.
	resyncPeriod = 5 * time.Minute
	// nodeControllerSyncTimeout bounds how long NewSubnetManager polls for the
	// node controller to report that it has synced.
	nodeControllerSyncTimeout = 10 * time.Minute
	// netConfPath is the file NewSubnetManager reads the network configuration
	// from.
	netConfPath = "/etc/kube-flannel/net-conf.json"
)
// kubeSubnetManager is the subnet.Manager returned by NewSubnetManager. It
// records lease attributes as annotations on Kubernetes Node objects and turns
// node informer notifications into subnet events.
type kubeSubnetManager struct {
	// annotations holds the node annotation keys produced by newAnnotations.
	annotations annotations
	// client is the Kubernetes API client used to list, watch and patch nodes.
	client clientset.Interface
	// nodeName is the name of the node that AcquireLease reads and patches.
	nodeName string
	// nodeStore is a lister over the node informer's indexer.
	nodeStore listers.NodeLister
	// nodeController is the node informer controller started by Run.
	nodeController cache.Controller
	// subnetConf is the configuration returned by GetNetworkConfig.
	subnetConf *subnet.Config
	// events carries lease events from the informer handlers to WatchLeases.
	events chan subnet.Event
}
  58. func NewSubnetManager(apiUrl, kubeconfig, prefix string) (subnet.Manager, error) {
  59. var cfg *rest.Config
  60. var err error
  61. // Use out of cluster config if the URL or kubeconfig have been specified. Otherwise use incluster config.
  62. if apiUrl != "" || kubeconfig != "" {
  63. cfg, err = clientcmd.BuildConfigFromFlags(apiUrl, kubeconfig)
  64. if err != nil {
  65. return nil, fmt.Errorf("unable to create k8s config: %v", err)
  66. }
  67. } else {
  68. cfg, err = rest.InClusterConfig()
  69. if err != nil {
  70. return nil, fmt.Errorf("unable to initialize inclusterconfig: %v", err)
  71. }
  72. }
  73. c, err := clientset.NewForConfig(cfg)
  74. if err != nil {
  75. return nil, fmt.Errorf("unable to initialize client: %v", err)
  76. }
  77. // The kube subnet mgr needs to know the k8s node name that it's running on so it can annotate it.
  78. // If we're running as a pod then the POD_NAME and POD_NAMESPACE will be populated and can be used to find the node
  79. // name. Otherwise, the environment variable NODE_NAME can be passed in.
  80. nodeName := os.Getenv("NODE_NAME")
  81. if nodeName == "" {
  82. podName := os.Getenv("POD_NAME")
  83. podNamespace := os.Getenv("POD_NAMESPACE")
  84. if podName == "" || podNamespace == "" {
  85. return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
  86. }
  87. pod, err := c.Pods(podNamespace).Get(podName, metav1.GetOptions{})
  88. if err != nil {
  89. return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
  90. }
  91. nodeName = pod.Spec.NodeName
  92. if nodeName == "" {
  93. return nil, fmt.Errorf("node name not present in pod spec '%s/%s'", podNamespace, podName)
  94. }
  95. }
  96. netConf, err := ioutil.ReadFile(netConfPath)
  97. if err != nil {
  98. return nil, fmt.Errorf("failed to read net conf: %v", err)
  99. }
  100. sc, err := subnet.ParseConfig(string(netConf))
  101. if err != nil {
  102. return nil, fmt.Errorf("error parsing subnet config: %s", err)
  103. }
  104. sm, err := newKubeSubnetManager(c, sc, nodeName, prefix)
  105. if err != nil {
  106. return nil, fmt.Errorf("error creating network manager: %s", err)
  107. }
  108. go sm.Run(context.Background())
  109. glog.Infof("Waiting %s for node controller to sync", nodeControllerSyncTimeout)
  110. err = wait.Poll(time.Second, nodeControllerSyncTimeout, func() (bool, error) {
  111. return sm.nodeController.HasSynced(), nil
  112. })
  113. if err != nil {
  114. return nil, fmt.Errorf("error waiting for nodeController to sync state: %v", err)
  115. }
  116. glog.Infof("Node controller sync successful")
  117. return sm, nil
  118. }
  119. func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName, prefix string) (*kubeSubnetManager, error) {
  120. var err error
  121. var ksm kubeSubnetManager
  122. ksm.annotations, err = newAnnotations(prefix)
  123. if err != nil {
  124. return nil, err
  125. }
  126. ksm.client = c
  127. ksm.nodeName = nodeName
  128. ksm.subnetConf = sc
  129. ksm.events = make(chan subnet.Event, 5000)
  130. indexer, controller := cache.NewIndexerInformer(
  131. &cache.ListWatch{
  132. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  133. return ksm.client.CoreV1().Nodes().List(options)
  134. },
  135. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  136. return ksm.client.CoreV1().Nodes().Watch(options)
  137. },
  138. },
  139. &v1.Node{},
  140. resyncPeriod,
  141. cache.ResourceEventHandlerFuncs{
  142. AddFunc: func(obj interface{}) {
  143. ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
  144. },
  145. UpdateFunc: ksm.handleUpdateLeaseEvent,
  146. DeleteFunc: func(obj interface{}) {
  147. ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
  148. },
  149. },
  150. cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
  151. )
  152. ksm.nodeController = controller
  153. ksm.nodeStore = listers.NewNodeLister(indexer)
  154. return &ksm, nil
  155. }
  156. func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj interface{}) {
  157. n := obj.(*v1.Node)
  158. if s, ok := n.Annotations[ksm.annotations.SubnetKubeManaged]; !ok || s != "true" {
  159. return
  160. }
  161. l, err := ksm.nodeToLease(*n)
  162. if err != nil {
  163. glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  164. return
  165. }
  166. ksm.events <- subnet.Event{et, l}
  167. }
  168. func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{}) {
  169. o := oldObj.(*v1.Node)
  170. n := newObj.(*v1.Node)
  171. if s, ok := n.Annotations[ksm.annotations.SubnetKubeManaged]; !ok || s != "true" {
  172. return
  173. }
  174. if o.Annotations[ksm.annotations.BackendData] == n.Annotations[ksm.annotations.BackendData] &&
  175. o.Annotations[ksm.annotations.BackendType] == n.Annotations[ksm.annotations.BackendType] &&
  176. o.Annotations[ksm.annotations.BackendPublicIP] == n.Annotations[ksm.annotations.BackendPublicIP] {
  177. return // No change to lease
  178. }
  179. l, err := ksm.nodeToLease(*n)
  180. if err != nil {
  181. glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  182. return
  183. }
  184. ksm.events <- subnet.Event{subnet.EventAdded, l}
  185. }
  186. func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context) (*subnet.Config, error) {
  187. return ksm.subnetConf, nil
  188. }
  189. func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
  190. cachedNode, err := ksm.nodeStore.Get(ksm.nodeName)
  191. if err != nil {
  192. return nil, err
  193. }
  194. nobj, err := api.Scheme.DeepCopy(cachedNode)
  195. if err != nil {
  196. return nil, err
  197. }
  198. n := nobj.(*v1.Node)
  199. if n.Spec.PodCIDR == "" {
  200. return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
  201. }
  202. bd, err := attrs.BackendData.MarshalJSON()
  203. if err != nil {
  204. return nil, err
  205. }
  206. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  207. if err != nil {
  208. return nil, err
  209. }
  210. if n.Annotations[ksm.annotations.BackendData] != string(bd) ||
  211. n.Annotations[ksm.annotations.BackendType] != attrs.BackendType ||
  212. n.Annotations[ksm.annotations.BackendPublicIP] != attrs.PublicIP.String() ||
  213. n.Annotations[ksm.annotations.SubnetKubeManaged] != "true" ||
  214. (n.Annotations[ksm.annotations.BackendPublicIPOverwrite] != "" && n.Annotations[ksm.annotations.BackendPublicIPOverwrite] != attrs.PublicIP.String()) {
  215. n.Annotations[ksm.annotations.BackendType] = attrs.BackendType
  216. n.Annotations[ksm.annotations.BackendData] = string(bd)
  217. if n.Annotations[ksm.annotations.BackendPublicIPOverwrite] != "" {
  218. if n.Annotations[ksm.annotations.BackendPublicIP] != n.Annotations[ksm.annotations.BackendPublicIPOverwrite] {
  219. glog.Infof("Overriding public ip with '%s' from node annotation '%s'",
  220. n.Annotations[ksm.annotations.BackendPublicIPOverwrite],
  221. ksm.annotations.BackendPublicIPOverwrite)
  222. n.Annotations[ksm.annotations.BackendPublicIP] = n.Annotations[ksm.annotations.BackendPublicIPOverwrite]
  223. }
  224. } else {
  225. n.Annotations[ksm.annotations.BackendPublicIP] = attrs.PublicIP.String()
  226. }
  227. n.Annotations[ksm.annotations.SubnetKubeManaged] = "true"
  228. oldData, err := json.Marshal(cachedNode)
  229. if err != nil {
  230. return nil, err
  231. }
  232. newData, err := json.Marshal(n)
  233. if err != nil {
  234. return nil, err
  235. }
  236. patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
  237. if err != nil {
  238. return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
  239. }
  240. _, err = ksm.client.CoreV1().Nodes().Patch(ksm.nodeName, types.StrategicMergePatchType, patchBytes, "status")
  241. if err != nil {
  242. return nil, err
  243. }
  244. }
  245. return &subnet.Lease{
  246. Subnet: ip.FromIPNet(cidr),
  247. Attrs: *attrs,
  248. Expiration: time.Now().Add(24 * time.Hour),
  249. }, nil
  250. }
  251. func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{}) (subnet.LeaseWatchResult, error) {
  252. select {
  253. case event := <-ksm.events:
  254. return subnet.LeaseWatchResult{
  255. Events: []subnet.Event{event},
  256. }, nil
  257. case <-ctx.Done():
  258. return subnet.LeaseWatchResult{}, nil
  259. }
  260. }
  261. func (ksm *kubeSubnetManager) Run(ctx context.Context) {
  262. glog.Infof("Starting kube subnet manager")
  263. ksm.nodeController.Run(ctx.Done())
  264. }
  265. func (ksm *kubeSubnetManager) nodeToLease(n v1.Node) (l subnet.Lease, err error) {
  266. l.Attrs.PublicIP, err = ip.ParseIP4(n.Annotations[ksm.annotations.BackendPublicIP])
  267. if err != nil {
  268. return l, err
  269. }
  270. l.Attrs.BackendType = n.Annotations[ksm.annotations.BackendType]
  271. l.Attrs.BackendData = json.RawMessage(n.Annotations[ksm.annotations.BackendData])
  272. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  273. if err != nil {
  274. return l, err
  275. }
  276. l.Subnet = ip.FromIPNet(cidr)
  277. return l, nil
  278. }
  279. // unimplemented
  280. func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, lease *subnet.Lease) error {
  281. return ErrUnimplemented
  282. }
  283. func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
  284. return subnet.LeaseWatchResult{}, ErrUnimplemented
  285. }
  286. func (ksm *kubeSubnetManager) Name() string {
  287. return fmt.Sprintf("Kubernetes Subnet Manager - %s", ksm.nodeName)
  288. }