cproxy.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package udp
  2. //#include "proxy.h"
  3. import "C"
  4. import (
  5. "encoding/json"
  6. "net"
  7. "os"
  8. "reflect"
  9. "syscall"
  10. "unsafe"
  11. log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
  12. "github.com/coreos/rudder/pkg/ip"
  13. "github.com/coreos/rudder/subnet"
  14. )
  15. func runCProxy(tun *os.File, conn *os.File, ctl *os.File, tunIP ip.IP4, tunMTU uint) {
  16. var log_errors int
  17. if log.V(1) {
  18. log_errors = 1
  19. }
  20. C.run_proxy(
  21. C.int(tun.Fd()),
  22. C.int(conn.Fd()),
  23. C.int(ctl.Fd()),
  24. C.in_addr_t(tunIP.NetworkOrder()),
  25. C.size_t(tunMTU),
  26. C.int(log_errors),
  27. )
  28. }
  29. func writeCommand(f *os.File, cmd *C.command) {
  30. hdr := reflect.SliceHeader{
  31. Data: uintptr(unsafe.Pointer(cmd)),
  32. Len: int(unsafe.Sizeof(*cmd)),
  33. Cap: int(unsafe.Sizeof(*cmd)),
  34. }
  35. buf := *(*[]byte)(unsafe.Pointer(&hdr))
  36. f.Write(buf)
  37. }
  38. func newCtlSockets() (*os.File, *os.File, error) {
  39. fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
  40. if err != nil {
  41. return nil, nil, err
  42. }
  43. f1 := os.NewFile(uintptr(fds[0]), "ctl")
  44. f2 := os.NewFile(uintptr(fds[1]), "ctl")
  45. return f1, f2, nil
  46. }
  47. func fastProxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunIP ip.IP4, tunMTU uint, port int) {
  48. log.Info("Running fast proxy loop")
  49. c, err := conn.File()
  50. if err != nil {
  51. log.Error("Converting UDPConn to File failed: ", err)
  52. return
  53. }
  54. defer c.Close()
  55. ctl, peerCtl, err := newCtlSockets()
  56. if err != nil {
  57. log.Error("Failed to create control socket: ", err)
  58. return
  59. }
  60. defer ctl.Close()
  61. defer peerCtl.Close()
  62. go runCProxy(tun, c, peerCtl, tunIP, tunMTU)
  63. log.Info("Watching for new subnet leases")
  64. evts := make(chan subnet.EventBatch)
  65. sm.Start(evts)
  66. for evtBatch := range evts {
  67. for _, evt := range evtBatch {
  68. if evt.Type == subnet.SubnetAdded {
  69. log.Info("Subnet added: ", evt.Lease.Network)
  70. var attrs subnet.BaseAttrs
  71. if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
  72. log.Error("Error decoding subnet lease JSON: ", err)
  73. continue
  74. }
  75. cmd := C.command{
  76. cmd: C.CMD_SET_ROUTE,
  77. dest_net: C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
  78. dest_net_len: C.int(evt.Lease.Network.PrefixLen),
  79. next_hop_ip: C.in_addr_t(attrs.PublicIP.NetworkOrder()),
  80. next_hop_port: C.short(port),
  81. }
  82. writeCommand(ctl, &cmd)
  83. } else if evt.Type == subnet.SubnetRemoved {
  84. log.Info("Subnet removed: ", evt.Lease.Network)
  85. cmd := C.command{
  86. cmd: C.CMD_DEL_ROUTE,
  87. dest_net: C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
  88. dest_net_len: C.int(evt.Lease.Network.PrefixLen),
  89. }
  90. writeCommand(ctl, &cmd)
  91. } else {
  92. log.Error("Internal error: unknown event type: ", int(evt.Type))
  93. }
  94. }
  95. }
  96. }