udp.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package udp
  2. import (
  3. "fmt"
  4. "encoding/json"
  5. "net"
  6. "os"
  7. "strings"
  8. "sync"
  9. "syscall"
  10. "time"
  11. "github.com/coreos/rudder/Godeps/_workspace/src/github.com/docker/libcontainer/netlink"
  12. log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
  13. "github.com/coreos/rudder/backend"
  14. "github.com/coreos/rudder/pkg/ip"
  15. "github.com/coreos/rudder/subnet"
  16. )
  17. const (
  18. encapOverhead = 28 // 20 bytes IP hdr + 8 bytes UDP hdr
  19. )
  20. type UdpBackend struct {
  21. sm *subnet.SubnetManager
  22. ctl *os.File
  23. ctl2 *os.File
  24. tun *os.File
  25. conn *net.UDPConn
  26. port int
  27. mtu int
  28. tunNet ip.IP4Net
  29. stop chan bool
  30. wg sync.WaitGroup
  31. }
  32. func New(sm *subnet.SubnetManager, port int) backend.Backend {
  33. return &UdpBackend{
  34. sm: sm,
  35. port: port,
  36. stop: make(chan bool),
  37. }
  38. }
  39. func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (ip.IP4Net, int, error) {
  40. sn, err := m.acquireLease(extIP)
  41. if err != nil {
  42. return ip.IP4Net{}, 0, fmt.Errorf("Failed to acquire lease: %s", err)
  43. }
  44. // Tunnel's subnet is that of the whole overlay network (e.g. /16)
  45. // and not that of the individual host (e.g. /24)
  46. m.tunNet = ip.IP4Net{
  47. IP: sn.IP,
  48. PrefixLen: m.sm.GetConfig().Network.PrefixLen,
  49. }
  50. // TUN MTU will be smaller b/c of encap (IP+UDP hdrs)
  51. m.mtu = extIface.MTU - encapOverhead
  52. if err = m.initTun(ipMasq); err != nil {
  53. return ip.IP4Net{}, 0, err
  54. }
  55. m.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: m.port})
  56. if err != nil {
  57. return ip.IP4Net{}, 0, fmt.Errorf("Failed to start listening on UDP socket: %s", err)
  58. }
  59. m.ctl, m.ctl2, err = newCtlSockets()
  60. if err != nil {
  61. return ip.IP4Net{}, 0, fmt.Errorf("Failed to create control socket: %s", err)
  62. }
  63. return sn, m.mtu, nil
  64. }
  65. func (m *UdpBackend) Run() {
  66. // one for each goroutine below
  67. m.wg.Add(2)
  68. go func() {
  69. runCProxy(m.tun, m.conn, m.ctl2, m.tunNet.IP, m.mtu)
  70. m.wg.Done()
  71. }()
  72. go func() {
  73. m.sm.LeaseRenewer(m.stop)
  74. m.wg.Done()
  75. }()
  76. m.monitorEvents()
  77. m.wg.Wait()
  78. }
  79. func (m *UdpBackend) Stop() {
  80. if m.ctl != nil {
  81. stopProxy(m.ctl)
  82. }
  83. close(m.stop)
  84. }
  85. func (m *UdpBackend) Name() string {
  86. return "UDP"
  87. }
  88. func (m *UdpBackend) acquireLease(extIP net.IP) (ip.IP4Net, error) {
  89. attrs := subnet.BaseAttrs{
  90. PublicIP: ip.FromIP(extIP),
  91. }
  92. data, err := json.Marshal(&attrs)
  93. if err != nil {
  94. return ip.IP4Net{}, err
  95. }
  96. var sn ip.IP4Net
  97. for {
  98. sn, err = m.sm.AcquireLease(attrs.PublicIP, string(data), m.stop)
  99. if err == nil {
  100. log.Info("Subnet lease acquired: ", sn)
  101. break
  102. }
  103. log.Error("Failed to acquire subnet: ", err)
  104. select {
  105. case <-time.After(time.Second):
  106. break
  107. case <-m.stop:
  108. return ip.IP4Net{}, backend.ErrInterrupted
  109. }
  110. }
  111. return sn, nil
  112. }
  113. func newCtlSockets() (*os.File, *os.File, error) {
  114. fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
  115. if err != nil {
  116. return nil, nil, err
  117. }
  118. f1 := os.NewFile(uintptr(fds[0]), "ctl")
  119. f2 := os.NewFile(uintptr(fds[1]), "ctl")
  120. return f1, f2, nil
  121. }
  122. func (m *UdpBackend) initTun(ipMasq bool) error {
  123. var tunName string
  124. var err error
  125. m.tun, tunName, err = ip.OpenTun("rudder%d")
  126. if err != nil {
  127. log.Error("Failed to open TUN device: ", err)
  128. return err
  129. }
  130. err = configureIface(tunName, m.tunNet, m.mtu)
  131. if err != nil {
  132. return err
  133. }
  134. if ipMasq {
  135. err = setupIpMasq(m.tunNet.Network(), tunName)
  136. if err != nil {
  137. return err
  138. }
  139. }
  140. return nil
  141. }
  142. func configureIface(ifname string, ipn ip.IP4Net, mtu int) error {
  143. iface, err := net.InterfaceByName(ifname)
  144. if err != nil {
  145. log.Error("Failed to lookup interface ", ifname)
  146. return err
  147. }
  148. n := ipn.ToIPNet()
  149. err = netlink.NetworkLinkAddIp(iface, n.IP, n)
  150. if err != nil {
  151. log.Errorf("Failed to add IP address %s to %s: %s", n.IP, ifname, err)
  152. return err
  153. }
  154. err = netlink.NetworkSetMTU(iface, mtu)
  155. if err != nil {
  156. log.Errorf("Failed to set MTU for %s: ", ifname, err)
  157. return err
  158. }
  159. err = netlink.NetworkLinkUp(iface)
  160. if err != nil {
  161. log.Errorf("Failed to set interface %s to UP state: %s", ifname, err)
  162. return err
  163. }
  164. // explicitly add a route since there might be a route for a subnet already
  165. // installed by Docker and then it won't get auto added
  166. err = netlink.AddRoute(ipn.Network().String(), "", "", ifname)
  167. if err != nil && err != syscall.EEXIST {
  168. log.Errorf("Failed to add route (%s -> %s): ", ipn.Network().String(), ifname, err)
  169. return err
  170. }
  171. return nil
  172. }
  173. func setupIpMasq(ipn ip.IP4Net, iface string) error {
  174. ipt, err := ip.NewIPTables()
  175. if err != nil {
  176. log.Error("Failed to setup IP Masquerade. iptables was not found")
  177. return err
  178. }
  179. err = ipt.ClearChain("nat", "RUDDER")
  180. if err != nil {
  181. log.Error("Failed to create/clear RUDDER chain in NAT table: ", err)
  182. return err
  183. }
  184. rules := [][]string{
  185. // This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0)
  186. []string{ "RUDDER", "-d", ipn.String(), "-j", "ACCEPT" },
  187. // This rule makes sure we don't NAT multicast traffic within overlay network
  188. []string{ "RUDDER", "-d", "224.0.0.0/4", "-j", "ACCEPT" },
  189. // This rule will NAT everything originating from our overlay network and
  190. []string{ "RUDDER", "!", "-o", iface, "-j", "MASQUERADE" },
  191. // This rule will take everything coming from overlay and sent it to RUDDER chain
  192. []string{ "POSTROUTING", "-s", ipn.String(), "-j", "RUDDER" },
  193. }
  194. for _, args := range rules {
  195. log.Info("Adding iptables rule: ", strings.Join(args, " "))
  196. err = ipt.AppendUnique("nat", args...)
  197. if err != nil {
  198. log.Error("Failed to insert IP masquerade rule: ", err)
  199. return err
  200. }
  201. }
  202. return nil
  203. }
  204. func (m *UdpBackend) monitorEvents() {
  205. log.Info("Watching for new subnet leases")
  206. evts := make(chan subnet.EventBatch)
  207. m.wg.Add(1)
  208. go func() {
  209. m.sm.WatchLeases(evts, m.stop)
  210. m.wg.Done()
  211. }()
  212. for {
  213. select {
  214. case evtBatch := <-evts:
  215. for _, evt := range evtBatch {
  216. switch evt.Type {
  217. case subnet.SubnetAdded:
  218. log.Info("Subnet added: ", evt.Lease.Network)
  219. var attrs subnet.BaseAttrs
  220. if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
  221. log.Error("Error decoding subnet lease JSON: ", err)
  222. continue
  223. }
  224. setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.port)
  225. case subnet.SubnetRemoved:
  226. log.Info("Subnet removed: ", evt.Lease.Network)
  227. removeRoute(m.ctl, evt.Lease.Network)
  228. default:
  229. log.Error("Internal error: unknown event type: ", int(evt.Type))
  230. }
  231. }
  232. case <-m.stop:
  233. return
  234. }
  235. }
  236. }