run.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package udp
  2. import (
  3. "encoding/json"
  4. "net"
  5. "strings"
  6. "syscall"
  7. "time"
  8. "github.com/coreos/rudder/Godeps/_workspace/src/github.com/docker/libcontainer/netlink"
  9. log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
  10. "github.com/coreos/rudder/backend"
  11. "github.com/coreos/rudder/pkg/ip"
  12. "github.com/coreos/rudder/subnet"
  13. )
  14. const (
  15. encapOverhead = 28 // 20 bytes IP hdr + 8 bytes UDP hdr
  16. )
  17. func configureIface(ifname string, ipn ip.IP4Net, mtu int) error {
  18. iface, err := net.InterfaceByName(ifname)
  19. if err != nil {
  20. log.Error("Failed to lookup interface ", ifname)
  21. return err
  22. }
  23. n := ipn.ToIPNet()
  24. err = netlink.NetworkLinkAddIp(iface, n.IP, n)
  25. if err != nil {
  26. log.Errorf("Failed to add IP address %s to %s: %s", n.IP, ifname, err)
  27. return err
  28. }
  29. err = netlink.NetworkSetMTU(iface, mtu)
  30. if err != nil {
  31. log.Errorf("Failed to set MTU for %s: ", ifname, err)
  32. return err
  33. }
  34. err = netlink.NetworkLinkUp(iface)
  35. if err != nil {
  36. log.Errorf("Failed to set interface %s to UP state: %s", ifname, err)
  37. return err
  38. }
  39. // explicitly add a route since there might be a route for a subnet already
  40. // installed by Docker and then it won't get auto added
  41. err = netlink.AddRoute(ipn.Network().String(), "", "", ifname)
  42. if err != nil && err != syscall.EEXIST {
  43. log.Errorf("Failed to add route (%s -> %s): ", ipn.Network().String(), ifname, err)
  44. return err
  45. }
  46. return nil
  47. }
  48. func setupIpMasq(ipn ip.IP4Net, iface string) error {
  49. ipt, err := ip.NewIPTables()
  50. if err != nil {
  51. log.Error("Failed to setup IP Masquerade. iptables was not found")
  52. return err
  53. }
  54. err = ipt.ClearChain("nat", "RUDDER")
  55. if err != nil {
  56. log.Error("Failed to create/clear RUDDER chain in NAT table: ", err)
  57. return err
  58. }
  59. rules := [][]string{
  60. // This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0)
  61. []string{ "RUDDER", "-d", ipn.String(), "-j", "ACCEPT" },
  62. // This rule makes sure we don't NAT multicast traffic within overlay network
  63. []string{ "RUDDER", "-d", "224.0.0.0/4", "-j", "ACCEPT" },
  64. // This rule will NAT everything originating from our overlay network and
  65. []string{ "RUDDER", "!", "-o", iface, "-j", "MASQUERADE" },
  66. // This rule will take everything coming from overlay and sent it to RUDDER chain
  67. []string{ "POSTROUTING", "-s", ipn.String(), "-j", "RUDDER" },
  68. }
  69. for _, args := range rules {
  70. log.Info("Adding iptables rule: ", strings.Join(args, " "))
  71. err = ipt.AppendUnique("nat", args...)
  72. if err != nil {
  73. log.Error("Failed to insert IP masquerade rule: ", err)
  74. return err
  75. }
  76. }
  77. return nil
  78. }
  79. func acquireLease(sm *subnet.SubnetManager, pubIP net.IP) (ip.IP4Net, error) {
  80. attrs := subnet.BaseAttrs{
  81. PublicIP: ip.FromIP(pubIP),
  82. }
  83. data, err := json.Marshal(&attrs)
  84. if err != nil {
  85. return ip.IP4Net{}, err
  86. }
  87. var sn ip.IP4Net
  88. for {
  89. sn, err = sm.AcquireLease(attrs.PublicIP, string(data))
  90. if err == nil {
  91. log.Info("Subnet lease acquired: ", sn)
  92. break
  93. }
  94. log.Error("Failed to acquire subnet: ", err)
  95. time.Sleep(time.Second)
  96. }
  97. return sn, nil
  98. }
  99. func Run(sm *subnet.SubnetManager, tepIface *net.Interface, tepIP net.IP, port int, ipMasq bool, ready backend.ReadyFunc) {
  100. sn, err := acquireLease(sm, tepIP)
  101. if err != nil {
  102. log.Error("Failed to acquire lease: ", err)
  103. return
  104. }
  105. tun, tunName, err := ip.OpenTun("rudder%d")
  106. if err != nil {
  107. log.Error("Failed to open TUN device: ", err)
  108. return
  109. }
  110. localAddr := net.UDPAddr{
  111. Port: port,
  112. }
  113. conn, err := net.ListenUDP("udp4", &localAddr)
  114. if err != nil {
  115. log.Error("Failed to start listening on UDP socket: ", err)
  116. return
  117. }
  118. // Interface's subnet is that of the whole overlay network (e.g. /16)
  119. // and not that of the individual host (e.g. /24)
  120. tunNet := ip.IP4Net{
  121. IP: sn.IP,
  122. PrefixLen: sm.GetConfig().Network.PrefixLen,
  123. }
  124. // TUN MTU will be smaller b/c of encap (IP+UDP hdrs)
  125. var mtu int
  126. if tepIface.MTU > 0 {
  127. mtu = tepIface.MTU - encapOverhead
  128. } else {
  129. log.Errorf("Failed to determine MTU for %s interface", tepIP)
  130. return
  131. }
  132. err = configureIface(tunName, tunNet, mtu)
  133. if err != nil {
  134. return
  135. }
  136. if ipMasq {
  137. err = setupIpMasq(tunNet.Network(), tunName)
  138. if err != nil {
  139. return
  140. }
  141. }
  142. // all initialized and ready for business
  143. log.Info("UDP encapsulation initialized")
  144. ready(sn, mtu)
  145. fastProxy(sm, tun, conn, tunNet.IP, uint(mtu), port)
  146. }