cproxy.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package udp
  2. //#include "proxy.h"
  3. import "C"
  4. import (
  5. "encoding/json"
  6. "net"
  7. "os"
  8. "reflect"
  9. "syscall"
  10. "unsafe"
  11. log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"
  12. "github.com/coreos-inc/kolach/pkg"
  13. "github.com/coreos-inc/kolach/subnet"
  14. )
  15. func runCProxy(tun *os.File, conn *os.File, ctl *os.File, tunIP pkg.IP4) {
  16. var log_errors int
  17. if log.V(1) {
  18. log_errors = 1
  19. }
  20. log.Info("C.run_proxy: log_errors: ", log_errors)
  21. C.run_proxy(
  22. C.int(tun.Fd()),
  23. C.int(conn.Fd()),
  24. C.int(ctl.Fd()),
  25. C.in_addr_t(tunIP.ToNetworkOrder()),
  26. C.int(log_errors),
  27. )
  28. log.Info("C.run_proxy exited")
  29. }
  30. func writeCommand(f *os.File, cmd *C.command) {
  31. hdr := reflect.SliceHeader{
  32. Data: uintptr(unsafe.Pointer(cmd)),
  33. Len: int(unsafe.Sizeof(*cmd)),
  34. Cap: int(unsafe.Sizeof(*cmd)),
  35. }
  36. buf := *(*[]byte)(unsafe.Pointer(&hdr))
  37. f.Write(buf)
  38. }
  39. func newCtlSockets() (*os.File, *os.File, error) {
  40. fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
  41. if err != nil {
  42. return nil, nil, err
  43. }
  44. f1 := os.NewFile(uintptr(fds[0]), "ctl")
  45. f2 := os.NewFile(uintptr(fds[1]), "ctl")
  46. return f1, f2, nil
  47. }
  48. func fastProxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunIP pkg.IP4, port int) {
  49. log.Info("Running fast proxy loop")
  50. c, err := conn.File()
  51. if err != nil {
  52. log.Error("Converting UDPConn to File failed: ", err)
  53. return
  54. }
  55. defer c.Close()
  56. ctl, peerCtl, err := newCtlSockets()
  57. if err != nil {
  58. log.Error("Failed to create control socket: ", err)
  59. return
  60. }
  61. defer ctl.Close()
  62. defer peerCtl.Close()
  63. go runCProxy(tun, c, peerCtl, tunIP)
  64. log.Info("Watching for new subnet leases")
  65. evts := make(chan subnet.EventBatch)
  66. sm.Start(evts)
  67. for evtBatch := range evts {
  68. for _, evt := range evtBatch {
  69. if evt.Type == subnet.SubnetAdded {
  70. log.Info("Subnet added: ", evt.Lease.Network)
  71. var attrs subnet.BaseAttrs
  72. if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
  73. log.Error("Error decoding subnet lease JSON: ", err)
  74. continue
  75. }
  76. cmd := C.command{
  77. cmd: C.CMD_SET_ROUTE,
  78. dest_net: C.in_addr_t(evt.Lease.Network.IP.ToNetworkOrder()),
  79. dest_net_len: C.int(evt.Lease.Network.PrefixLen),
  80. next_hop_ip: C.in_addr_t(attrs.PublicIP.ToNetworkOrder()),
  81. next_hop_port: C.short(port),
  82. }
  83. writeCommand(ctl, &cmd)
  84. } else if evt.Type == subnet.SubnetRemoved {
  85. log.Info("Subnet removed: %v", evt.Lease.Network)
  86. cmd := C.command{
  87. cmd: C.CMD_DEL_ROUTE,
  88. dest_net: C.in_addr_t(evt.Lease.Network.IP.ToNetworkOrder()),
  89. dest_net_len: C.int(evt.Lease.Network.PrefixLen),
  90. }
  91. writeCommand(ctl, &cmd)
  92. } else {
  93. log.Errorf("Internal error: unknown event type: %d", int(evt.Type))
  94. }
  95. }
  96. }
  97. }