manager.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. // Copyright 2015 flannel authors
  2. // Copyright 2015 Red Hat, Inc.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. package network
  16. import (
  17. "errors"
  18. "flag"
  19. "fmt"
  20. "net"
  21. "os"
  22. "path/filepath"
  23. "strings"
  24. "sync"
  25. "time"
  26. "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
  27. log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
  28. "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
  29. "github.com/coreos/flannel/backend"
  30. "github.com/coreos/flannel/pkg/ip"
  31. "github.com/coreos/flannel/subnet"
  32. )
  // CmdLineOpts holds the values of the command-line flags that this package
  // registers in init.
  type CmdLineOpts struct {
  	// String-valued flags.
  	publicIP   string // -public-ip: IP accessible by other nodes
  	subnetFile string // -subnet-file: env file written in single-network mode
  	subnetDir  string // -subnet-dir: directory of env files in multi-network mode
  	iface      string // -iface: interface (IP or name) for inter-host communication
  	networks   string // -networks: comma-separated list of networks to service

	// Boolean flags.
  	ipMasq        bool // -ip-masq: set up IP masquerade rule
  	watchNetworks bool // -watch-networks: watch for networks being added/removed
  }
  42. var errAlreadyExists = errors.New("already exists")
  43. var opts CmdLineOpts
  44. func init() {
  45. flag.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
  46. flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
  47. flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
  48. flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
  49. flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
  50. flag.BoolVar(&opts.watchNetworks, "watch-networks", false, "run in multi-network mode and watch for networks from 'networks' or all networks")
  51. flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
  52. }
  53. type Manager struct {
  54. ctx context.Context
  55. sm subnet.Manager
  56. bm backend.Manager
  57. allowedNetworks map[string]bool
  58. mux sync.Mutex
  59. networks map[string]*Network
  60. watch bool
  61. ipMasq bool
  62. extIface *backend.ExternalInterface
  63. }
  64. func (m *Manager) isNetAllowed(name string) bool {
  65. // If allowedNetworks is empty all networks are allowed
  66. if len(m.allowedNetworks) > 0 {
  67. _, ok := m.allowedNetworks[name]
  68. return ok
  69. }
  70. return true
  71. }
  72. func (m *Manager) isMultiNetwork() bool {
  73. return len(m.allowedNetworks) > 0 || m.watch
  74. }
  75. func NewNetworkManager(ctx context.Context, sm subnet.Manager) (*Manager, error) {
  76. extIface, err := lookupExtIface(opts.iface)
  77. if err != nil {
  78. return nil, err
  79. }
  80. bm := backend.NewManager(ctx, sm, extIface)
  81. manager := &Manager{
  82. ctx: ctx,
  83. sm: sm,
  84. bm: bm,
  85. allowedNetworks: make(map[string]bool),
  86. networks: make(map[string]*Network),
  87. watch: opts.watchNetworks,
  88. ipMasq: opts.ipMasq,
  89. extIface: extIface,
  90. }
  91. for _, name := range strings.Split(opts.networks, ",") {
  92. if name != "" {
  93. manager.allowedNetworks[name] = true
  94. }
  95. }
  96. return manager, nil
  97. }
  98. func lookupExtIface(ifname string) (*backend.ExternalInterface, error) {
  99. var iface *net.Interface
  100. var iaddr net.IP
  101. var err error
  102. if len(ifname) > 0 {
  103. if iaddr = net.ParseIP(ifname); iaddr != nil {
  104. iface, err = ip.GetInterfaceByIP(iaddr)
  105. if err != nil {
  106. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  107. }
  108. } else {
  109. iface, err = net.InterfaceByName(ifname)
  110. if err != nil {
  111. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  112. }
  113. }
  114. } else {
  115. log.Info("Determining IP address of default interface")
  116. if iface, err = ip.GetDefaultGatewayIface(); err != nil {
  117. return nil, fmt.Errorf("failed to get default interface: %s", err)
  118. }
  119. }
  120. if iaddr == nil {
  121. iaddr, err = ip.GetIfaceIP4Addr(iface)
  122. if err != nil {
  123. return nil, fmt.Errorf("failed to find IPv4 address for interface %s", iface.Name)
  124. }
  125. }
  126. if iface.MTU == 0 {
  127. return nil, fmt.Errorf("failed to determine MTU for %s interface", iaddr)
  128. }
  129. var eaddr net.IP
  130. if len(opts.publicIP) > 0 {
  131. eaddr = net.ParseIP(opts.publicIP)
  132. if eaddr == nil {
  133. return nil, fmt.Errorf("invalid public IP address", opts.publicIP)
  134. }
  135. }
  136. if eaddr == nil {
  137. eaddr = iaddr
  138. }
  139. log.Infof("Using %s as external interface", iaddr)
  140. log.Infof("Using %s as external endpoint", eaddr)
  141. return &backend.ExternalInterface{
  142. Iface: iface,
  143. IfaceAddr: iaddr,
  144. ExtAddr: eaddr,
  145. }, nil
  146. }
  147. func writeSubnetFile(path string, nw ip.IP4Net, ipMasq bool, bn backend.Network) error {
  148. dir, name := filepath.Split(path)
  149. os.MkdirAll(dir, 0755)
  150. tempFile := filepath.Join(dir, "."+name)
  151. f, err := os.Create(tempFile)
  152. if err != nil {
  153. return err
  154. }
  155. // Write out the first usable IP by incrementing
  156. // sn.IP by one
  157. sn := bn.Lease().Subnet
  158. sn.IP += 1
  159. fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw)
  160. fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
  161. fmt.Fprintf(f, "FLANNEL_MTU=%d\n", bn.MTU())
  162. _, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq)
  163. f.Close()
  164. if err != nil {
  165. return err
  166. }
  167. // rename(2) the temporary file to the desired location so that it becomes
  168. // atomically visible with the contents
  169. return os.Rename(tempFile, path)
  170. }
  171. func (m *Manager) addNetwork(n *Network) error {
  172. m.mux.Lock()
  173. defer m.mux.Unlock()
  174. if _, ok := m.networks[n.Name]; ok {
  175. return errAlreadyExists
  176. }
  177. m.networks[n.Name] = n
  178. return nil
  179. }
  180. func (m *Manager) delNetwork(n *Network) {
  181. m.mux.Lock()
  182. delete(m.networks, n.Name)
  183. m.mux.Unlock()
  184. }
  185. func (m *Manager) getNetwork(netname string) (*Network, bool) {
  186. m.mux.Lock()
  187. n, ok := m.networks[netname]
  188. m.mux.Unlock()
  189. return n, ok
  190. }
  191. func (m *Manager) forEachNetwork(f func(n *Network)) {
  192. m.mux.Lock()
  193. for _, n := range m.networks {
  194. f(n)
  195. }
  196. m.mux.Unlock()
  197. }
  198. func (m *Manager) runNetwork(n *Network) {
  199. n.Run(m.extIface, func(bn backend.Network) {
  200. if m.isMultiNetwork() {
  201. path := filepath.Join(opts.subnetDir, n.Name) + ".env"
  202. if err := writeSubnetFile(path, n.Config.Network, m.ipMasq, bn); err != nil {
  203. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  204. return
  205. }
  206. } else {
  207. if err := writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn); err != nil {
  208. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  209. return
  210. }
  211. daemon.SdNotify("READY=1")
  212. }
  213. })
  214. m.delNetwork(n)
  215. }
  216. func (m *Manager) watchNetworks() {
  217. wg := sync.WaitGroup{}
  218. defer wg.Wait()
  219. events := make(chan []subnet.Event)
  220. wg.Add(1)
  221. go func() {
  222. subnet.WatchNetworks(m.ctx, m.sm, events)
  223. wg.Done()
  224. }()
  225. // skip over the initial snapshot
  226. <-events
  227. for {
  228. select {
  229. case <-m.ctx.Done():
  230. return
  231. case evtBatch := <-events:
  232. for _, e := range evtBatch {
  233. netname := e.Network
  234. if !m.isNetAllowed(netname) {
  235. log.Infof("Network %q is not allowed", netname)
  236. continue
  237. }
  238. switch e.Type {
  239. case subnet.EventAdded:
  240. n := NewNetwork(m.ctx, m.sm, m.bm, netname, m.ipMasq)
  241. if err := m.addNetwork(n); err != nil {
  242. log.Infof("Network %q: %v", netname, err)
  243. continue
  244. }
  245. log.Infof("Network added: %v", netname)
  246. wg.Add(1)
  247. go func() {
  248. m.runNetwork(n)
  249. wg.Done()
  250. }()
  251. case subnet.EventRemoved:
  252. log.Infof("Network removed: %v", netname)
  253. n, ok := m.getNetwork(netname)
  254. if !ok {
  255. log.Warningf("Network %v unknown; ignoring EventRemoved", netname)
  256. continue
  257. }
  258. n.Cancel()
  259. }
  260. }
  261. }
  262. }
  263. }
  264. func (m *Manager) Run(ctx context.Context) {
  265. wg := sync.WaitGroup{}
  266. if m.isMultiNetwork() {
  267. for {
  268. // Try adding initial networks
  269. result, err := m.sm.WatchNetworks(ctx, nil)
  270. if err == nil {
  271. for _, n := range result.Snapshot {
  272. if m.isNetAllowed(n) {
  273. m.networks[n] = NewNetwork(ctx, m.sm, m.bm, n, m.ipMasq)
  274. }
  275. }
  276. break
  277. }
  278. // Otherwise retry in a few seconds
  279. log.Warning("Failed to retrieve networks (will retry): %v", err)
  280. select {
  281. case <-ctx.Done():
  282. return
  283. case <-time.After(time.Second):
  284. }
  285. }
  286. } else {
  287. m.networks[""] = NewNetwork(ctx, m.sm, m.bm, "", m.ipMasq)
  288. }
  289. // Run existing networks
  290. m.forEachNetwork(func(n *Network) {
  291. wg.Add(1)
  292. go func(n *Network) {
  293. m.runNetwork(n)
  294. wg.Done()
  295. }(n)
  296. })
  297. if opts.watchNetworks {
  298. m.watchNetworks()
  299. }
  300. wg.Wait()
  301. m.bm.Wait()
  302. }