manager.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. // Copyright 2015 flannel authors
  2. // Copyright 2015 Red Hat, Inc.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. package network
  16. import (
  17. "errors"
  18. "flag"
  19. "fmt"
  20. "net"
  21. "os"
  22. "path/filepath"
  23. "strings"
  24. "sync"
  25. "time"
  26. "github.com/coreos/go-systemd/daemon"
  27. log "github.com/golang/glog"
  28. "golang.org/x/net/context"
  29. "github.com/coreos/flannel/backend"
  30. "github.com/coreos/flannel/pkg/ip"
  31. "github.com/coreos/flannel/subnet"
  32. )
  33. type CmdLineOpts struct {
  34. publicIP string
  35. ipMasq bool
  36. subnetFile string
  37. subnetDir string
  38. iface string
  39. networks string
  40. watchNetworks bool
  41. subnetLeaseRenewMargin int
  42. }
  43. var errAlreadyExists = errors.New("already exists")
  44. var opts CmdLineOpts
  45. func init() {
  46. flag.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
  47. flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
  48. flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
  49. flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
  50. flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
  51. flag.IntVar(&opts.subnetLeaseRenewMargin, "subnet-lease-renew-margin", 60, "Subnet lease renewal margin, in minutes.")
  52. flag.BoolVar(&opts.watchNetworks, "watch-networks", false, "run in multi-network mode and watch for networks from 'networks' or all networks")
  53. flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
  54. }
  55. type Manager struct {
  56. ctx context.Context
  57. sm subnet.Manager
  58. bm backend.Manager
  59. allowedNetworks map[string]bool
  60. mux sync.Mutex
  61. networks map[string]*Network
  62. watch bool
  63. ipMasq bool
  64. extIface *backend.ExternalInterface
  65. }
  66. func (m *Manager) isNetAllowed(name string) bool {
  67. // If allowedNetworks is empty all networks are allowed
  68. if len(m.allowedNetworks) > 0 {
  69. _, ok := m.allowedNetworks[name]
  70. return ok
  71. }
  72. return true
  73. }
  74. func (m *Manager) isMultiNetwork() bool {
  75. return len(m.allowedNetworks) > 0 || m.watch
  76. }
  77. func NewNetworkManager(ctx context.Context, sm subnet.Manager) (*Manager, error) {
  78. extIface, err := lookupExtIface(opts.iface)
  79. if err != nil {
  80. return nil, err
  81. }
  82. bm := backend.NewManager(ctx, sm, extIface)
  83. manager := &Manager{
  84. ctx: ctx,
  85. sm: sm,
  86. bm: bm,
  87. allowedNetworks: make(map[string]bool),
  88. networks: make(map[string]*Network),
  89. watch: opts.watchNetworks,
  90. ipMasq: opts.ipMasq,
  91. extIface: extIface,
  92. }
  93. for _, name := range strings.Split(opts.networks, ",") {
  94. if name != "" {
  95. manager.allowedNetworks[name] = true
  96. }
  97. }
  98. return manager, nil
  99. }
  100. func lookupExtIface(ifname string) (*backend.ExternalInterface, error) {
  101. var iface *net.Interface
  102. var ifaceAddr net.IP
  103. var err error
  104. if len(ifname) > 0 {
  105. if ifaceAddr = net.ParseIP(ifname); ifaceAddr != nil {
  106. log.Infof("Searching for interface using %s", ifaceAddr)
  107. iface, err = ip.GetInterfaceByIP(ifaceAddr)
  108. if err != nil {
  109. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  110. }
  111. } else {
  112. iface, err = net.InterfaceByName(ifname)
  113. if err != nil {
  114. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  115. }
  116. }
  117. } else {
  118. log.Info("Determining IP address of default interface")
  119. if iface, err = ip.GetDefaultGatewayIface(); err != nil {
  120. return nil, fmt.Errorf("failed to get default interface: %s", err)
  121. }
  122. }
  123. if ifaceAddr == nil {
  124. ifaceAddr, err = ip.GetIfaceIP4Addr(iface)
  125. if err != nil {
  126. return nil, fmt.Errorf("failed to find IPv4 address for interface %s", iface.Name)
  127. }
  128. }
  129. log.Infof("Using interface with name %s and address %s", iface.Name, ifaceAddr)
  130. if iface.MTU == 0 {
  131. return nil, fmt.Errorf("failed to determine MTU for %s interface", ifaceAddr)
  132. }
  133. var extAddr net.IP
  134. if len(opts.publicIP) > 0 {
  135. extAddr = net.ParseIP(opts.publicIP)
  136. if extAddr == nil {
  137. return nil, fmt.Errorf("invalid public IP address: %s", opts.publicIP)
  138. }
  139. log.Infof("Using %s as external address", extAddr)
  140. }
  141. if extAddr == nil {
  142. log.Infof("Defaulting external address to interface address (%s)", ifaceAddr)
  143. extAddr = ifaceAddr
  144. }
  145. return &backend.ExternalInterface{
  146. Iface: iface,
  147. IfaceAddr: ifaceAddr,
  148. ExtAddr: extAddr,
  149. }, nil
  150. }
  151. func writeSubnetFile(path string, nw ip.IP4Net, ipMasq bool, bn backend.Network) error {
  152. dir, name := filepath.Split(path)
  153. os.MkdirAll(dir, 0755)
  154. tempFile := filepath.Join(dir, "."+name)
  155. f, err := os.Create(tempFile)
  156. if err != nil {
  157. return err
  158. }
  159. // Write out the first usable IP by incrementing
  160. // sn.IP by one
  161. sn := bn.Lease().Subnet
  162. sn.IP += 1
  163. fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw)
  164. fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
  165. fmt.Fprintf(f, "FLANNEL_MTU=%d\n", bn.MTU())
  166. _, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq)
  167. f.Close()
  168. if err != nil {
  169. return err
  170. }
  171. // rename(2) the temporary file to the desired location so that it becomes
  172. // atomically visible with the contents
  173. return os.Rename(tempFile, path)
  174. }
  175. func (m *Manager) addNetwork(n *Network) error {
  176. m.mux.Lock()
  177. defer m.mux.Unlock()
  178. if _, ok := m.networks[n.Name]; ok {
  179. return errAlreadyExists
  180. }
  181. m.networks[n.Name] = n
  182. return nil
  183. }
  184. func (m *Manager) delNetwork(n *Network) {
  185. m.mux.Lock()
  186. delete(m.networks, n.Name)
  187. m.mux.Unlock()
  188. }
  189. func (m *Manager) getNetwork(netname string) (*Network, bool) {
  190. m.mux.Lock()
  191. n, ok := m.networks[netname]
  192. m.mux.Unlock()
  193. return n, ok
  194. }
  195. func (m *Manager) forEachNetwork(f func(n *Network)) {
  196. m.mux.Lock()
  197. for _, n := range m.networks {
  198. f(n)
  199. }
  200. m.mux.Unlock()
  201. }
  202. func (m *Manager) runNetwork(n *Network) {
  203. n.Run(m.extIface, func(bn backend.Network) {
  204. if m.isMultiNetwork() {
  205. log.Infof("%v: lease acquired: %v", n.Name, bn.Lease().Subnet)
  206. path := filepath.Join(opts.subnetDir, n.Name) + ".env"
  207. if err := writeSubnetFile(path, n.Config.Network, m.ipMasq, bn); err != nil {
  208. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  209. return
  210. }
  211. } else {
  212. log.Infof("Lease acquired: %v", bn.Lease().Subnet)
  213. if err := writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn); err != nil {
  214. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  215. return
  216. }
  217. daemon.SdNotify(false, "READY=1")
  218. }
  219. })
  220. m.delNetwork(n)
  221. }
  222. func (m *Manager) watchNetworks() {
  223. wg := sync.WaitGroup{}
  224. defer wg.Wait()
  225. events := make(chan []subnet.Event)
  226. wg.Add(1)
  227. go func() {
  228. subnet.WatchNetworks(m.ctx, m.sm, events)
  229. wg.Done()
  230. }()
  231. // skip over the initial snapshot
  232. <-events
  233. for {
  234. select {
  235. case <-m.ctx.Done():
  236. return
  237. case evtBatch := <-events:
  238. for _, e := range evtBatch {
  239. netname := e.Network
  240. if !m.isNetAllowed(netname) {
  241. log.Infof("Network %q is not allowed", netname)
  242. continue
  243. }
  244. switch e.Type {
  245. case subnet.EventAdded:
  246. n := NewNetwork(m.ctx, m.sm, m.bm, netname, m.ipMasq)
  247. if err := m.addNetwork(n); err != nil {
  248. log.Infof("Network %q: %v", netname, err)
  249. continue
  250. }
  251. log.Infof("Network added: %v", netname)
  252. wg.Add(1)
  253. go func() {
  254. m.runNetwork(n)
  255. wg.Done()
  256. }()
  257. case subnet.EventRemoved:
  258. log.Infof("Network removed: %v", netname)
  259. n, ok := m.getNetwork(netname)
  260. if !ok {
  261. log.Warningf("Network %v unknown; ignoring EventRemoved", netname)
  262. continue
  263. }
  264. n.Cancel()
  265. }
  266. }
  267. }
  268. }
  269. }
  270. func (m *Manager) Run(ctx context.Context) {
  271. wg := sync.WaitGroup{}
  272. if m.isMultiNetwork() {
  273. for {
  274. // Try adding initial networks
  275. result, err := m.sm.WatchNetworks(ctx, nil)
  276. if err == nil {
  277. for _, n := range result.Snapshot {
  278. if m.isNetAllowed(n) {
  279. m.networks[n] = NewNetwork(ctx, m.sm, m.bm, n, m.ipMasq)
  280. }
  281. }
  282. break
  283. }
  284. // Otherwise retry in a few seconds
  285. log.Warning("Failed to retrieve networks (will retry): %v", err)
  286. select {
  287. case <-ctx.Done():
  288. return
  289. case <-time.After(time.Second):
  290. }
  291. }
  292. } else {
  293. m.networks[""] = NewNetwork(ctx, m.sm, m.bm, "", m.ipMasq)
  294. }
  295. // Run existing networks
  296. m.forEachNetwork(func(n *Network) {
  297. wg.Add(1)
  298. go func(n *Network) {
  299. m.runNetwork(n)
  300. wg.Done()
  301. }(n)
  302. })
  303. if opts.watchNetworks {
  304. m.watchNetworks()
  305. }
  306. wg.Wait()
  307. m.bm.Wait()
  308. }