123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- package kube
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "net"
- "os"
- "time"
- "github.com/coreos/flannel/pkg/ip"
- "github.com/coreos/flannel/subnet"
- "github.com/golang/glog"
- "golang.org/x/net/context"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/strategicpatch"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apimachinery/pkg/watch"
- clientset "k8s.io/client-go/kubernetes"
- listers "k8s.io/client-go/listers/core/v1"
- "k8s.io/client-go/pkg/api"
- "k8s.io/client-go/pkg/api/v1"
- "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/clientcmd"
- )
// ErrUnimplemented is returned by the subnet.Manager methods that the
// Kubernetes-backed manager does not support.
var ErrUnimplemented = errors.New("unimplemented")
const (
	// resyncPeriod is the resync interval handed to the node informer.
	resyncPeriod = 5 * time.Minute

	// nodeControllerSyncTimeout bounds how long NewSubnetManager polls for
	// the node controller to report that it has synced.
	nodeControllerSyncTimeout = 10 * time.Minute

	// netConfPath is the file the flannel network configuration is read from.
	netConfPath = "/etc/kube-flannel/net-conf.json"
)
- type kubeSubnetManager struct {
- annotations annotations
- client clientset.Interface
- nodeName string
- nodeStore listers.NodeLister
- nodeController cache.Controller
- subnetConf *subnet.Config
- events chan subnet.Event
- }
- func NewSubnetManager(apiUrl, kubeconfig, prefix string) (subnet.Manager, error) {
- var cfg *rest.Config
- var err error
-
- if apiUrl != "" || kubeconfig != "" {
- cfg, err = clientcmd.BuildConfigFromFlags(apiUrl, kubeconfig)
- if err != nil {
- return nil, fmt.Errorf("unable to create k8s config: %v", err)
- }
- } else {
- cfg, err = rest.InClusterConfig()
- if err != nil {
- return nil, fmt.Errorf("unable to initialize inclusterconfig: %v", err)
- }
- }
- c, err := clientset.NewForConfig(cfg)
- if err != nil {
- return nil, fmt.Errorf("unable to initialize client: %v", err)
- }
-
-
-
- nodeName := os.Getenv("NODE_NAME")
- if nodeName == "" {
- podName := os.Getenv("POD_NAME")
- podNamespace := os.Getenv("POD_NAMESPACE")
- if podName == "" || podNamespace == "" {
- return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
- }
- pod, err := c.Pods(podNamespace).Get(podName, metav1.GetOptions{})
- if err != nil {
- return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
- }
- nodeName = pod.Spec.NodeName
- if nodeName == "" {
- return nil, fmt.Errorf("node name not present in pod spec '%s/%s'", podNamespace, podName)
- }
- }
- netConf, err := ioutil.ReadFile(netConfPath)
- if err != nil {
- return nil, fmt.Errorf("failed to read net conf: %v", err)
- }
- sc, err := subnet.ParseConfig(string(netConf))
- if err != nil {
- return nil, fmt.Errorf("error parsing subnet config: %s", err)
- }
- sm, err := newKubeSubnetManager(c, sc, nodeName, prefix)
- if err != nil {
- return nil, fmt.Errorf("error creating network manager: %s", err)
- }
- go sm.Run(context.Background())
- glog.Infof("Waiting %s for node controller to sync", nodeControllerSyncTimeout)
- err = wait.Poll(time.Second, nodeControllerSyncTimeout, func() (bool, error) {
- return sm.nodeController.HasSynced(), nil
- })
- if err != nil {
- return nil, fmt.Errorf("error waiting for nodeController to sync state: %v", err)
- }
- glog.Infof("Node controller sync successful")
- return sm, nil
- }
- func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName, prefix string) (*kubeSubnetManager, error) {
- var err error
- var ksm kubeSubnetManager
- ksm.annotations, err = newAnnotations(prefix)
- if err != nil {
- return nil, err
- }
- ksm.client = c
- ksm.nodeName = nodeName
- ksm.subnetConf = sc
- ksm.events = make(chan subnet.Event, 5000)
- indexer, controller := cache.NewIndexerInformer(
- &cache.ListWatch{
- ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
- return ksm.client.CoreV1().Nodes().List(options)
- },
- WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
- return ksm.client.CoreV1().Nodes().Watch(options)
- },
- },
- &v1.Node{},
- resyncPeriod,
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
- },
- UpdateFunc: ksm.handleUpdateLeaseEvent,
- DeleteFunc: func(obj interface{}) {
- node, isNode := obj.(*v1.Node)
-
- if !isNode {
- deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- glog.Infof("Error received unexpected object: %v", obj)
- return
- }
- node, ok = deletedState.Obj.(*v1.Node)
- if !ok {
- glog.Infof("Error deletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
- return
- }
- obj = node
- }
- ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
- },
- },
- cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
- )
- ksm.nodeController = controller
- ksm.nodeStore = listers.NewNodeLister(indexer)
- return &ksm, nil
- }
- func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj interface{}) {
- n := obj.(*v1.Node)
- if s, ok := n.Annotations[ksm.annotations.SubnetKubeManaged]; !ok || s != "true" {
- return
- }
- l, err := ksm.nodeToLease(*n)
- if err != nil {
- glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
- return
- }
- ksm.events <- subnet.Event{et, l}
- }
- func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{}) {
- o := oldObj.(*v1.Node)
- n := newObj.(*v1.Node)
- if s, ok := n.Annotations[ksm.annotations.SubnetKubeManaged]; !ok || s != "true" {
- return
- }
- if o.Annotations[ksm.annotations.BackendData] == n.Annotations[ksm.annotations.BackendData] &&
- o.Annotations[ksm.annotations.BackendType] == n.Annotations[ksm.annotations.BackendType] &&
- o.Annotations[ksm.annotations.BackendPublicIP] == n.Annotations[ksm.annotations.BackendPublicIP] {
- return
- }
- l, err := ksm.nodeToLease(*n)
- if err != nil {
- glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
- return
- }
- ksm.events <- subnet.Event{subnet.EventAdded, l}
- }
- func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context) (*subnet.Config, error) {
- return ksm.subnetConf, nil
- }
- func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
- cachedNode, err := ksm.nodeStore.Get(ksm.nodeName)
- if err != nil {
- return nil, err
- }
- nobj, err := api.Scheme.DeepCopy(cachedNode)
- if err != nil {
- return nil, err
- }
- n := nobj.(*v1.Node)
- if n.Spec.PodCIDR == "" {
- return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
- }
- bd, err := attrs.BackendData.MarshalJSON()
- if err != nil {
- return nil, err
- }
- _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
- if err != nil {
- return nil, err
- }
- if n.Annotations[ksm.annotations.BackendData] != string(bd) ||
- n.Annotations[ksm.annotations.BackendType] != attrs.BackendType ||
- n.Annotations[ksm.annotations.BackendPublicIP] != attrs.PublicIP.String() ||
- n.Annotations[ksm.annotations.SubnetKubeManaged] != "true" ||
- (n.Annotations[ksm.annotations.BackendPublicIPOverwrite] != "" && n.Annotations[ksm.annotations.BackendPublicIPOverwrite] != attrs.PublicIP.String()) {
- n.Annotations[ksm.annotations.BackendType] = attrs.BackendType
- n.Annotations[ksm.annotations.BackendData] = string(bd)
- if n.Annotations[ksm.annotations.BackendPublicIPOverwrite] != "" {
- if n.Annotations[ksm.annotations.BackendPublicIP] != n.Annotations[ksm.annotations.BackendPublicIPOverwrite] {
- glog.Infof("Overriding public ip with '%s' from node annotation '%s'",
- n.Annotations[ksm.annotations.BackendPublicIPOverwrite],
- ksm.annotations.BackendPublicIPOverwrite)
- n.Annotations[ksm.annotations.BackendPublicIP] = n.Annotations[ksm.annotations.BackendPublicIPOverwrite]
- }
- } else {
- n.Annotations[ksm.annotations.BackendPublicIP] = attrs.PublicIP.String()
- }
- n.Annotations[ksm.annotations.SubnetKubeManaged] = "true"
- oldData, err := json.Marshal(cachedNode)
- if err != nil {
- return nil, err
- }
- newData, err := json.Marshal(n)
- if err != nil {
- return nil, err
- }
- patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
- if err != nil {
- return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
- }
- _, err = ksm.client.CoreV1().Nodes().Patch(ksm.nodeName, types.StrategicMergePatchType, patchBytes, "status")
- if err != nil {
- return nil, err
- }
- }
- return &subnet.Lease{
- Subnet: ip.FromIPNet(cidr),
- Attrs: *attrs,
- Expiration: time.Now().Add(24 * time.Hour),
- }, nil
- }
- func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{}) (subnet.LeaseWatchResult, error) {
- select {
- case event := <-ksm.events:
- return subnet.LeaseWatchResult{
- Events: []subnet.Event{event},
- }, nil
- case <-ctx.Done():
- return subnet.LeaseWatchResult{}, nil
- }
- }
- func (ksm *kubeSubnetManager) Run(ctx context.Context) {
- glog.Infof("Starting kube subnet manager")
- ksm.nodeController.Run(ctx.Done())
- }
- func (ksm *kubeSubnetManager) nodeToLease(n v1.Node) (l subnet.Lease, err error) {
- l.Attrs.PublicIP, err = ip.ParseIP4(n.Annotations[ksm.annotations.BackendPublicIP])
- if err != nil {
- return l, err
- }
- l.Attrs.BackendType = n.Annotations[ksm.annotations.BackendType]
- l.Attrs.BackendData = json.RawMessage(n.Annotations[ksm.annotations.BackendData])
- _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
- if err != nil {
- return l, err
- }
- l.Subnet = ip.FromIPNet(cidr)
- return l, nil
- }
- func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, lease *subnet.Lease) error {
- return ErrUnimplemented
- }
- func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
- return subnet.LeaseWatchResult{}, ErrUnimplemented
- }
- func (ksm *kubeSubnetManager) Name() string {
- return fmt.Sprintf("Kubernetes Subnet Manager - %s", ksm.nodeName)
- }
|