manager.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. // Copyright 2015 flannel authors
  2. // Copyright 2015 Red Hat, Inc.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. package network
  16. import (
  17. "errors"
  18. "flag"
  19. "fmt"
  20. "net"
  21. "os"
  22. "path/filepath"
  23. "strings"
  24. "sync"
  25. "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
  26. log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
  27. "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
  28. "github.com/coreos/flannel/backend"
  29. "github.com/coreos/flannel/pkg/ip"
  30. "github.com/coreos/flannel/subnet"
  31. )
  32. type CmdLineOpts struct {
  33. publicIP string
  34. ipMasq bool
  35. subnetFile string
  36. subnetDir string
  37. iface string
  38. networks string
  39. watchNetworks bool
  40. }
  41. var errAlreadyExists = errors.New("already exists")
  42. var opts CmdLineOpts
  43. func init() {
  44. flag.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
  45. flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
  46. flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
  47. flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
  48. flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
  49. flag.BoolVar(&opts.watchNetworks, "watch-networks", false, "run in multi-network mode and watch for networks from 'networks' or all networks")
  50. flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
  51. }
  52. type Manager struct {
  53. ctx context.Context
  54. sm subnet.Manager
  55. bm backend.Manager
  56. allowedNetworks map[string]bool
  57. mux sync.Mutex
  58. networks map[string]*Network
  59. watch bool
  60. ipMasq bool
  61. extIface *backend.ExternalInterface
  62. }
  63. func (m *Manager) isNetAllowed(name string) bool {
  64. // If allowedNetworks is empty all networks are allowed
  65. if len(m.allowedNetworks) > 0 {
  66. _, ok := m.allowedNetworks[name]
  67. return ok
  68. }
  69. return true
  70. }
  71. func (m *Manager) isMultiNetwork() bool {
  72. return len(m.allowedNetworks) > 0 || m.watch
  73. }
  74. func NewNetworkManager(ctx context.Context, sm subnet.Manager) (*Manager, error) {
  75. extIface, err := lookupExtIface(opts.iface)
  76. if err != nil {
  77. return nil, err
  78. }
  79. bm := backend.NewManager(ctx, sm, extIface)
  80. manager := &Manager{
  81. ctx: ctx,
  82. sm: sm,
  83. bm: bm,
  84. allowedNetworks: make(map[string]bool),
  85. networks: make(map[string]*Network),
  86. watch: opts.watchNetworks,
  87. ipMasq: opts.ipMasq,
  88. extIface: extIface,
  89. }
  90. for _, name := range strings.Split(opts.networks, ",") {
  91. if name != "" {
  92. manager.allowedNetworks[name] = true
  93. }
  94. }
  95. if manager.isMultiNetwork() {
  96. // Get list of existing networks
  97. result, err := manager.sm.WatchNetworks(ctx, nil)
  98. if err != nil {
  99. return nil, err
  100. }
  101. for _, n := range result.Snapshot {
  102. if manager.isNetAllowed(n) {
  103. manager.networks[n] = NewNetwork(ctx, sm, bm, n, manager.ipMasq)
  104. }
  105. }
  106. } else {
  107. manager.networks[""] = NewNetwork(ctx, sm, bm, "", manager.ipMasq)
  108. }
  109. return manager, nil
  110. }
  111. func lookupExtIface(ifname string) (*backend.ExternalInterface, error) {
  112. var iface *net.Interface
  113. var iaddr net.IP
  114. var err error
  115. if len(ifname) > 0 {
  116. if iaddr = net.ParseIP(ifname); iaddr != nil {
  117. iface, err = ip.GetInterfaceByIP(iaddr)
  118. if err != nil {
  119. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  120. }
  121. } else {
  122. iface, err = net.InterfaceByName(ifname)
  123. if err != nil {
  124. return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
  125. }
  126. }
  127. } else {
  128. log.Info("Determining IP address of default interface")
  129. if iface, err = ip.GetDefaultGatewayIface(); err != nil {
  130. return nil, fmt.Errorf("failed to get default interface: %s", err)
  131. }
  132. }
  133. if iaddr == nil {
  134. iaddr, err = ip.GetIfaceIP4Addr(iface)
  135. if err != nil {
  136. return nil, fmt.Errorf("failed to find IPv4 address for interface %s", iface.Name)
  137. }
  138. }
  139. if iface.MTU == 0 {
  140. return nil, fmt.Errorf("failed to determine MTU for %s interface", iaddr)
  141. }
  142. var eaddr net.IP
  143. if len(opts.publicIP) > 0 {
  144. eaddr = net.ParseIP(opts.publicIP)
  145. if eaddr == nil {
  146. return nil, fmt.Errorf("invalid public IP address", opts.publicIP)
  147. }
  148. }
  149. if eaddr == nil {
  150. eaddr = iaddr
  151. }
  152. log.Infof("Using %s as external interface", iaddr)
  153. log.Infof("Using %s as external endpoint", eaddr)
  154. return &backend.ExternalInterface{
  155. Iface: iface,
  156. IfaceAddr: iaddr,
  157. ExtAddr: eaddr,
  158. }, nil
  159. }
  160. func writeSubnetFile(path string, nw ip.IP4Net, ipMasq bool, bn backend.Network) error {
  161. dir, name := filepath.Split(path)
  162. os.MkdirAll(dir, 0755)
  163. tempFile := filepath.Join(dir, "."+name)
  164. f, err := os.Create(tempFile)
  165. if err != nil {
  166. return err
  167. }
  168. // Write out the first usable IP by incrementing
  169. // sn.IP by one
  170. sn := bn.Lease().Subnet
  171. sn.IP += 1
  172. fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw)
  173. fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
  174. fmt.Fprintf(f, "FLANNEL_MTU=%d\n", bn.MTU())
  175. _, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq)
  176. f.Close()
  177. if err != nil {
  178. return err
  179. }
  180. // rename(2) the temporary file to the desired location so that it becomes
  181. // atomically visible with the contents
  182. return os.Rename(tempFile, path)
  183. }
  184. func (m *Manager) addNetwork(n *Network) error {
  185. m.mux.Lock()
  186. defer m.mux.Unlock()
  187. if _, ok := m.networks[n.Name]; ok {
  188. return errAlreadyExists
  189. }
  190. m.networks[n.Name] = n
  191. return nil
  192. }
  193. func (m *Manager) delNetwork(n *Network) {
  194. m.mux.Lock()
  195. delete(m.networks, n.Name)
  196. m.mux.Unlock()
  197. }
  198. func (m *Manager) getNetwork(netname string) (*Network, bool) {
  199. m.mux.Lock()
  200. n, ok := m.networks[netname]
  201. m.mux.Unlock()
  202. return n, ok
  203. }
  204. func (m *Manager) forEachNetwork(f func(n *Network)) {
  205. m.mux.Lock()
  206. for _, n := range m.networks {
  207. f(n)
  208. }
  209. m.mux.Unlock()
  210. }
  211. func (m *Manager) runNetwork(n *Network) {
  212. n.Run(m.extIface, func(bn backend.Network) {
  213. if m.isMultiNetwork() {
  214. path := filepath.Join(opts.subnetDir, n.Name) + ".env"
  215. if err := writeSubnetFile(path, n.Config.Network, m.ipMasq, bn); err != nil {
  216. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  217. return
  218. }
  219. } else {
  220. if err := writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn); err != nil {
  221. log.Warningf("%v failed to write subnet file: %s", n.Name, err)
  222. return
  223. }
  224. daemon.SdNotify("READY=1")
  225. }
  226. })
  227. m.delNetwork(n)
  228. }
  229. func (m *Manager) watchNetworks() {
  230. wg := sync.WaitGroup{}
  231. defer wg.Wait()
  232. events := make(chan []subnet.Event)
  233. wg.Add(1)
  234. go func() {
  235. subnet.WatchNetworks(m.ctx, m.sm, events)
  236. wg.Done()
  237. }()
  238. // skip over the initial snapshot
  239. <-events
  240. for {
  241. select {
  242. case <-m.ctx.Done():
  243. return
  244. case evtBatch := <-events:
  245. for _, e := range evtBatch {
  246. netname := e.Network
  247. if !m.isNetAllowed(netname) {
  248. log.Infof("Network %q is not allowed", netname)
  249. continue
  250. }
  251. switch e.Type {
  252. case subnet.EventAdded:
  253. n := NewNetwork(m.ctx, m.sm, m.bm, netname, m.ipMasq)
  254. if err := m.addNetwork(n); err != nil {
  255. log.Infof("Network %q: %v", netname, err)
  256. continue
  257. }
  258. log.Infof("Network added: %v", netname)
  259. wg.Add(1)
  260. go func() {
  261. m.runNetwork(n)
  262. wg.Done()
  263. }()
  264. case subnet.EventRemoved:
  265. log.Infof("Network removed: %v", netname)
  266. n, ok := m.getNetwork(netname)
  267. if !ok {
  268. log.Warningf("Network %v unknown; ignoring EventRemoved", netname)
  269. continue
  270. }
  271. n.Cancel()
  272. }
  273. }
  274. }
  275. }
  276. }
  277. func (m *Manager) Run() {
  278. wg := sync.WaitGroup{}
  279. // Run existing networks
  280. m.forEachNetwork(func(n *Network) {
  281. wg.Add(1)
  282. go func(n *Network) {
  283. m.runNetwork(n)
  284. wg.Done()
  285. }(n)
  286. })
  287. if opts.watchNetworks {
  288. m.watchNetworks()
  289. }
  290. wg.Wait()
  291. m.bm.Wait()
  292. }