route_network.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. // +build !windows
  2. // Copyright 2017 flannel authors
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. package backend
  16. import (
  17. "bytes"
  18. "net"
  19. "sync"
  20. "time"
  21. "github.com/flannel-io/flannel/subnet"
  22. "github.com/vishvananda/netlink"
  23. "golang.org/x/net/context"
  24. log "k8s.io/klog"
  25. )
  26. const (
  27. routeCheckRetries = 10
  28. )
  29. type RouteNetwork struct {
  30. SimpleNetwork
  31. BackendType string
  32. routes []netlink.Route
  33. v6Routes []netlink.Route
  34. SM subnet.Manager
  35. GetRoute func(lease *subnet.Lease) *netlink.Route
  36. GetV6Route func(lease *subnet.Lease) *netlink.Route
  37. Mtu int
  38. LinkIndex int
  39. }
  40. func (n *RouteNetwork) MTU() int {
  41. return n.Mtu
  42. }
  43. func (n *RouteNetwork) Run(ctx context.Context) {
  44. wg := sync.WaitGroup{}
  45. log.Info("Watching for new subnet leases")
  46. evts := make(chan []subnet.Event)
  47. wg.Add(1)
  48. go func() {
  49. subnet.WatchLeases(ctx, n.SM, n.SubnetLease, evts)
  50. wg.Done()
  51. }()
  52. n.routes = make([]netlink.Route, 0, 10)
  53. wg.Add(1)
  54. go func() {
  55. n.routeCheck(ctx)
  56. wg.Done()
  57. }()
  58. defer wg.Wait()
  59. for {
  60. select {
  61. case evtBatch, ok := <-evts:
  62. if !ok {
  63. log.Infof("evts chan closed")
  64. return
  65. }
  66. n.handleSubnetEvents(evtBatch)
  67. }
  68. }
  69. }
  70. func (n *RouteNetwork) handleSubnetEvents(batch []subnet.Event) {
  71. for _, evt := range batch {
  72. switch evt.Type {
  73. case subnet.EventAdded:
  74. if evt.Lease.Attrs.BackendType != n.BackendType {
  75. log.Warningf("Ignoring non-%v subnet: type=%v", n.BackendType, evt.Lease.Attrs.BackendType)
  76. continue
  77. }
  78. if evt.Lease.EnableIPv4 {
  79. log.Infof("Subnet added: %v via %v", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP)
  80. route := n.GetRoute(&evt.Lease)
  81. routeAdd(route, netlink.FAMILY_V4, n.addToRouteList, n.removeFromV4RouteList)
  82. }
  83. if evt.Lease.EnableIPv6 {
  84. log.Infof("Subnet added: %v via %v", evt.Lease.IPv6Subnet, evt.Lease.Attrs.PublicIPv6)
  85. route := n.GetV6Route(&evt.Lease)
  86. routeAdd(route, netlink.FAMILY_V6, n.addToV6RouteList, n.removeFromV6RouteList)
  87. }
  88. case subnet.EventRemoved:
  89. if evt.Lease.Attrs.BackendType != n.BackendType {
  90. log.Warningf("Ignoring non-%v subnet: type=%v", n.BackendType, evt.Lease.Attrs.BackendType)
  91. continue
  92. }
  93. if evt.Lease.EnableIPv4 {
  94. log.Info("Subnet removed: ", evt.Lease.Subnet)
  95. route := n.GetRoute(&evt.Lease)
  96. // Always remove the route from the route list.
  97. n.removeFromV4RouteList(*route)
  98. if err := netlink.RouteDel(route); err != nil {
  99. log.Errorf("Error deleting route to %v: %v", evt.Lease.Subnet, err)
  100. }
  101. }
  102. if evt.Lease.EnableIPv6 {
  103. log.Info("Subnet removed: ", evt.Lease.IPv6Subnet)
  104. route := n.GetV6Route(&evt.Lease)
  105. // Always remove the route from the route list.
  106. n.removeFromV6RouteList(*route)
  107. if err := netlink.RouteDel(route); err != nil {
  108. log.Errorf("Error deleting route to %v: %v", evt.Lease.IPv6Subnet, err)
  109. }
  110. }
  111. default:
  112. log.Error("Internal error: unknown event type: ", int(evt.Type))
  113. }
  114. }
  115. }
  116. func routeAdd(route *netlink.Route, ipFamily int, addToRouteList, removeFromRouteList func(netlink.Route)) {
  117. addToRouteList(*route)
  118. // Check if route exists before attempting to add it
  119. routeList, err := netlink.RouteListFiltered(ipFamily, &netlink.Route{Dst: route.Dst}, netlink.RT_FILTER_DST)
  120. if err != nil {
  121. log.Warningf("Unable to list routes: %v", err)
  122. }
  123. if len(routeList) > 0 && !routeEqual(routeList[0], *route) {
  124. // Same Dst different Gw or different link index. Remove it, correct route will be added below.
  125. log.Warningf("Replacing existing route to %v with %v", routeList[0], route)
  126. if err := netlink.RouteDel(&routeList[0]); err != nil {
  127. log.Errorf("Effor deleteing route to %v: %v", routeList[0].Dst, err)
  128. return
  129. }
  130. removeFromRouteList(routeList[0])
  131. }
  132. routeList, err = netlink.RouteListFiltered(ipFamily, &netlink.Route{Dst: route.Dst}, netlink.RT_FILTER_DST)
  133. if err != nil {
  134. log.Warningf("Unable to list routes: %v", err)
  135. }
  136. if len(routeList) > 0 && routeEqual(routeList[0], *route) {
  137. // Same Dst and same Gw, keep it and do not attempt to add it.
  138. log.Infof("Route to %v already exists, skipping.", route)
  139. } else if err := netlink.RouteAdd(route); err != nil {
  140. log.Errorf("Error adding route to %v", route)
  141. return
  142. }
  143. routeList, err = netlink.RouteListFiltered(ipFamily, &netlink.Route{Dst: route.Dst}, netlink.RT_FILTER_DST)
  144. if err != nil {
  145. log.Warningf("Unable to list routes: %v", err)
  146. }
  147. }
  148. func (n *RouteNetwork) addToRouteList(route netlink.Route) {
  149. n.routes = addToRouteList(&route, n.routes)
  150. }
  151. func (n *RouteNetwork) addToV6RouteList(route netlink.Route) {
  152. n.v6Routes = addToRouteList(&route, n.v6Routes)
  153. }
  154. func addToRouteList(route *netlink.Route, routes []netlink.Route) []netlink.Route {
  155. for _, r := range routes {
  156. if routeEqual(r, *route) {
  157. return routes
  158. }
  159. }
  160. return append(routes, *route)
  161. }
  162. func (n *RouteNetwork) removeFromV4RouteList(route netlink.Route) {
  163. n.routes = n.removeFromRouteList(&route, n.routes)
  164. }
  165. func (n *RouteNetwork) removeFromV6RouteList(route netlink.Route) {
  166. n.v6Routes = n.removeFromRouteList(&route, n.v6Routes)
  167. }
  168. func (n *RouteNetwork) removeFromRouteList(route *netlink.Route, routes []netlink.Route) []netlink.Route {
  169. for index, r := range routes {
  170. if routeEqual(r, *route) {
  171. routes = append(routes[:index], routes[index+1:]...)
  172. return routes
  173. }
  174. }
  175. return routes
  176. }
  177. func (n *RouteNetwork) routeCheck(ctx context.Context) {
  178. for {
  179. select {
  180. case <-ctx.Done():
  181. return
  182. case <-time.After(routeCheckRetries * time.Second):
  183. n.checkSubnetExistInV4Routes()
  184. n.checkSubnetExistInV6Routes()
  185. }
  186. }
  187. }
  188. func (n *RouteNetwork) checkSubnetExistInV4Routes() {
  189. n.checkSubnetExistInRoutes(n.routes, netlink.FAMILY_V4)
  190. }
  191. func (n *RouteNetwork) checkSubnetExistInV6Routes() {
  192. n.checkSubnetExistInRoutes(n.v6Routes, netlink.FAMILY_V6)
  193. }
  194. func (n *RouteNetwork) checkSubnetExistInRoutes(routes []netlink.Route, ipFamily int) {
  195. routeList, err := netlink.RouteList(nil, ipFamily)
  196. if err == nil {
  197. for _, route := range routes {
  198. exist := false
  199. for _, r := range routeList {
  200. if r.Dst == nil {
  201. continue
  202. }
  203. if routeEqual(r, route) {
  204. exist = true
  205. break
  206. }
  207. }
  208. if !exist {
  209. if err := netlink.RouteAdd(&route); err != nil {
  210. if nerr, ok := err.(net.Error); !ok {
  211. log.Errorf("Error recovering route to %v: %v, %v", route.Dst, route.Gw, nerr)
  212. }
  213. continue
  214. } else {
  215. log.Infof("Route recovered %v : %v", route.Dst, route.Gw)
  216. }
  217. }
  218. }
  219. } else {
  220. log.Errorf("Error fetching route list. Will automatically retry: %v", err)
  221. }
  222. }
  223. func routeEqual(x, y netlink.Route) bool {
  224. // For ipip backend, when enabling directrouting, link index of some routes may change
  225. // For both ipip and host-gw backend, link index may also change if updating ExtIface
  226. if x.Dst.IP.Equal(y.Dst.IP) && x.Gw.Equal(y.Gw) && bytes.Equal(x.Dst.Mask, y.Dst.Mask) && x.LinkIndex == y.LinkIndex {
  227. return true
  228. }
  229. return false
  230. }