kube.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // Copyright 2016 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package kube
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "net"
  21. "os"
  22. "time"
  23. "github.com/coreos/flannel/pkg/ip"
  24. "github.com/coreos/flannel/subnet"
  25. "github.com/golang/glog"
  26. "golang.org/x/net/context"
  27. "k8s.io/kubernetes/pkg/api"
  28. "k8s.io/kubernetes/pkg/client/cache"
  29. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  30. "k8s.io/kubernetes/pkg/client/restclient"
  31. "k8s.io/kubernetes/pkg/controller/framework"
  32. "k8s.io/kubernetes/pkg/runtime"
  33. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  34. "k8s.io/kubernetes/pkg/util/wait"
  35. "k8s.io/kubernetes/pkg/watch"
  36. )
  37. var (
  38. ErrUnimplemented = errors.New("unimplemented")
  39. kubeSubnetCfg *subnet.Config
  40. )
  41. const (
  42. resyncPeriod = 5 * time.Minute
  43. subnetKubeManagedAnnotation = "flannel.alpha.coreos.com/kube-subnet-manager"
  44. backendDataAnnotation = "flannel.alpha.coreos.com/backend-data"
  45. backendTypeAnnotation = "flannel.alpha.coreos.com/backend-type"
  46. backendPublicIPAnnotation = "flannel.alpha.coreos.com/public-ip"
  47. netConfPath = "/etc/kube-flannel/net-conf.json"
  48. )
  49. type kubeSubnetManager struct {
  50. client clientset.Interface
  51. nodeName string
  52. nodeStore cache.StoreToNodeLister
  53. nodeController *framework.Controller
  54. subnetConf *subnet.Config
  55. events chan subnet.Event
  56. selfEvents chan subnet.Event
  57. }
  58. func NewSubnetManager() (subnet.Manager, error) {
  59. cfg, err := restclient.InClusterConfig()
  60. if err != nil {
  61. return nil, fmt.Errorf("unable to initialize inclusterconfig: %v", err)
  62. }
  63. c, err := clientset.NewForConfig(cfg)
  64. if err != nil {
  65. return nil, fmt.Errorf("unable to initialize client: %v", err)
  66. }
  67. podName := os.Getenv("POD_NAME")
  68. podNamespace := os.Getenv("POD_NAMESPACE")
  69. if podName == "" || podNamespace == "" {
  70. return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
  71. }
  72. pod, err := c.Pods(podNamespace).Get(podName)
  73. if err != nil {
  74. return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
  75. }
  76. nodeName := pod.Spec.NodeName
  77. if nodeName == "" {
  78. return nil, fmt.Errorf("node name not present in pod spec '%s/%s'", podNamespace, podName)
  79. }
  80. netConf, err := ioutil.ReadFile(netConfPath)
  81. if err != nil {
  82. return nil, fmt.Errorf("failed to read net conf: %v", err)
  83. }
  84. sc, err := subnet.ParseConfig(string(netConf))
  85. if err != nil {
  86. return nil, fmt.Errorf("error parsing subnet config: %s", err)
  87. }
  88. sm, err := newKubeSubnetManager(c, sc, nodeName)
  89. if err != nil {
  90. return nil, fmt.Errorf("error creating network manager: %s", err)
  91. }
  92. go sm.Run(context.Background())
  93. err = wait.Poll(time.Second, 10*time.Second, func() (bool, error) {
  94. return sm.nodeController.HasSynced(), nil
  95. })
  96. if err != nil {
  97. return nil, fmt.Errorf("error waiting for nodeController to sync state: %v", err)
  98. }
  99. return sm, nil
  100. }
  101. func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName string) (*kubeSubnetManager, error) {
  102. var ksm kubeSubnetManager
  103. ksm.client = c
  104. ksm.nodeName = nodeName
  105. ksm.subnetConf = sc
  106. ksm.events = make(chan subnet.Event, 100)
  107. ksm.selfEvents = make(chan subnet.Event, 100)
  108. ksm.nodeStore.Store, ksm.nodeController = framework.NewInformer(
  109. &cache.ListWatch{
  110. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  111. return ksm.client.Core().Nodes().List(options)
  112. },
  113. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  114. return ksm.client.Core().Nodes().Watch(options)
  115. },
  116. },
  117. &api.Node{},
  118. resyncPeriod,
  119. framework.ResourceEventHandlerFuncs{
  120. AddFunc: func(obj interface{}) {
  121. ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
  122. },
  123. UpdateFunc: ksm.handleUpdateLeaseEvent,
  124. DeleteFunc: func(obj interface{}) {
  125. ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
  126. },
  127. },
  128. )
  129. return &ksm, nil
  130. }
  131. func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj interface{}) {
  132. n := obj.(*api.Node)
  133. if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
  134. return
  135. }
  136. l, err := nodeToLease(*n)
  137. if err != nil {
  138. glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  139. return
  140. }
  141. ksm.events <- subnet.Event{et, l, ""}
  142. if n.ObjectMeta.Name == ksm.nodeName {
  143. ksm.selfEvents <- subnet.Event{et, l, ""}
  144. }
  145. }
  146. func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{}) {
  147. o := oldObj.(*api.Node)
  148. n := newObj.(*api.Node)
  149. if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
  150. return
  151. }
  152. if o.Annotations[backendDataAnnotation] == n.Annotations[backendDataAnnotation] &&
  153. o.Annotations[backendTypeAnnotation] == n.Annotations[backendTypeAnnotation] &&
  154. o.Annotations[backendPublicIPAnnotation] == n.Annotations[backendPublicIPAnnotation] {
  155. return // No change to lease
  156. }
  157. l, err := nodeToLease(*n)
  158. if err != nil {
  159. glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  160. return
  161. }
  162. ksm.events <- subnet.Event{subnet.EventAdded, l, ""}
  163. if n.ObjectMeta.Name == ksm.nodeName {
  164. ksm.selfEvents <- subnet.Event{subnet.EventAdded, l, ""}
  165. }
  166. }
  167. func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context, network string) (*subnet.Config, error) {
  168. return ksm.subnetConf, nil
  169. }
  170. func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
  171. nobj, found, err := ksm.nodeStore.Store.GetByKey(ksm.nodeName)
  172. if err != nil {
  173. return nil, err
  174. }
  175. if !found {
  176. return nil, fmt.Errorf("node %q not found", ksm.nodeName)
  177. }
  178. n, ok := nobj.(*api.Node)
  179. if !ok {
  180. return nil, fmt.Errorf("nobj was not a *api.Node")
  181. }
  182. if n.Spec.PodCIDR == "" {
  183. return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
  184. }
  185. bd, err := attrs.BackendData.MarshalJSON()
  186. if err != nil {
  187. return nil, err
  188. }
  189. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  190. if err != nil {
  191. return nil, err
  192. }
  193. if n.Annotations[backendDataAnnotation] != string(bd) ||
  194. n.Annotations[backendTypeAnnotation] != attrs.BackendType ||
  195. n.Annotations[backendPublicIPAnnotation] != attrs.PublicIP.String() ||
  196. n.Annotations[subnetKubeManagedAnnotation] != "true" {
  197. n.Annotations[backendTypeAnnotation] = attrs.BackendType
  198. n.Annotations[backendDataAnnotation] = string(bd)
  199. n.Annotations[backendPublicIPAnnotation] = attrs.PublicIP.String()
  200. n.Annotations[subnetKubeManagedAnnotation] = "true"
  201. n, err = ksm.client.Core().Nodes().Update(n)
  202. if err != nil {
  203. return nil, err
  204. }
  205. }
  206. return &subnet.Lease{
  207. Subnet: ip.FromIPNet(cidr),
  208. Attrs: *attrs,
  209. Expiration: time.Now().Add(24 * time.Hour),
  210. }, nil
  211. }
  212. func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, network string, lease *subnet.Lease) error {
  213. l, err := ksm.AcquireLease(ctx, network, &lease.Attrs)
  214. if err != nil {
  215. return err
  216. }
  217. lease.Subnet = l.Subnet
  218. lease.Attrs = l.Attrs
  219. lease.Expiration = l.Expiration
  220. return nil
  221. }
  222. func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
  223. select {
  224. case event := <-ksm.selfEvents:
  225. return subnet.LeaseWatchResult{
  226. Events: []subnet.Event{event},
  227. }, nil
  228. case <-ctx.Done():
  229. return subnet.LeaseWatchResult{}, nil
  230. }
  231. }
  232. func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.LeaseWatchResult, error) {
  233. select {
  234. case event := <-ksm.events:
  235. return subnet.LeaseWatchResult{
  236. Events: []subnet.Event{event},
  237. }, nil
  238. case <-ctx.Done():
  239. return subnet.LeaseWatchResult{}, nil
  240. }
  241. }
  242. func (ksm *kubeSubnetManager) WatchNetworks(ctx context.Context, cursor interface{}) (subnet.NetworkWatchResult, error) {
  243. time.Sleep(time.Second)
  244. return subnet.NetworkWatchResult{
  245. Snapshot: []string{""},
  246. }, nil
  247. }
  248. func (ksm *kubeSubnetManager) Run(ctx context.Context) {
  249. defer utilruntime.HandleCrash()
  250. glog.Infof("starting kube subnet manager")
  251. ksm.nodeController.Run(ctx.Done())
  252. }
  253. func nodeToLease(n api.Node) (l subnet.Lease, err error) {
  254. l.Attrs.PublicIP, err = ip.ParseIP4(n.Annotations[backendPublicIPAnnotation])
  255. if err != nil {
  256. return l, err
  257. }
  258. l.Attrs.BackendType = n.Annotations[backendTypeAnnotation]
  259. l.Attrs.BackendData = json.RawMessage(n.Annotations[backendDataAnnotation])
  260. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  261. if err != nil {
  262. return l, err
  263. }
  264. l.Subnet = ip.FromIPNet(cidr)
  265. l.Expiration = time.Now().Add(24 * time.Hour)
  266. return l, nil
  267. }
  268. // unimplemented
  269. func (ksm *kubeSubnetManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error {
  270. return ErrUnimplemented
  271. }
  272. func (ksm *kubeSubnetManager) AddReservation(ctx context.Context, network string, r *subnet.Reservation) error {
  273. return ErrUnimplemented
  274. }
  275. func (ksm *kubeSubnetManager) RemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error {
  276. return ErrUnimplemented
  277. }
  278. func (ksm *kubeSubnetManager) ListReservations(ctx context.Context, network string) ([]subnet.Reservation, error) {
  279. return nil, ErrUnimplemented
  280. }