manager.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. // Copyright 2015 flannel authors
  2. // Copyright 2015 Red Hat, Inc.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. package network
  16. import (
  17. "errors"
  18. "flag"
  19. "fmt"
  20. "net"
  21. "os"
  22. "path/filepath"
  23. "strings"
  24. "sync"
  25. "time"
  26. "github.com/coreos/go-systemd/daemon"
  27. log "github.com/golang/glog"
  28. "golang.org/x/net/context"
  29. "github.com/coreos/flannel/backend"
  30. "github.com/coreos/flannel/pkg/ip"
  31. "github.com/coreos/flannel/subnet"
  32. )
  33. type CmdLineOpts struct {
  34. publicIP string
  35. ipMasq bool
  36. subnetFile string
  37. subnetDir string
  38. iface string
  39. networks string
  40. watchNetworks bool
  41. }
  42. var errAlreadyExists = errors.New("already exists")
  43. var opts CmdLineOpts
  44. func init() {
  45. flag.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
  46. flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
  47. flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
  48. flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
  49. flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
  50. flag.BoolVar(&opts.watchNetworks, "watch-networks", false, "run in multi-network mode and watch for networks from 'networks' or all networks")
  51. flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
  52. }
  53. type Manager struct {
  54. ctx context.Context
  55. sm subnet.Manager
  56. bm backend.Manager
  57. allowedNetworks map[string]bool
  58. mux sync.Mutex
  59. networks map[string]*Network
  60. watch bool
  61. ipMasq bool
  62. extIface *backend.ExternalInterface
  63. }
  64. func (m *Manager) isNetAllowed(name string) bool {
  65. // If allowedNetworks is empty all networks are allowed
  66. if len(m.allowedNetworks) > 0 {
  67. _, ok := m.allowedNetworks[name]
  68. return ok
  69. }
  70. return true
  71. }
  72. func (m *Manager) isMultiNetwork() bool {
  73. return len(m.allowedNetworks) > 0 || m.watch
  74. }
  75. func NewNetworkManager(ctx context.Context, sm subnet.Manager) (*Manager, error) {
  76. extIface, err := lookupExtIface(opts.iface)
  77. if err != nil {
  78. return nil, err
  79. }
  80. bm := backend.NewManager(ctx, sm, extIface)
  81. manager := &Manager{
  82. ctx: ctx,
  83. sm: sm,
  84. bm: bm,
  85. allowedNetworks: make(map[string]bool),
  86. networks: make(map[string]*Network),
  87. watch: opts.watchNetworks,
  88. ipMasq: opts.ipMasq,
  89. extIface: extIface,
  90. }
  91. for _, name := range strings.Split(opts.networks, ",") {
  92. if name != "" {
  93. manager.allowedNetworks[name] = true
  94. }
  95. }
  96. return manager, nil
  97. }
  98. func lookupExtIface(ifname string) (*backend.ExternalInterface, error) {
  99. var iface *net.Interface
  100. var ifaceAddr net.IP
  101. var err error
  102. if len(ifname) > 0 {
  103. if ifaceAddr = net.ParseIP(ifname); ifaceAddr != nil {
  104. log.Infof("Searching for interface using %s", ifaceAddr)
  105. iface, err = ip.GetInterfaceByIP(ifaceAddr)
  106. if err != nil {
  107. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  108. }
  109. } else {
  110. iface, err = net.InterfaceByName(ifname)
  111. if err != nil {
  112. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  113. }
  114. }
  115. } else {
  116. log.Info("Determining IP address of default interface")
  117. if iface, err = ip.GetDefaultGatewayIface(); err != nil {
  118. return nil, fmt.Errorf("failed to get default interface: %s", err)
  119. }
  120. }
  121. if ifaceAddr == nil {
  122. ifaceAddr, err = ip.GetIfaceIP4Addr(iface)
  123. if err != nil {
  124. return nil, fmt.Errorf("failed to find IPv4 address for interface %s", iface.Name)
  125. }
  126. }
  127. log.Infof("Using interface with name %s and address %s", iface.Name, ifaceAddr)
  128. if iface.MTU == 0 {
  129. return nil, fmt.Errorf("failed to determine MTU for %s interface", ifaceAddr)
  130. }
  131. var extAddr net.IP
  132. if len(opts.publicIP) > 0 {
  133. extAddr = net.ParseIP(opts.publicIP)
  134. if extAddr == nil {
  135. return nil, fmt.Errorf("invalid public IP address: %s", opts.publicIP)
  136. }
  137. log.Infof("Using %s as external address", extAddr)
  138. }
  139. if extAddr == nil {
  140. log.Infof("Defaulting external address to interface address (%s)", ifaceAddr)
  141. extAddr = ifaceAddr
  142. }
  143. return &backend.ExternalInterface{
  144. Iface: iface,
  145. IfaceAddr: ifaceAddr,
  146. ExtAddr: extAddr,
  147. }, nil
  148. }
  149. func writeSubnetFile(path string, nw ip.IP4Net, ipMasq bool, bn backend.Network) error {
  150. dir, name := filepath.Split(path)
  151. os.MkdirAll(dir, 0755)
  152. tempFile := filepath.Join(dir, "."+name)
  153. f, err := os.Create(tempFile)
  154. if err != nil {
  155. return err
  156. }
  157. // Write out the first usable IP by incrementing
  158. // sn.IP by one
  159. sn := bn.Lease().Subnet
  160. sn.IP += 1
  161. fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw)
  162. fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
  163. fmt.Fprintf(f, "FLANNEL_MTU=%d\n", bn.MTU())
  164. _, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq)
  165. f.Close()
  166. if err != nil {
  167. return err
  168. }
  169. // rename(2) the temporary file to the desired location so that it becomes
  170. // atomically visible with the contents
  171. return os.Rename(tempFile, path)
  172. }
  173. func (m *Manager) addNetwork(n *Network) error {
  174. m.mux.Lock()
  175. defer m.mux.Unlock()
  176. if _, ok := m.networks[n.Name]; ok {
  177. return errAlreadyExists
  178. }
  179. m.networks[n.Name] = n
  180. return nil
  181. }
  182. func (m *Manager) delNetwork(n *Network) {
  183. m.mux.Lock()
  184. delete(m.networks, n.Name)
  185. m.mux.Unlock()
  186. }
  187. func (m *Manager) getNetwork(netname string) (*Network, bool) {
  188. m.mux.Lock()
  189. n, ok := m.networks[netname]
  190. m.mux.Unlock()
  191. return n, ok
  192. }
  193. func (m *Manager) forEachNetwork(f func(n *Network)) {
  194. m.mux.Lock()
  195. for _, n := range m.networks {
  196. f(n)
  197. }
  198. m.mux.Unlock()
  199. }
  200. func (m *Manager) runNetwork(n *Network) {
  201. n.Run(m.extIface, func(bn backend.Network) {
  202. if m.isMultiNetwork() {
  203. log.Infof("%v: lease acquired: %v", n.Name, bn.Lease().Subnet)
  204. path := filepath.Join(opts.subnetDir, n.Name) + ".env"
  205. if err := writeSubnetFile(path, n.Config.Network, m.ipMasq, bn); err != nil {
  206. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  207. return
  208. }
  209. } else {
  210. log.Infof("Lease acquired: %v", bn.Lease().Subnet)
  211. if err := writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn); err != nil {
  212. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  213. return
  214. }
  215. daemon.SdNotify("READY=1")
  216. }
  217. })
  218. m.delNetwork(n)
  219. }
  220. func (m *Manager) watchNetworks() {
  221. wg := sync.WaitGroup{}
  222. defer wg.Wait()
  223. events := make(chan []subnet.Event)
  224. wg.Add(1)
  225. go func() {
  226. subnet.WatchNetworks(m.ctx, m.sm, events)
  227. wg.Done()
  228. }()
  229. // skip over the initial snapshot
  230. <-events
  231. for {
  232. select {
  233. case <-m.ctx.Done():
  234. return
  235. case evtBatch := <-events:
  236. for _, e := range evtBatch {
  237. netname := e.Network
  238. if !m.isNetAllowed(netname) {
  239. log.Infof("Network %q is not allowed", netname)
  240. continue
  241. }
  242. switch e.Type {
  243. case subnet.EventAdded:
  244. n := NewNetwork(m.ctx, m.sm, m.bm, netname, m.ipMasq)
  245. if err := m.addNetwork(n); err != nil {
  246. log.Infof("Network %q: %v", netname, err)
  247. continue
  248. }
  249. log.Infof("Network added: %v", netname)
  250. wg.Add(1)
  251. go func() {
  252. m.runNetwork(n)
  253. wg.Done()
  254. }()
  255. case subnet.EventRemoved:
  256. log.Infof("Network removed: %v", netname)
  257. n, ok := m.getNetwork(netname)
  258. if !ok {
  259. log.Warningf("Network %v unknown; ignoring EventRemoved", netname)
  260. continue
  261. }
  262. n.Cancel()
  263. }
  264. }
  265. }
  266. }
  267. }
  268. func (m *Manager) Run(ctx context.Context) {
  269. wg := sync.WaitGroup{}
  270. if m.isMultiNetwork() {
  271. for {
  272. // Try adding initial networks
  273. result, err := m.sm.WatchNetworks(ctx, nil)
  274. if err == nil {
  275. for _, n := range result.Snapshot {
  276. if m.isNetAllowed(n) {
  277. m.networks[n] = NewNetwork(ctx, m.sm, m.bm, n, m.ipMasq)
  278. }
  279. }
  280. break
  281. }
  282. // Otherwise retry in a few seconds
  283. log.Warning("Failed to retrieve networks (will retry): %v", err)
  284. select {
  285. case <-ctx.Done():
  286. return
  287. case <-time.After(time.Second):
  288. }
  289. }
  290. } else {
  291. m.networks[""] = NewNetwork(ctx, m.sm, m.bm, "", m.ipMasq)
  292. }
  293. // Run existing networks
  294. m.forEachNetwork(func(n *Network) {
  295. wg.Add(1)
  296. go func(n *Network) {
  297. m.runNetwork(n)
  298. wg.Done()
  299. }(n)
  300. })
  301. if opts.watchNetworks {
  302. m.watchNetworks()
  303. }
  304. wg.Wait()
  305. m.bm.Wait()
  306. }