proxy.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package udp
  2. import (
  3. "encoding/json"
  4. "net"
  5. "os"
  6. "sync"
  7. log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
  8. "github.com/coreos/rudder/pkg/ip"
  9. "github.com/coreos/rudder/subnet"
  10. )
  11. const (
  12. minIP4HdrSize = 20
  13. )
  14. type routeEntry struct {
  15. sn ip.IP4Net
  16. addr *net.UDPAddr
  17. }
  18. type Router struct {
  19. mux sync.Mutex
  20. port int
  21. routes []routeEntry
  22. }
  23. func NewRouter(port int) *Router {
  24. return &Router{
  25. port: port,
  26. }
  27. }
  28. func (r *Router) SetRoute(sn ip.IP4Net, dst ip.IP4) {
  29. r.mux.Lock()
  30. defer r.mux.Unlock()
  31. for _, re := range r.routes {
  32. if re.sn.Equal(sn) {
  33. re.addr = &net.UDPAddr{
  34. IP: dst.ToIP(),
  35. Port: r.port,
  36. }
  37. return
  38. }
  39. }
  40. re := routeEntry{
  41. sn: sn,
  42. addr: &net.UDPAddr{
  43. IP: dst.ToIP(),
  44. Port: r.port,
  45. },
  46. }
  47. r.routes = append(r.routes, re)
  48. }
  49. func (r *Router) DelRoute(sn ip.IP4Net) {
  50. r.mux.Lock()
  51. defer r.mux.Unlock()
  52. for i, re := range r.routes {
  53. if re.sn.Equal(sn) {
  54. r.routes[i] = r.routes[len(r.routes)-1]
  55. r.routes = r.routes[:len(r.routes)-1]
  56. return
  57. }
  58. }
  59. }
  60. func (r *Router) routePacket(pkt []byte, conn *net.UDPConn) {
  61. if len(pkt) < minIP4HdrSize {
  62. log.V(1).Infof("Packet too small (%d bytes), unable to route", len(pkt))
  63. return
  64. }
  65. r.mux.Lock()
  66. defer r.mux.Unlock()
  67. dstIP := ip.FromBytes(pkt[16:20])
  68. for i, re := range r.routes {
  69. if re.sn.Contains(dstIP) {
  70. nbytes, err := conn.WriteToUDP(pkt, re.addr)
  71. switch {
  72. case err != nil:
  73. log.V(1).Info("UDP send failed with: ", err)
  74. case nbytes != len(pkt):
  75. log.V(1).Infof("Was only able to UDP send %d out of %d bytes to %s: ", nbytes, len(pkt), re.addr.IP)
  76. }
  77. // packets for same dest tend to come in burst. swap to front make it faster for subsequent ones
  78. if i != 0 {
  79. r.routes[0], r.routes[i] = r.routes[i], r.routes[0]
  80. }
  81. return
  82. }
  83. }
  84. log.V(1).Info("No route found for ", dstIP)
  85. }
  86. func proxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunMTU uint, port int) {
  87. log.Info("Running slow proxy loop")
  88. rtr := NewRouter(port)
  89. go proxyTunToUdp(rtr, tun, conn, tunMTU)
  90. go proxyUdpToTun(conn, tun, tunMTU)
  91. log.Info("Watching for new subnet leases")
  92. evts := make(chan subnet.EventBatch)
  93. sm.Start(evts)
  94. for evtBatch := range evts {
  95. for _, evt := range evtBatch {
  96. if evt.Type == subnet.SubnetAdded {
  97. log.Info("Subnet added: ", evt.Lease.Network)
  98. var attrs subnet.BaseAttrs
  99. if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
  100. log.Error("Error decoding subnet lease JSON: ", err)
  101. continue
  102. }
  103. rtr.SetRoute(evt.Lease.Network, attrs.PublicIP)
  104. } else if evt.Type == subnet.SubnetRemoved {
  105. log.Info("Subnet removed: ", evt.Lease.Network)
  106. rtr.DelRoute(evt.Lease.Network)
  107. } else {
  108. log.Error("Internal error: unknown event type: ", int(evt.Type))
  109. }
  110. }
  111. }
  112. }
  113. func proxyTunToUdp(r *Router, tun *os.File, conn *net.UDPConn, tunMTU uint) {
  114. pkt := make([]byte, tunMTU)
  115. for {
  116. nbytes, err := tun.Read(pkt)
  117. if err != nil {
  118. log.V(1).Info("Error reading from TUN device: ", err)
  119. } else {
  120. r.routePacket(pkt[:nbytes], conn)
  121. }
  122. }
  123. }
  124. func proxyUdpToTun(conn *net.UDPConn, tun *os.File, tunMTU uint) {
  125. pkt := make([]byte, tunMTU)
  126. for {
  127. nrecv, err := conn.Read(pkt)
  128. if err != nil {
  129. log.V(1).Info("Error reading from socket: ", err)
  130. } else {
  131. nsent, err := tun.Write(pkt[:nrecv])
  132. switch {
  133. case err != nil:
  134. log.V(1).Info("Error writing to TUN device: ", err)
  135. case nsent != nrecv:
  136. log.V(1).Infof("Was only able to write %d out of %d bytes to TUN device: ", nsent, nrecv)
  137. }
  138. }
  139. }
  140. }