manager.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. // Copyright 2015 flannel authors
  2. // Copyright 2015 Red Hat, Inc.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. package network
  16. import (
  17. "errors"
  18. "flag"
  19. "fmt"
  20. "net"
  21. "os"
  22. "path/filepath"
  23. "strings"
  24. "sync"
  25. "time"
  26. "github.com/coreos/go-systemd/daemon"
  27. log "github.com/golang/glog"
  28. "golang.org/x/net/context"
  29. "github.com/coreos/flannel/backend"
  30. "github.com/coreos/flannel/pkg/ip"
  31. "github.com/coreos/flannel/subnet"
  32. )
  33. type CmdLineOpts struct {
  34. publicIP string
  35. ipMasq bool
  36. subnetFile string
  37. subnetDir string
  38. iface string
  39. networks string
  40. watchNetworks bool
  41. }
  42. var errAlreadyExists = errors.New("already exists")
  43. var opts CmdLineOpts
  44. func init() {
  45. flag.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
  46. flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
  47. flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
  48. flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
  49. flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
  50. flag.BoolVar(&opts.watchNetworks, "watch-networks", false, "run in multi-network mode and watch for networks from 'networks' or all networks")
  51. flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
  52. }
  53. type Manager struct {
  54. ctx context.Context
  55. sm subnet.Manager
  56. bm backend.Manager
  57. allowedNetworks map[string]bool
  58. mux sync.Mutex
  59. networks map[string]*Network
  60. watch bool
  61. ipMasq bool
  62. extIface *backend.ExternalInterface
  63. }
  64. func (m *Manager) isNetAllowed(name string) bool {
  65. // If allowedNetworks is empty all networks are allowed
  66. if len(m.allowedNetworks) > 0 {
  67. _, ok := m.allowedNetworks[name]
  68. return ok
  69. }
  70. return true
  71. }
  72. func (m *Manager) isMultiNetwork() bool {
  73. return len(m.allowedNetworks) > 0 || m.watch
  74. }
  75. func NewNetworkManager(ctx context.Context, sm subnet.Manager) (*Manager, error) {
  76. extIface, err := lookupExtIface(opts.iface)
  77. if err != nil {
  78. return nil, err
  79. }
  80. bm := backend.NewManager(ctx, sm, extIface)
  81. manager := &Manager{
  82. ctx: ctx,
  83. sm: sm,
  84. bm: bm,
  85. allowedNetworks: make(map[string]bool),
  86. networks: make(map[string]*Network),
  87. watch: opts.watchNetworks,
  88. ipMasq: opts.ipMasq,
  89. extIface: extIface,
  90. }
  91. for _, name := range strings.Split(opts.networks, ",") {
  92. if name != "" {
  93. manager.allowedNetworks[name] = true
  94. }
  95. }
  96. return manager, nil
  97. }
  98. func lookupExtIface(ifname string) (*backend.ExternalInterface, error) {
  99. var iface *net.Interface
  100. var iaddr net.IP
  101. var err error
  102. if len(ifname) > 0 {
  103. if iaddr = net.ParseIP(ifname); iaddr != nil {
  104. iface, err = ip.GetInterfaceByIP(iaddr)
  105. if err != nil {
  106. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  107. }
  108. } else {
  109. iface, err = net.InterfaceByName(ifname)
  110. if err != nil {
  111. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  112. }
  113. }
  114. } else {
  115. log.Info("Determining IP address of default interface")
  116. if iface, err = ip.GetDefaultGatewayIface(); err != nil {
  117. return nil, fmt.Errorf("failed to get default interface: %s", err)
  118. }
  119. }
  120. if iaddr == nil {
  121. iaddr, err = ip.GetIfaceIP4Addr(iface)
  122. if err != nil {
  123. return nil, fmt.Errorf("failed to find IPv4 address for interface %s", iface.Name)
  124. }
  125. }
  126. if iface.MTU == 0 {
  127. return nil, fmt.Errorf("failed to determine MTU for %s interface", iaddr)
  128. }
  129. var eaddr net.IP
  130. if len(opts.publicIP) > 0 {
  131. eaddr = net.ParseIP(opts.publicIP)
  132. if eaddr == nil {
  133. return nil, fmt.Errorf("invalid public IP address", opts.publicIP)
  134. }
  135. }
  136. if eaddr == nil {
  137. eaddr = iaddr
  138. }
  139. log.Infof("Using %s as external interface", iaddr)
  140. log.Infof("Using %s as external endpoint", eaddr)
  141. return &backend.ExternalInterface{
  142. Iface: iface,
  143. IfaceAddr: iaddr,
  144. ExtAddr: eaddr,
  145. }, nil
  146. }
  147. func writeSubnetFile(path string, nw ip.IP4Net, ipMasq bool, bn backend.Network) error {
  148. dir, name := filepath.Split(path)
  149. os.MkdirAll(dir, 0755)
  150. tempFile := filepath.Join(dir, "."+name)
  151. f, err := os.Create(tempFile)
  152. if err != nil {
  153. return err
  154. }
  155. // Write out the first usable IP by incrementing
  156. // sn.IP by one
  157. sn := bn.Lease().Subnet
  158. sn.IP += 1
  159. fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw)
  160. fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
  161. fmt.Fprintf(f, "FLANNEL_MTU=%d\n", bn.MTU())
  162. _, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq)
  163. f.Close()
  164. if err != nil {
  165. return err
  166. }
  167. // rename(2) the temporary file to the desired location so that it becomes
  168. // atomically visible with the contents
  169. return os.Rename(tempFile, path)
  170. }
  171. func (m *Manager) addNetwork(n *Network) error {
  172. m.mux.Lock()
  173. defer m.mux.Unlock()
  174. if _, ok := m.networks[n.Name]; ok {
  175. return errAlreadyExists
  176. }
  177. m.networks[n.Name] = n
  178. return nil
  179. }
  180. func (m *Manager) delNetwork(n *Network) {
  181. m.mux.Lock()
  182. delete(m.networks, n.Name)
  183. m.mux.Unlock()
  184. }
  185. func (m *Manager) getNetwork(netname string) (*Network, bool) {
  186. m.mux.Lock()
  187. n, ok := m.networks[netname]
  188. m.mux.Unlock()
  189. return n, ok
  190. }
  191. func (m *Manager) forEachNetwork(f func(n *Network)) {
  192. m.mux.Lock()
  193. for _, n := range m.networks {
  194. f(n)
  195. }
  196. m.mux.Unlock()
  197. }
  198. func (m *Manager) runNetwork(n *Network) {
  199. n.Run(m.extIface, func(bn backend.Network) {
  200. if m.isMultiNetwork() {
  201. log.Infof("%v: lease acquired: %v", n.Name, bn.Lease().Subnet)
  202. path := filepath.Join(opts.subnetDir, n.Name) + ".env"
  203. if err := writeSubnetFile(path, n.Config.Network, m.ipMasq, bn); err != nil {
  204. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  205. return
  206. }
  207. } else {
  208. log.Infof("Lease acquired: %v", bn.Lease().Subnet)
  209. if err := writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn); err != nil {
  210. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  211. return
  212. }
  213. daemon.SdNotify("READY=1")
  214. }
  215. })
  216. m.delNetwork(n)
  217. }
  218. func (m *Manager) watchNetworks() {
  219. wg := sync.WaitGroup{}
  220. defer wg.Wait()
  221. events := make(chan []subnet.Event)
  222. wg.Add(1)
  223. go func() {
  224. subnet.WatchNetworks(m.ctx, m.sm, events)
  225. wg.Done()
  226. }()
  227. // skip over the initial snapshot
  228. <-events
  229. for {
  230. select {
  231. case <-m.ctx.Done():
  232. return
  233. case evtBatch := <-events:
  234. for _, e := range evtBatch {
  235. netname := e.Network
  236. if !m.isNetAllowed(netname) {
  237. log.Infof("Network %q is not allowed", netname)
  238. continue
  239. }
  240. switch e.Type {
  241. case subnet.EventAdded:
  242. n := NewNetwork(m.ctx, m.sm, m.bm, netname, m.ipMasq)
  243. if err := m.addNetwork(n); err != nil {
  244. log.Infof("Network %q: %v", netname, err)
  245. continue
  246. }
  247. log.Infof("Network added: %v", netname)
  248. wg.Add(1)
  249. go func() {
  250. m.runNetwork(n)
  251. wg.Done()
  252. }()
  253. case subnet.EventRemoved:
  254. log.Infof("Network removed: %v", netname)
  255. n, ok := m.getNetwork(netname)
  256. if !ok {
  257. log.Warningf("Network %v unknown; ignoring EventRemoved", netname)
  258. continue
  259. }
  260. n.Cancel()
  261. }
  262. }
  263. }
  264. }
  265. }
  266. func (m *Manager) Run(ctx context.Context) {
  267. wg := sync.WaitGroup{}
  268. if m.isMultiNetwork() {
  269. for {
  270. // Try adding initial networks
  271. result, err := m.sm.WatchNetworks(ctx, nil)
  272. if err == nil {
  273. for _, n := range result.Snapshot {
  274. if m.isNetAllowed(n) {
  275. m.networks[n] = NewNetwork(ctx, m.sm, m.bm, n, m.ipMasq)
  276. }
  277. }
  278. break
  279. }
  280. // Otherwise retry in a few seconds
  281. log.Warning("Failed to retrieve networks (will retry): %v", err)
  282. select {
  283. case <-ctx.Done():
  284. return
  285. case <-time.After(time.Second):
  286. }
  287. }
  288. } else {
  289. m.networks[""] = NewNetwork(ctx, m.sm, m.bm, "", m.ipMasq)
  290. }
  291. // Run existing networks
  292. m.forEachNetwork(func(n *Network) {
  293. wg.Add(1)
  294. go func(n *Network) {
  295. m.runNetwork(n)
  296. wg.Done()
  297. }(n)
  298. })
  299. if opts.watchNetworks {
  300. m.watchNetworks()
  301. }
  302. wg.Wait()
  303. m.bm.Wait()
  304. }