kube.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. // Copyright 2016 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package kube
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "net"
  21. "os"
  22. "time"
  23. "github.com/coreos/flannel/pkg/ip"
  24. "github.com/coreos/flannel/subnet"
  25. "github.com/golang/glog"
  26. "golang.org/x/net/context"
  27. "k8s.io/kubernetes/pkg/api"
  28. "k8s.io/kubernetes/pkg/client/cache"
  29. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  30. "k8s.io/kubernetes/pkg/client/restclient"
  31. "k8s.io/kubernetes/pkg/controller/framework"
  32. "k8s.io/kubernetes/pkg/runtime"
  33. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  34. "k8s.io/kubernetes/pkg/watch"
  35. )
  36. var (
  37. ErrUnimplemented = errors.New("unimplemented")
  38. kubeSubnetCfg *subnet.Config
  39. )
  40. const (
  41. resyncPeriod = 5 * time.Minute
  42. subnetKubeManagedAnnotation = "flannel.alpha.coreos.com/kube-subnet-manager"
  43. backendDataAnnotation = "flannel.alpha.coreos.com/backend-data"
  44. backendTypeAnnotation = "flannel.alpha.coreos.com/backend-type"
  45. backendPublicIPAnnotation = "flannel.alpha.coreos.com/public-ip"
  46. netConfPath = "/etc/kube-flannel/net-conf.json"
  47. )
  48. type kubeSubnetManager struct {
  49. client clientset.Interface
  50. nodeName string
  51. nodeStore cache.StoreToNodeLister
  52. nodeController *framework.Controller
  53. subnetConf *subnet.Config
  54. }
  55. func NewSubnetManager() (subnet.Manager, error) {
  56. cfg, err := restclient.InClusterConfig()
  57. if err != nil {
  58. return nil, fmt.Errorf("unable to initialize inclusterconfig: %v", err)
  59. }
  60. c, err := clientset.NewForConfig(cfg)
  61. if err != nil {
  62. return nil, fmt.Errorf("unable to initialize client: %v", err)
  63. }
  64. podName := os.Getenv("POD_NAME")
  65. podNamespace := os.Getenv("POD_NAMESPACE")
  66. if podName == "" || podNamespace == "" {
  67. return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
  68. }
  69. pod, err := c.Pods(podNamespace).Get(podName)
  70. if err != nil {
  71. return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
  72. }
  73. nodeName := pod.Spec.NodeName
  74. if nodeName == "" {
  75. return nil, fmt.Errorf("node name not present in pod spec '%s/%s'", podNamespace, podName)
  76. }
  77. netConf, err := ioutil.ReadFile(netConfPath)
  78. if err != nil {
  79. return nil, fmt.Errorf("failed to read net conf: %v", err)
  80. }
  81. sc, err := subnet.ParseConfig(string(netConf))
  82. if err != nil {
  83. return nil, fmt.Errorf("error parsing subnet config: %s", err)
  84. }
  85. sm, err := newKubeSubnetManager(c, sc, nodeName)
  86. if err != nil {
  87. return nil, fmt.Errorf("error creating network manager: %s", err)
  88. }
  89. go sm.Run(context.Background())
  90. return sm, err
  91. }
  92. func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName string) (*kubeSubnetManager, error) {
  93. var ksm kubeSubnetManager
  94. ksm.client = c
  95. ksm.nodeName = nodeName
  96. ksm.subnetConf = sc
  97. ksm.nodeStore.Store, ksm.nodeController = framework.NewInformer(
  98. &cache.ListWatch{
  99. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  100. return ksm.client.Core().Nodes().List(options)
  101. },
  102. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  103. return ksm.client.Core().Nodes().Watch(options)
  104. },
  105. },
  106. &api.Node{},
  107. resyncPeriod,
  108. framework.ResourceEventHandlerFuncs{},
  109. )
  110. return &ksm, nil
  111. }
  112. func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context, network string) (*subnet.Config, error) {
  113. return ksm.subnetConf, nil
  114. }
  115. func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
  116. nobj, found, err := ksm.nodeStore.Store.GetByKey(ksm.nodeName)
  117. if err != nil {
  118. return nil, err
  119. }
  120. if !found {
  121. return nil, fmt.Errorf("node %q not found", ksm.nodeName)
  122. }
  123. n, ok := nobj.(*api.Node)
  124. if !ok {
  125. return nil, fmt.Errorf("nobj was not a *api.Node")
  126. }
  127. if n.Spec.PodCIDR == "" {
  128. return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
  129. }
  130. bd, err := attrs.BackendData.MarshalJSON()
  131. if err != nil {
  132. return nil, err
  133. }
  134. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  135. if err != nil {
  136. return nil, err
  137. }
  138. if n.Annotations[backendDataAnnotation] != string(bd) ||
  139. n.Annotations[backendTypeAnnotation] != attrs.BackendType ||
  140. n.Annotations[backendPublicIPAnnotation] != attrs.PublicIP.String() ||
  141. n.Annotations[subnetKubeManagedAnnotation] != "true" {
  142. n.Annotations[backendTypeAnnotation] = attrs.BackendType
  143. n.Annotations[backendDataAnnotation] = string(bd)
  144. n.Annotations[backendPublicIPAnnotation] = attrs.PublicIP.String()
  145. n.Annotations[subnetKubeManagedAnnotation] = "true"
  146. n, err = ksm.client.Core().Nodes().Update(n)
  147. if err != nil {
  148. return nil, err
  149. }
  150. }
  151. return &subnet.Lease{
  152. Subnet: ip.FromIPNet(cidr),
  153. Attrs: *attrs,
  154. Expiration: time.Now().Add(24 * time.Hour),
  155. }, nil
  156. }
  157. func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, network string, lease *subnet.Lease) error {
  158. return nil
  159. }
  160. func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
  161. time.Sleep(time.Second)
  162. nobj, found, err := ksm.nodeStore.Store.GetByKey(ksm.nodeName)
  163. if err != nil {
  164. return subnet.LeaseWatchResult{}, err
  165. }
  166. if !found {
  167. return subnet.LeaseWatchResult{}, fmt.Errorf("node %q not found", ksm.nodeName)
  168. }
  169. n, ok := nobj.(*api.Node)
  170. if !ok {
  171. return subnet.LeaseWatchResult{}, fmt.Errorf("nobj was not a *api.Node")
  172. }
  173. l, err := nodeToLease(*n)
  174. if err != nil {
  175. return subnet.LeaseWatchResult{}, err
  176. }
  177. return subnet.LeaseWatchResult{
  178. Snapshot: []subnet.Lease{l},
  179. }, nil
  180. }
  181. func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.LeaseWatchResult, error) {
  182. time.Sleep(time.Second)
  183. leases := make([]subnet.Lease, 0)
  184. nl, err := ksm.nodeStore.List()
  185. if err != nil {
  186. return subnet.LeaseWatchResult{}, err
  187. }
  188. for _, n := range nl.Items {
  189. if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
  190. continue
  191. }
  192. l, err := nodeToLease(n)
  193. if err != nil {
  194. glog.Infof("error turning node %q to lease: %v", n.ObjectMeta.Name, err)
  195. continue
  196. }
  197. leases = append(leases, l)
  198. }
  199. return subnet.LeaseWatchResult{
  200. Snapshot: leases,
  201. }, nil
  202. }
  203. func (ksm *kubeSubnetManager) WatchNetworks(ctx context.Context, cursor interface{}) (subnet.NetworkWatchResult, error) {
  204. time.Sleep(time.Second)
  205. return subnet.NetworkWatchResult{
  206. Snapshot: []string{""},
  207. }, nil
  208. }
  209. func (ksm *kubeSubnetManager) Run(ctx context.Context) {
  210. defer utilruntime.HandleCrash()
  211. glog.Infof("starting kube subnet manager")
  212. ksm.nodeController.Run(ctx.Done())
  213. }
  214. func nodeToLease(n api.Node) (l subnet.Lease, err error) {
  215. l.Attrs.PublicIP, err = ip.ParseIP4(n.Annotations[backendPublicIPAnnotation])
  216. if err != nil {
  217. return l, err
  218. }
  219. l.Attrs.BackendType = n.Annotations[backendTypeAnnotation]
  220. l.Attrs.BackendData = json.RawMessage(n.Annotations[backendDataAnnotation])
  221. _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
  222. if err != nil {
  223. return l, err
  224. }
  225. l.Subnet = ip.FromIPNet(cidr)
  226. l.Expiration = time.Now().Add(24 * time.Hour)
  227. return l, nil
  228. }
  229. // unimplemented
  230. func (ksm *kubeSubnetManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error {
  231. return ErrUnimplemented
  232. }
  233. func (ksm *kubeSubnetManager) AddReservation(ctx context.Context, network string, r *subnet.Reservation) error {
  234. return ErrUnimplemented
  235. }
  236. func (ksm *kubeSubnetManager) RemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error {
  237. return ErrUnimplemented
  238. }
  239. func (ksm *kubeSubnetManager) ListReservations(ctx context.Context, network string) ([]subnet.Reservation, error) {
  240. return nil, ErrUnimplemented
  241. }