common.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package backend
  15. import (
  16. "bytes"
  17. "net"
  18. "sync"
  19. "time"
  20. "golang.org/x/net/context"
  21. "github.com/coreos/flannel/subnet"
  22. log "github.com/golang/glog"
  23. "github.com/vishvananda/netlink"
  24. )
  25. type ExternalInterface struct {
  26. Iface *net.Interface
  27. IfaceAddr net.IP
  28. ExtAddr net.IP
  29. }
  30. // Besides the entry points in the Backend interface, the backend's New()
  31. // function receives static network interface information (like internal and
  32. // external IP addresses, MTU, etc) which it should cache for later use if
  33. // needed.
  34. type Backend interface {
  35. // Called when the backend should create or begin managing a new network
  36. RegisterNetwork(ctx context.Context, config *subnet.Config) (Network, error)
  37. }
  38. type Network interface {
  39. Lease() *subnet.Lease
  40. MTU() int
  41. Run(ctx context.Context)
  42. }
  43. type BackendCtor func(sm subnet.Manager, ei *ExternalInterface) (Backend, error)
  44. type SimpleNetwork struct {
  45. SubnetLease *subnet.Lease
  46. ExtIface *ExternalInterface
  47. }
  48. func (n *SimpleNetwork) Lease() *subnet.Lease {
  49. return n.SubnetLease
  50. }
  51. func (n *SimpleNetwork) MTU() int {
  52. return n.ExtIface.Iface.MTU
  53. }
  54. func (_ *SimpleNetwork) Run(ctx context.Context) {
  55. <-ctx.Done()
  56. }
  57. const (
  58. routeCheckRetries = 10
  59. )
  60. type RouteNetwork struct {
  61. SimpleNetwork
  62. BackendType string
  63. routes []netlink.Route
  64. SM subnet.Manager
  65. GetRoute func(lease *subnet.Lease) *netlink.Route
  66. Mtu int
  67. LinkIndex int
  68. }
  69. func (n *RouteNetwork) MTU() int {
  70. return n.Mtu
  71. }
  72. func (n *RouteNetwork) Run(ctx context.Context) {
  73. wg := sync.WaitGroup{}
  74. log.Info("Watching for new subnet leases")
  75. evts := make(chan []subnet.Event)
  76. wg.Add(1)
  77. go func() {
  78. subnet.WatchLeases(ctx, n.SM, n.SubnetLease, evts)
  79. wg.Done()
  80. }()
  81. n.routes = make([]netlink.Route, 0, 10)
  82. wg.Add(1)
  83. go func() {
  84. n.routeCheck(ctx)
  85. wg.Done()
  86. }()
  87. defer wg.Wait()
  88. for {
  89. select {
  90. case evtBatch := <-evts:
  91. n.handleSubnetEvents(evtBatch)
  92. case <-ctx.Done():
  93. return
  94. }
  95. }
  96. }
  97. func (n *RouteNetwork) handleSubnetEvents(batch []subnet.Event) {
  98. for _, evt := range batch {
  99. switch evt.Type {
  100. case subnet.EventAdded:
  101. log.Infof("Subnet added: %v via %v", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP)
  102. if evt.Lease.Attrs.BackendType != n.BackendType {
  103. log.Warningf("Ignoring non-%v subnet: type=%v", n.BackendType, evt.Lease.Attrs.BackendType)
  104. continue
  105. }
  106. route := n.GetRoute(&evt.Lease)
  107. n.addToRouteList(*route)
  108. // Check if route exists before attempting to add it
  109. routeList, err := netlink.RouteListFiltered(netlink.FAMILY_V4, &netlink.Route{Dst: route.Dst}, netlink.RT_FILTER_DST)
  110. if err != nil {
  111. log.Warningf("Unable to list routes: %v", err)
  112. }
  113. if len(routeList) > 0 && !routeEqual(routeList[0], *route) {
  114. // Same Dst different Gw or different link index. Remove it, correct route will be added below.
  115. log.Warningf("Replacing existing route to %v via %v dev index %d with %v via %v dev index %d.", evt.Lease.Subnet, routeList[0].Gw, routeList[0].LinkIndex, evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, route.LinkIndex)
  116. if err := netlink.RouteDel(&routeList[0]); err != nil {
  117. log.Errorf("Error deleting route to %v: %v", evt.Lease.Subnet, err)
  118. continue
  119. }
  120. n.removeFromRouteList(routeList[0])
  121. }
  122. if len(routeList) > 0 && routeEqual(routeList[0], *route) {
  123. // Same Dst and same Gw, keep it and do not attempt to add it.
  124. log.Infof("Route to %v via %v dev index %d already exists, skipping.", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, routeList[0].LinkIndex)
  125. } else if err := netlink.RouteAdd(route); err != nil {
  126. log.Errorf("Error adding route to %v via %v dev index %d: %v", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, route.LinkIndex, err)
  127. continue
  128. }
  129. case subnet.EventRemoved:
  130. log.Info("Subnet removed: ", evt.Lease.Subnet)
  131. if evt.Lease.Attrs.BackendType != n.BackendType {
  132. log.Warningf("Ignoring non-%v subnet: type=%v", n.BackendType, evt.Lease.Attrs.BackendType)
  133. continue
  134. }
  135. route := n.GetRoute(&evt.Lease)
  136. // Always remove the route from the route list.
  137. n.removeFromRouteList(*route)
  138. if err := netlink.RouteDel(route); err != nil {
  139. log.Errorf("Error deleting route to %v: %v", evt.Lease.Subnet, err)
  140. continue
  141. }
  142. default:
  143. log.Error("Internal error: unknown event type: ", int(evt.Type))
  144. }
  145. }
  146. }
  147. func (n *RouteNetwork) addToRouteList(route netlink.Route) {
  148. for _, r := range n.routes {
  149. if routeEqual(r, route) {
  150. return
  151. }
  152. }
  153. n.routes = append(n.routes, route)
  154. }
  155. func (n *RouteNetwork) removeFromRouteList(route netlink.Route) {
  156. for index, r := range n.routes {
  157. if routeEqual(r, route) {
  158. n.routes = append(n.routes[:index], n.routes[index+1:]...)
  159. return
  160. }
  161. }
  162. }
  163. func (n *RouteNetwork) routeCheck(ctx context.Context) {
  164. for {
  165. select {
  166. case <-ctx.Done():
  167. return
  168. case <-time.After(routeCheckRetries * time.Second):
  169. n.checkSubnetExistInRoutes()
  170. }
  171. }
  172. }
  173. func (n *RouteNetwork) checkSubnetExistInRoutes() {
  174. routeList, err := netlink.RouteList(nil, netlink.FAMILY_V4)
  175. if err == nil {
  176. for _, route := range n.routes {
  177. exist := false
  178. for _, r := range routeList {
  179. if r.Dst == nil {
  180. continue
  181. }
  182. if routeEqual(r, route) {
  183. exist = true
  184. break
  185. }
  186. }
  187. if !exist {
  188. if err := netlink.RouteAdd(&route); err != nil {
  189. if nerr, ok := err.(net.Error); !ok {
  190. log.Errorf("Error recovering route to %v: %v, %v", route.Dst, route.Gw, nerr)
  191. }
  192. continue
  193. } else {
  194. log.Infof("Route recovered %v : %v", route.Dst, route.Gw)
  195. }
  196. }
  197. }
  198. } else {
  199. log.Errorf("Error fetching route list. Will automatically retry: %v", err)
  200. }
  201. }
  202. func routeEqual(x, y netlink.Route) bool {
  203. // For ipip backend, when enabling directrouting, link index of some routes may change
  204. // For both ipip and host-gw backend, link index may also change if updating ExtIface
  205. if x.Dst.IP.Equal(y.Dst.IP) && x.Gw.Equal(y.Gw) && bytes.Equal(x.Dst.Mask, y.Dst.Mask) && x.LinkIndex == y.LinkIndex {
  206. return true
  207. }
  208. return false
  209. }