cproxy.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package udp
  2. //#include "proxy.h"
  3. import "C"
  4. import (
  5. "encoding/json"
  6. "net"
  7. "os"
  8. "reflect"
  9. "syscall"
  10. "unsafe"
  11. log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"
  12. "github.com/coreos-inc/kolach/pkg"
  13. "github.com/coreos-inc/kolach/subnet"
  14. )
  15. func runCProxy(tun *os.File, conn *os.File, ctl *os.File, tunIP pkg.IP4) {
  16. var log_errors int
  17. if log.V(1) {
  18. log_errors = 1
  19. }
  20. C.run_proxy(
  21. C.int(tun.Fd()),
  22. C.int(conn.Fd()),
  23. C.int(ctl.Fd()),
  24. C.in_addr_t(tunIP.NetworkOrder()),
  25. C.int(log_errors),
  26. )
  27. }
  28. func writeCommand(f *os.File, cmd *C.command) {
  29. hdr := reflect.SliceHeader{
  30. Data: uintptr(unsafe.Pointer(cmd)),
  31. Len: int(unsafe.Sizeof(*cmd)),
  32. Cap: int(unsafe.Sizeof(*cmd)),
  33. }
  34. buf := *(*[]byte)(unsafe.Pointer(&hdr))
  35. f.Write(buf)
  36. }
  // newCtlSockets creates a connected pair of AF_UNIX SOCK_SEQPACKET sockets and
  // returns both ends wrapped as *os.File, each named "ctl".
  //
  // If the socketpair call fails, both files are nil and the error is returned
  // unchanged.
  func newCtlSockets() (*os.File, *os.File, error) {
  	pair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
  	if err != nil {
  		return nil, nil, err
  	}

  	var ends [2]*os.File
  	for i, fd := range pair {
  		ends[i] = os.NewFile(uintptr(fd), "ctl")
  	}
  	return ends[0], ends[1], nil
  }
  46. func fastProxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunIP pkg.IP4, port int) {
  47. log.Info("Running fast proxy loop")
  48. c, err := conn.File()
  49. if err != nil {
  50. log.Error("Converting UDPConn to File failed: ", err)
  51. return
  52. }
  53. defer c.Close()
  54. ctl, peerCtl, err := newCtlSockets()
  55. if err != nil {
  56. log.Error("Failed to create control socket: ", err)
  57. return
  58. }
  59. defer ctl.Close()
  60. defer peerCtl.Close()
  61. go runCProxy(tun, c, peerCtl, tunIP)
  62. log.Info("Watching for new subnet leases")
  63. evts := make(chan subnet.EventBatch)
  64. sm.Start(evts)
  65. for evtBatch := range evts {
  66. for _, evt := range evtBatch {
  67. if evt.Type == subnet.SubnetAdded {
  68. log.Info("Subnet added: ", evt.Lease.Network)
  69. var attrs subnet.BaseAttrs
  70. if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
  71. log.Error("Error decoding subnet lease JSON: ", err)
  72. continue
  73. }
  74. cmd := C.command{
  75. cmd: C.CMD_SET_ROUTE,
  76. dest_net: C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
  77. dest_net_len: C.int(evt.Lease.Network.PrefixLen),
  78. next_hop_ip: C.in_addr_t(attrs.PublicIP.NetworkOrder()),
  79. next_hop_port: C.short(port),
  80. }
  81. writeCommand(ctl, &cmd)
  82. } else if evt.Type == subnet.SubnetRemoved {
  83. log.Info("Subnet removed: ", evt.Lease.Network)
  84. cmd := C.command{
  85. cmd: C.CMD_DEL_ROUTE,
  86. dest_net: C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
  87. dest_net_len: C.int(evt.Lease.Network.PrefixLen),
  88. }
  89. writeCommand(ctl, &cmd)
  90. } else {
  91. log.Error("Internal error: unknown event type: ", int(evt.Type))
  92. }
  93. }
  94. }
  95. }