kube.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. // Copyright 2016 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package kube
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "net"
  21. "os"
  22. "time"
  23. "github.com/coreos/flannel/pkg/ip"
  24. "github.com/coreos/flannel/subnet"
  25. "github.com/golang/glog"
  26. "golang.org/x/net/context"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/runtime"
  29. "k8s.io/apimachinery/pkg/types"
  30. "k8s.io/apimachinery/pkg/util/strategicpatch"
  31. "k8s.io/apimachinery/pkg/util/wait"
  32. "k8s.io/apimachinery/pkg/watch"
  33. clientset "k8s.io/client-go/kubernetes"
  34. listers "k8s.io/client-go/listers/core/v1"
  35. "k8s.io/client-go/pkg/api"
  36. "k8s.io/client-go/pkg/api/v1"
  37. "k8s.io/client-go/rest"
  38. "k8s.io/client-go/tools/cache"
  39. )
  // ErrUnimplemented is returned by the subnet manager methods that the
  // Kubernetes-backed implementation does not support.
  var ErrUnimplemented = errors.New("unimplemented")
  43. const (
  44. resyncPeriod = 5 * time.Minute
  45. nodeControllerSyncTimeout = 10 * time.Minute
  46. subnetKubeManagedAnnotation = "flannel.alpha.coreos.com/kube-subnet-manager"
  47. backendDataAnnotation = "flannel.alpha.coreos.com/backend-data"
  48. backendTypeAnnotation = "flannel.alpha.coreos.com/backend-type"
  49. backendPublicIPAnnotation = "flannel.alpha.coreos.com/public-ip"
  50. netConfPath = "/etc/kube-flannel/net-conf.json"
  51. )
  52. type kubeSubnetManager struct {
  53. client clientset.Interface
  54. nodeName string
  55. nodeStore listers.NodeLister
  56. nodeController cache.Controller
  57. subnetConf *subnet.Config
  58. events chan subnet.Event
  59. selfEvents chan subnet.Event
  60. }
  61. func NewSubnetManager() (subnet.Manager, error) {
  62. cfg, err := rest.InClusterConfig()
  63. if err != nil {
  64. return nil, fmt.Errorf("unable to initialize inclusterconfig: %v", err)
  65. }
  66. c, err := clientset.NewForConfig(cfg)
  67. if err != nil {
  68. return nil, fmt.Errorf("unable to initialize client: %v", err)
  69. }
  70. podName := os.Getenv("POD_NAME")
  71. podNamespace := os.Getenv("POD_NAMESPACE")
  72. if podName == "" || podNamespace == "" {
  73. return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
  74. }
  75. pod, err := c.Pods(podNamespace).Get(podName, metav1.GetOptions{})
  76. if err != nil {
  77. return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
  78. }
  79. nodeName := pod.Spec.NodeName
  80. if nodeName == "" {
  81. return nil, fmt.Errorf("node name not present in pod spec '%s/%s'", podNamespace, podName)
  82. }
  83. netConf, err := ioutil.ReadFile(netConfPath)
  84. if err != nil {
  85. return nil, fmt.Errorf("failed to read net conf: %v", err)
  86. }
  87. sc, err := subnet.ParseConfig(string(netConf))
  88. if err != nil {
  89. return nil, fmt.Errorf("error parsing subnet config: %s", err)
  90. }
  91. sm, err := newKubeSubnetManager(c, sc, nodeName)
  92. if err != nil {
  93. return nil, fmt.Errorf("error creating network manager: %s", err)
  94. }
  95. go sm.Run(context.Background())
  96. glog.Infof("Waiting %s for node controller to sync", nodeControllerSyncTimeout)
  97. err = wait.Poll(time.Second, nodeControllerSyncTimeout, func() (bool, error) {
  98. return sm.nodeController.HasSynced(), nil
  99. })
  100. if err != nil {
  101. return nil, fmt.Errorf("error waiting for nodeController to sync state: %v", err)
  102. }
  103. glog.Infof("Node controller sync successful")
  104. return sm, nil
  105. }
  106. func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName string) (*kubeSubnetManager, error) {
  107. var ksm kubeSubnetManager
  108. ksm.client = c
  109. ksm.nodeName = nodeName
  110. ksm.subnetConf = sc
  111. ksm.events = make(chan subnet.Event, 100)
  112. ksm.selfEvents = make(chan subnet.Event, 100)
  113. indexer, controller := cache.NewIndexerInformer(
  114. &cache.ListWatch{
  115. ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  116. return ksm.client.Core().Nodes().List(options)
  117. },
  118. WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  119. return ksm.client.Core().Nodes().Watch(options)
  120. },
  121. },
  122. &v1.Node{},
  123. resyncPeriod,
  124. cache.ResourceEventHandlerFuncs{
  125. AddFunc: func(obj interface{}) {
  126. ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
  127. },
  128. UpdateFunc: ksm.handleUpdateLeaseEvent,
  129. DeleteFunc: func(obj interface{}) {
  130. ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
  131. },
  132. },
  133. cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
  134. )
  135. ksm.nodeController = controller
  136. ksm.nodeStore = listers.NewNodeLister(indexer)
  137. return &ksm, nil
  138. }
  139. func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj interface{}) {
  140. n := obj.(*v1.Node)
  141. if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
  142. return
  143. }
  144. l, err := nodeToLease(*n)
  145. if err != nil {
  146. glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  147. return
  148. }
  149. ksm.events <- subnet.Event{et, l}
  150. if n.ObjectMeta.Name == ksm.nodeName {
  151. ksm.selfEvents <- subnet.Event{et, l}
  152. }
  153. }
  154. func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{}) {
  155. o := oldObj.(*v1.Node)
  156. n := newObj.(*v1.Node)
  157. if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
  158. return
  159. }
  160. if o.Annotations[backendDataAnnotation] == n.Annotations[backendDataAnnotation] &&
  161. o.Annotations[backendTypeAnnotation] == n.Annotations[backendTypeAnnotation] &&
  162. o.Annotations[backendPublicIPAnnotation] == n.Annotations[backendPublicIPAnnotation] {
  163. return // No change to lease
  164. }
  165. l, err := nodeToLease(*n)
  166. if err != nil {
  167. glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  168. return
  169. }
  170. ksm.events <- subnet.Event{subnet.EventAdded, l}
  171. if n.ObjectMeta.Name == ksm.nodeName {
  172. ksm.selfEvents <- subnet.Event{subnet.EventAdded, l}
  173. }
  174. }
  175. func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context) (*subnet.Config, error) {
  176. return ksm.subnetConf, nil
  177. }
  178. func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
  179. cachedNode, err := ksm.nodeStore.Get(ksm.nodeName)
  180. if err != nil {
  181. return nil, err
  182. }
  183. nobj, err := api.Scheme.DeepCopy(cachedNode)
  184. if err != nil {
  185. return nil, err
  186. }
  187. n := nobj.(*v1.Node)
  188. if n.Spec.PodCIDR == "" {
  189. return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
  190. }
  191. bd, err := attrs.BackendData.MarshalJSON()
  192. if err != nil {
  193. return nil, err
  194. }
  195. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  196. if err != nil {
  197. return nil, err
  198. }
  199. if n.Annotations[backendDataAnnotation] != string(bd) ||
  200. n.Annotations[backendTypeAnnotation] != attrs.BackendType ||
  201. n.Annotations[backendPublicIPAnnotation] != attrs.PublicIP.String() ||
  202. n.Annotations[subnetKubeManagedAnnotation] != "true" {
  203. n.Annotations[backendTypeAnnotation] = attrs.BackendType
  204. n.Annotations[backendDataAnnotation] = string(bd)
  205. n.Annotations[backendPublicIPAnnotation] = attrs.PublicIP.String()
  206. n.Annotations[subnetKubeManagedAnnotation] = "true"
  207. oldData, err := json.Marshal(cachedNode)
  208. if err != nil {
  209. return nil, err
  210. }
  211. newData, err := json.Marshal(n)
  212. if err != nil {
  213. return nil, err
  214. }
  215. patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
  216. if err != nil {
  217. return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
  218. }
  219. _, err = ksm.client.Core().Nodes().Patch(ksm.nodeName, types.StrategicMergePatchType, patchBytes, "status")
  220. if err != nil {
  221. return nil, err
  222. }
  223. }
  224. return &subnet.Lease{
  225. Subnet: ip.FromIPNet(cidr),
  226. Attrs: *attrs,
  227. Expiration: time.Now().Add(24 * time.Hour),
  228. }, nil
  229. }
  230. func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, lease *subnet.Lease) error {
  231. l, err := ksm.AcquireLease(ctx, &lease.Attrs)
  232. if err != nil {
  233. return err
  234. }
  235. lease.Subnet = l.Subnet
  236. lease.Attrs = l.Attrs
  237. lease.Expiration = l.Expiration
  238. return nil
  239. }
  240. func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
  241. select {
  242. case event := <-ksm.selfEvents:
  243. return subnet.LeaseWatchResult{
  244. Events: []subnet.Event{event},
  245. }, nil
  246. case <-ctx.Done():
  247. return subnet.LeaseWatchResult{}, nil
  248. }
  249. }
  250. func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{}) (subnet.LeaseWatchResult, error) {
  251. select {
  252. case event := <-ksm.events:
  253. return subnet.LeaseWatchResult{
  254. Events: []subnet.Event{event},
  255. }, nil
  256. case <-ctx.Done():
  257. return subnet.LeaseWatchResult{}, nil
  258. }
  259. }
  260. func (ksm *kubeSubnetManager) Run(ctx context.Context) {
  261. glog.Infof("starting kube subnet manager")
  262. ksm.nodeController.Run(ctx.Done())
  263. }
  264. func nodeToLease(n v1.Node) (l subnet.Lease, err error) {
  265. l.Attrs.PublicIP, err = ip.ParseIP4(n.Annotations[backendPublicIPAnnotation])
  266. if err != nil {
  267. return l, err
  268. }
  269. l.Attrs.BackendType = n.Annotations[backendTypeAnnotation]
  270. l.Attrs.BackendData = json.RawMessage(n.Annotations[backendDataAnnotation])
  271. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  272. if err != nil {
  273. return l, err
  274. }
  275. l.Subnet = ip.FromIPNet(cidr)
  276. l.Expiration = time.Now().Add(24 * time.Hour)
  277. return l, nil
  278. }
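  // The remaining subnet manager methods are not supported by the Kubernetes-backed manager; each returns ErrUnimplemented.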
  280. func (ksm *kubeSubnetManager) RevokeLease(ctx context.Context, sn ip.IP4Net) error {
  281. return ErrUnimplemented
  282. }
  283. func (ksm *kubeSubnetManager) AddReservation(ctx context.Context, r *subnet.Reservation) error {
  284. return ErrUnimplemented
  285. }
  286. func (ksm *kubeSubnetManager) RemoveReservation(ctx context.Context, subnet ip.IP4Net) error {
  287. return ErrUnimplemented
  288. }
  289. func (ksm *kubeSubnetManager) ListReservations(ctx context.Context) ([]subnet.Reservation, error) {
  290. return nil, ErrUnimplemented
  291. }