Browse Source

added fast version (in C) of the proxy loop; ICMP net unreachable support; use TTL

Eugene Yakubovich 10 năm trước cách đây
mục cha
commit
23c28ecee4
8 tập tin đã thay đổi với 745 bổ sung175 xóa
  1. 6 4
      main.go
  2. 9 0
      pkg/ipnet.go
  3. 120 0
      udp/cproxy.go
  4. 408 0
      udp/proxy.c
  5. 168 0
      udp/proxy.go
  6. 26 0
      udp/proxy.h
  7. 0 103
      udp/router.go
  8. 8 68
      udp/run.go

+ 6 - 4
main.go

@@ -1,16 +1,16 @@
 package main
 
 import (
+	"flag"
 	"fmt"
 	"net"
 	"os"
-	"time"
-	"flag"
 	"path"
+	"time"
 
 	"github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
-	log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
+	log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"
 
 	"github.com/coreos-inc/kolach/pkg"
 	"github.com/coreos-inc/kolach/subnet"
@@ -26,6 +26,7 @@ type CmdLineOpts struct {
 	etcdPrefix   string
 	help         bool
 	version      bool
+	slowProxy    bool
 	port         int
 	subnetFile   string
 	iface        string
@@ -39,6 +40,7 @@ func init() {
 	flag.IntVar(&opts.port, "port", defaultPort, "port to use for inter-node communications")
 	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/kolach/subnet.env", "filename where env variables (subnet and MTU values) will be written to")
 	flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
+	flag.BoolVar(&opts.slowProxy, "no-fast-proxy", false, "disable accelerated proxy")
 	flag.BoolVar(&opts.help, "help", false, "print this message")
 	flag.BoolVar(&opts.version, "version", false, "print version and exit")
 }
@@ -144,7 +146,7 @@ func main() {
 
 	sm := makeSubnetManager()
 
-	udp.Run(sm, iface, ip, opts.port, func(sn pkg.IP4Net, mtu int) {
+	udp.Run(sm, iface, ip, opts.port, !opts.slowProxy, func(sn pkg.IP4Net, mtu int) {
 		writeSubnet(sn, mtu)
 		daemon.SdNotify("READY=1")
 	})

+ 9 - 0
pkg/ipnet.go

@@ -48,6 +48,15 @@ func (ip IP4) ToIP() net.IP {
 	return net.IPv4(ip.Octets())
 }
 
+func (ip IP4) ToNetworkOrder() uint32 {
+	if NativelyLittle() {
+		a, b, c, d := byte(ip>>24), byte(ip>>16), byte(ip>>8), byte(ip)
+		return uint32(a) | (uint32(b) << 8) | (uint32(c) << 16) | (uint32(d) << 24)
+	} else {
+		return uint32(ip)
+	}
+}
+
 func (ip IP4) String() string {
 	return ip.ToIP().String()
 }

+ 120 - 0
udp/cproxy.go

@@ -0,0 +1,120 @@
+package udp
+
+//#include "proxy.h"
+import "C"
+
+import (
+	"encoding/json"
+	"net"
+	"os"
+	"reflect"
+	"syscall"
+	"unsafe"
+
+	log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"
+
+	"github.com/coreos-inc/kolach/pkg"
+	"github.com/coreos-inc/kolach/subnet"
+)
+
+func runCProxy(tun *os.File, conn *os.File, ctl *os.File, tunIP pkg.IP4) {
+	var log_errors int
+	if log.V(1) {
+		log_errors = 1
+	}
+	log.Info("C.run_proxy: log_errors: ", log_errors)
+
+	C.run_proxy(
+		C.int(tun.Fd()),
+		C.int(conn.Fd()),
+		C.int(ctl.Fd()),
+		C.in_addr_t(tunIP.ToNetworkOrder()),
+		C.int(log_errors),
+	)
+
+	log.Info("C.run_proxy exited")
+}
+
+func writeCommand(f *os.File, cmd *C.command) {
+	hdr := reflect.SliceHeader{
+		Data: uintptr(unsafe.Pointer(cmd)),
+		Len:  int(unsafe.Sizeof(*cmd)),
+		Cap:  int(unsafe.Sizeof(*cmd)),
+	}
+	buf := *(*[]byte)(unsafe.Pointer(&hdr))
+
+	f.Write(buf)
+}
+
+func newCtlSockets() (*os.File, *os.File, error) {
+	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	f1 := os.NewFile(uintptr(fds[0]), "ctl")
+	f2 := os.NewFile(uintptr(fds[1]), "ctl")
+	return f1, f2, nil
+}
+
+func fastProxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunIP pkg.IP4, port int) {
+	log.Info("Running fast proxy loop")
+
+	c, err := conn.File()
+	if err != nil {
+		log.Error("Converting UDPConn to File failed: ", err)
+		return
+	}
+	defer c.Close()
+
+	ctl, peerCtl, err := newCtlSockets()
+	if err != nil {
+		log.Error("Failed to create control socket: ", err)
+		return
+	}
+	defer ctl.Close()
+	defer peerCtl.Close()
+
+	go runCProxy(tun, c, peerCtl, tunIP)
+
+	log.Info("Watching for new subnet leases")
+	evts := make(chan subnet.EventBatch)
+	sm.Start(evts)
+
+	for evtBatch := range evts {
+		for _, evt := range evtBatch {
+			if evt.Type == subnet.SubnetAdded {
+				log.Info("Subnet added: ", evt.Lease.Network)
+				var attrs subnet.BaseAttrs
+				if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
+					log.Error("Error decoding subnet lease JSON: ", err)
+					continue
+				}
+
+				cmd := C.command{
+					cmd:           C.CMD_SET_ROUTE,
+					dest_net:      C.in_addr_t(evt.Lease.Network.IP.ToNetworkOrder()),
+					dest_net_len:  C.int(evt.Lease.Network.PrefixLen),
+					next_hop_ip:   C.in_addr_t(attrs.PublicIP.ToNetworkOrder()),
+					next_hop_port: C.short(port),
+				}
+
+				writeCommand(ctl, &cmd)
+
+			} else if evt.Type == subnet.SubnetRemoved {
+				log.Info("Subnet removed: %v", evt.Lease.Network)
+
+				cmd := C.command{
+					cmd:          C.CMD_DEL_ROUTE,
+					dest_net:     C.in_addr_t(evt.Lease.Network.IP.ToNetworkOrder()),
+					dest_net_len: C.int(evt.Lease.Network.PrefixLen),
+				}
+
+				writeCommand(ctl, &cmd)
+
+			} else {
+				log.Errorf("Internal error: unknown event type: %d", int(evt.Type))
+			}
+		}
+	}
+}

+ 408 - 0
udp/proxy.c

@@ -0,0 +1,408 @@
+#include <stdlib.h>
+#include <memory.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <linux/ip.h>
+#include <linux/icmp.h>
+
+#define CMD_DEFINE
+#include "proxy.h"
+
+#define MTU 1500
+
+struct ip_net {
+	in_addr_t ip;
+	in_addr_t mask;
+};
+
+struct route_entry {
+	struct ip_net      dst;
+	struct sockaddr_in next_hop;
+};
+
+typedef struct icmp_pkt {
+	struct iphdr   iph;
+	struct icmphdr icmph;
+	/* dest unreachable must include IP hdr 8 bytes of upper layer proto
+	 * of the original packet. */
+	char    data[sizeof(struct iphdr) + MAX_IPOPTLEN + 8];
+} icmp_pkt;
+
+struct route_entry *routes;
+size_t routes_alloc;
+size_t routes_cnt;
+
+in_addr_t tun_addr;
+
+int log_enabled;
+int exit_flag;
+
+static inline in_addr_t netmask(int prefix_len) {
+	return htonl(~((uint32_t)0) << (32 - prefix_len));
+}
+
+static inline int contains(struct ip_net net, in_addr_t ip) {
+	return net.ip == (ip & net.mask);
+}
+
+static void log_error(const char *fmt, ...) {
+	va_list ap;
+
+	if( log_enabled ) {
+		va_start(ap, fmt);
+		vfprintf(stderr, fmt, ap);
+		va_end(ap);
+	}
+}
+
+/* fast version -- only works with mults of 4 bytes */
+uint16_t cksum(char* buf, int len) {
+	uint32_t sum = 0;
+	const uint32_t *b = (uint32_t*) buf;
+
+	uint16_t t1, t2;
+
+	assert(len % 4 == 0);
+
+	while( len > 0 ) {
+		uint32_t s = *b++;
+		sum += s;
+		if( sum < s )
+			sum++;
+		len -= 4;
+	}
+
+	/* Fold down to 16 bits */
+	t1 = sum;
+	t2 = sum >> 16;
+	t1 += t2;
+	if( t1 < t2 )
+		t1++;
+
+	return ~t1;
+}
+
+static void send_net_unreachable(int tun, char *offender) {
+	icmp_pkt pkt;
+	int off_iph_len;
+	struct iphdr *off_iph = (struct iphdr *)offender;
+	size_t pktlen, nsent;
+
+	off_iph_len = off_iph->ihl * 4;
+	if( off_iph_len >= sizeof(struct iphdr) + MAX_IPOPTLEN ) {
+		log_error("not sending net unreachable: mulformed ip pkt: iph=%d\n", (int)off_iph_len);
+		return; /* ip pkt mulformed */
+	}
+
+	if( off_iph->protocol == IPPROTO_ICMP ) {
+		/* To avoid infinite loops, RFC 792 instructs not to send ICMPs
+		 * about ICMPs */
+		return;
+	}
+
+	/* Lower 3 bits (in network order) of frag_off is actually flags */
+	if( (off_iph->frag_off & htons(0x1FFF)) != 0 ) {
+		/* ICMP messages are only sent for first fragemnt */
+		return;
+	}
+
+	pktlen = sizeof(struct iphdr) + sizeof(struct icmphdr) + off_iph_len + 8;
+
+	memset(&pkt, 0, sizeof(pkt));
+
+	/* Fill in the IP header */
+	pkt.iph.ihl = 5;
+	pkt.iph.version = IPVERSION;
+	pkt.iph.tot_len = htons(pktlen);
+	pkt.iph.ttl = 8;
+	pkt.iph.protocol = IPPROTO_ICMP;
+	pkt.iph.saddr = tun_addr;
+	pkt.iph.daddr = off_iph->saddr;
+	pkt.iph.check = cksum((char *)&pkt.iph, sizeof(struct iphdr));
+
+	/* Fill in the ICMP header */
+	pkt.icmph.type = ICMP_DEST_UNREACH;
+	pkt.icmph.code = ICMP_NET_UNREACH;
+
+	/* Copy the offenders IP hdr + first 8 bytes of IP payload */
+	memcpy(pkt.data, offender, off_iph_len + 8);
+
+	/* Compute the checksum over the ICMP header and data */
+	pkt.icmph.checksum = cksum((char *)&pkt.icmph, sizeof(struct icmphdr) + off_iph_len + 8);
+
+	/* Kick it back */
+	nsent = write(tun, &pkt, pktlen);
+
+	if( nsent < 0 ) {
+		log_error("failed to send ICMP net unreachable: %s\n", strerror(errno));
+	} else if( nsent != pktlen ) {
+		log_error("failed to send ICMP net unreachable: only %d out of %d byte sent\n", (int)nsent, (int)pktlen);
+	}
+}
+
+static int set_route(struct ip_net dst, struct sockaddr_in *next_hop) {
+	size_t i;
+
+	for( i = 0; i < routes_cnt; i++ ) {
+		if( dst.ip == routes[i].dst.ip && dst.mask == routes[i].dst.mask ) {
+			routes[i].next_hop = *next_hop;
+			return 0;
+		}
+	}
+
+	if( routes_alloc == routes_cnt ) {
+		int new_alloc = (routes_alloc ? 2*routes_alloc : 8);
+		struct route_entry *new_routes = realloc(routes, new_alloc);
+		if( !new_routes )
+			return ENOMEM;
+
+		routes = new_routes;
+		routes_alloc = new_alloc;
+	}
+
+	routes[routes_cnt].dst = dst;
+	routes[routes_cnt].next_hop = *next_hop;
+	routes_cnt++;
+
+	return 0;
+}
+
+static int del_route(struct ip_net dst) {
+	size_t i;
+
+	for( i = 0; i < routes_cnt; i++ ) {
+		if( dst.ip == routes[i].dst.ip && dst.mask == routes[i].dst.mask ) {
+			routes[i] = routes[routes_cnt-1];
+			routes_cnt--;
+			return 0;
+		}
+	}
+
+	return ENOENT;
+}
+
+static struct sockaddr_in *find_route(in_addr_t dst) {
+	size_t i;
+
+	for( i = 0; i < routes_cnt; i++ ) {
+		if( contains(routes[i].dst, dst) ) {
+			// packets for same dest tend to come in bursts. swap to front make it faster for subsequent ones
+			if( i != 0 ) {
+				struct route_entry tmp = routes[i];
+				routes[i] = routes[0];
+				routes[0] = tmp;
+			}
+
+			return &routes[0].next_hop;
+		}
+	}
+
+	return NULL;
+}
+
+static char *inaddr_str(in_addr_t a, char *buf, size_t len) {
+	struct in_addr addr;
+	addr.s_addr = a;
+
+	strncpy(buf, inet_ntoa(addr), len);
+	buf[len-1] = '\0';
+
+	return buf;
+}
+
+static ssize_t tun_recv_packet(int tun, char *buf, size_t buflen) {
+	ssize_t nread = read(tun, buf, buflen);
+
+	if( nread < sizeof(struct iphdr) ) {
+		if( nread < 0 ) {
+			log_error("TUN recv failed: %s\n", strerror(errno));
+		} else {
+			log_error("TUN recv packet too small: %d bytes\n", (int)nread);
+		}
+		return -1;
+	}
+
+	return nread;
+}
+
+static ssize_t sock_recv_packet(int sock, char *buf, size_t buflen) {
+	ssize_t nread = recv(sock, buf, buflen, 0);
+
+	if( nread < sizeof(struct iphdr) ) {
+		if( nread < 0 ) {
+			log_error("UDP recv failed: %s\n", strerror(errno));
+		} else {
+			log_error("UDP recv packet too small: %d bytes\n", (int)nread);
+		}
+		return -1;
+	}
+
+	return nread;
+}
+
+static void sock_send_packet(int sock, char *pkt, size_t pktlen, struct sockaddr_in *dst) {
+	ssize_t nsent = sendto(sock, pkt, pktlen, 0, (struct sockaddr *)dst, sizeof(struct sockaddr_in));
+
+	if( nsent != pktlen ) {
+		if( nsent < 0 ) {
+			log_error("UDP send to %s:%hu failed: %s\n",
+					inet_ntoa(dst->sin_addr), ntohs(dst->sin_port), strerror(errno));
+		} else {
+			log_error("Was only able to send %d out of %d bytes to %s:%hu\n",
+					(int)nsent, (int)pktlen, inet_ntoa(dst->sin_addr), ntohs(dst->sin_port));
+		}
+	}
+}
+
+static void tun_send_packet(int tun, char *pkt, size_t pktlen) {
+	ssize_t nsent = write(tun, pkt, pktlen);
+
+	if( nsent != pktlen ) {
+		if( nsent < 0 ) {
+			log_error("TUN send failed: %s\n", strerror(errno));
+		} else {
+			log_error("Was only able to send %d out of %d bytes to TUN\n", (int)nsent, (int)pktlen);
+		}
+	}
+}
+
+static void tun_to_udp(int tun, int sock) {
+	char buf[MTU];
+
+	struct iphdr *iph;
+	struct sockaddr_in *next_hop;
+
+	ssize_t pktlen = tun_recv_packet(tun, buf, MTU);
+	if( pktlen < 0 )
+		return;
+	
+	iph = (struct iphdr *)buf;
+
+	next_hop = find_route((in_addr_t) iph->daddr);
+	if( !next_hop ) {
+		send_net_unreachable(tun, buf);
+		return;
+	}
+
+	if( --(iph->ttl) == 0 ) {
+		char saddr[32], daddr[32];
+		log_error("Discarding IP fragment %s -> %s due to zero TTL\n",
+				inaddr_str(iph->saddr, saddr, sizeof(saddr)),
+				inaddr_str(iph->daddr, daddr, sizeof(daddr)));
+		return;
+	}
+
+	/* TTL modified, need to recompute checksum */
+	iph->check = 0;
+	iph->check = cksum((char *)iph, iph->ihl * 4);
+
+	sock_send_packet(sock, buf, pktlen, next_hop);
+}
+
+static void udp_to_tun(int sock, int tun) {
+	char buf[MTU];
+	struct iphdr *iph;
+
+	ssize_t pktlen = recv(sock, buf, MTU, 0);
+	if( pktlen < 0 ) {
+		return;
+	}
+
+	iph = (struct iphdr *)buf;
+
+	if( --(iph->ttl) == 0 ) {
+		char saddr[32], daddr[32];
+		log_error("Discarding IP fragment %s -> %s due to zero TTL\n",
+				inaddr_str(iph->saddr, saddr, sizeof(saddr)),
+				inaddr_str(iph->daddr, daddr, sizeof(daddr)));
+		return;
+	}
+
+	/* TTL modified, need to recompute checksum */
+	iph->check = 0;
+	iph->check = cksum((char *)iph, iph->ihl * 4);
+
+	tun_send_packet(tun, buf, pktlen);
+}
+
+static void process_cmd(int ctl) {
+	char buf[128];
+	struct ip_net ipn;
+	struct sockaddr_in sa = {
+		.sin_family = AF_INET
+	};
+
+	ssize_t nrecv = recv(ctl, buf, sizeof(buf), 0);
+	if( nrecv < 0 ) {
+		log_error("CTL recv failed: %s\n", strerror(errno));
+		return;
+	}
+
+	struct command *cmd = (struct command *)buf;
+
+	if( cmd->cmd == CMD_SET_ROUTE ) {
+		ipn.mask = netmask(cmd->dest_net_len);
+		ipn.ip = cmd->dest_net & ipn.mask;
+
+		sa.sin_addr.s_addr = cmd->next_hop_ip;
+		sa.sin_port = htons(cmd->next_hop_port);
+
+		set_route(ipn, &sa);
+
+	} else if( cmd->cmd == CMD_DEL_ROUTE ) {
+		ipn.mask = netmask(cmd->dest_net_len);
+		ipn.ip = cmd->dest_net & ipn.mask;
+
+		del_route(ipn);
+
+	} else if( cmd->cmd == CMD_STOP ) {
+		exit_flag = 1;
+	}
+}
+
+
+void run_proxy(int tun, int sock, int ctl, in_addr_t tun_ip, int log_errors) {
+	struct pollfd fds[3] = {
+		{
+			.fd = tun,
+			.events = POLLIN
+		},
+		{
+			.fd = sock,
+			.events = POLLIN
+		},
+		{
+			.fd = ctl,
+			.events = POLLIN
+		},
+	};
+
+	exit_flag = 0;
+	tun_addr = tun_ip;
+	log_enabled = log_errors;
+
+	while( !exit_flag ) {
+		int nfds = poll(fds, 3, -1);
+		if( nfds < 0 ) {
+			log_error("Poll failed: %s\n", strerror(errno));
+			exit(1);
+		}
+
+		if( fds[0].revents & POLLIN )
+			tun_to_udp(tun, sock);
+
+		if( fds[1].revents & POLLIN )
+			udp_to_tun(sock, tun);
+
+		if( fds[2].revents & POLLIN )
+			process_cmd(ctl);
+	}
+}
+

+ 168 - 0
udp/proxy.go

@@ -0,0 +1,168 @@
+package udp
+
+import (
+	"encoding/json"
+	"net"
+	"os"
+	"sync"
+
+	log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"
+
+	"github.com/coreos-inc/kolach/pkg"
+	"github.com/coreos-inc/kolach/subnet"
+)
+
+const (
+	minIP4HdrSize = 20
+)
+
+type routeEntry struct {
+	sn   pkg.IP4Net
+	addr *net.UDPAddr
+}
+
+type Router struct {
+	mux    sync.Mutex
+	port   int
+	routes []routeEntry
+}
+
+func NewRouter(port int) *Router {
+	return &Router{
+		port: port,
+	}
+}
+
+func (r *Router) SetRoute(sn pkg.IP4Net, dst pkg.IP4) {
+	r.mux.Lock()
+	defer r.mux.Unlock()
+
+	for _, re := range r.routes {
+		if re.sn.Equal(sn) {
+			re.addr = &net.UDPAddr{
+				IP:   dst.ToIP(),
+				Port: r.port,
+			}
+			return
+		}
+	}
+
+	re := routeEntry{
+		sn: sn,
+		addr: &net.UDPAddr{
+			IP:   dst.ToIP(),
+			Port: r.port,
+		},
+	}
+
+	r.routes = append(r.routes, re)
+}
+
+func (r *Router) DelRoute(sn pkg.IP4Net) {
+	r.mux.Lock()
+	defer r.mux.Unlock()
+
+	for i, re := range r.routes {
+		if re.sn.Equal(sn) {
+			r.routes[i] = r.routes[len(r.routes)-1]
+			r.routes = r.routes[:len(r.routes)-1]
+			return
+		}
+	}
+}
+
+func (r *Router) routePacket(pkt []byte, conn *net.UDPConn) {
+	if len(pkt) < minIP4HdrSize {
+		log.V(1).Infof("Packet too small (%d bytes), unable to route", len(pkt))
+		return
+	}
+
+	r.mux.Lock()
+	defer r.mux.Unlock()
+
+	dstIP := pkg.FromBytes(pkt[16:20])
+
+	for i, re := range r.routes {
+		if re.sn.Contains(dstIP) {
+			nbytes, err := conn.WriteToUDP(pkt, re.addr)
+			switch {
+			case err != nil:
+				log.V(1).Info("UDP send failed with: ", err)
+			case nbytes != len(pkt):
+				log.V(1).Infof("Was only able to UDP send %d out of %d bytes to %s: ", nbytes, len(pkt), re.addr.IP)
+			}
+
+			// packets for same dest tend to come in burst. swap to front make it faster for subsequent ones
+			if i != 0 {
+				r.routes[0], r.routes[i] = r.routes[i], r.routes[0]
+			}
+			return
+		}
+	}
+
+	log.V(1).Info("No route found for ", dstIP)
+}
+
+func proxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, port int) {
+	log.Info("Running slow proxy loop")
+
+	rtr := NewRouter(port)
+
+	go proxyTunToUdp(rtr, tun, conn)
+	go proxyUdpToTun(conn, tun)
+
+	log.Info("Watching for new subnet leases")
+	evts := make(chan subnet.EventBatch)
+	sm.Start(evts)
+
+	for evtBatch := range evts {
+		for _, evt := range evtBatch {
+			if evt.Type == subnet.SubnetAdded {
+				log.Info("Subnet added: ", evt.Lease.Network)
+				var attrs subnet.BaseAttrs
+				if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
+					log.Error("Error decoding subnet lease JSON: ", err)
+					continue
+				}
+				rtr.SetRoute(evt.Lease.Network, attrs.PublicIP)
+
+			} else if evt.Type == subnet.SubnetRemoved {
+				log.Info("Subnet removed: %v", evt.Lease.Network)
+				rtr.DelRoute(evt.Lease.Network)
+
+			} else {
+				log.Errorf("Internal error: unknown event type: %d", int(evt.Type))
+			}
+		}
+	}
+}
+
+func proxyTunToUdp(r *Router, tun *os.File, conn *net.UDPConn) {
+	pkt := make([]byte, 1600)
+	for {
+		nbytes, err := tun.Read(pkt)
+		if err != nil {
+			log.V(1).Info("Error reading from TUN device: ", err)
+		} else {
+			r.routePacket(pkt[:nbytes], conn)
+		}
+	}
+}
+
+func proxyUdpToTun(conn *net.UDPConn, tun *os.File) {
+	pkt := make([]byte, 1600)
+	for {
+		nrecv, err := conn.Read(pkt)
+		if err != nil {
+			log.V(1).Info("Error reading from socket: ", err)
+		} else {
+			nsent, err := tun.Write(pkt[:nrecv])
+			switch {
+			case err != nil:
+				log.V(1).Info("Error writing to TUN device: ", err)
+			case nsent != nrecv:
+				log.V(1).Infof("Was only able to write %d out of %d bytes to TUN device: ", nsent, nrecv)
+			}
+		}
+	}
+}

+ 26 - 0
udp/proxy.h

@@ -0,0 +1,26 @@
+#ifndef PROXY_H
+#define PROXY_H
+
+#include <netinet/in.h>
+
+#ifdef CMD_DEFINE
+#	define cmdexport
+#else
+#	define cmdexport static
+#endif
+
+cmdexport const int CMD_SET_ROUTE = 1;
+cmdexport const int CMD_DEL_ROUTE = 2;
+cmdexport const int CMD_STOP      = 3;
+
+typedef struct command {
+	int       cmd;
+	in_addr_t dest_net;
+	int       dest_net_len;
+	in_addr_t next_hop_ip;
+	short     next_hop_port;
+} command;
+
+void run_proxy(int tun, int sock, int ctl, in_addr_t tun_ip, int log_errors);
+
+#endif

+ 0 - 103
udp/router.go

@@ -1,103 +0,0 @@
-package udp
-
-import (
-	"net"
-	"sync"
-
-	log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"
-
-	"github.com/coreos-inc/kolach/pkg"
-)
-
-const (
-	minIP4HdrSize = 20
-)
-
-type routeEntry struct {
-	sn   pkg.IP4Net
-	addr *net.UDPAddr
-}
-
-type Router struct {
-	mux    sync.Mutex
-	port   int
-	routes []routeEntry
-}
-
-func NewRouter(port int) *Router {
-	return &Router{
-		port: port,
-	}
-}
-
-func (r *Router) SetRoute(sn pkg.IP4Net, dst pkg.IP4) {
-	r.mux.Lock()
-	defer r.mux.Unlock()
-
-	for _, re := range r.routes {
-		if re.sn.Equal(sn) {
-			re.addr = &net.UDPAddr{
-				IP: dst.ToIP(),
-				Port: r.port,
-			}
-			return
-		}
-	}
-
-	re := routeEntry{
-		sn: sn,
-		addr: &net.UDPAddr{
-			IP: dst.ToIP(),
-			Port: r.port,
-		},
-	}
-
-	r.routes = append(r.routes, re)
-}
-
-func (r *Router) DelRoute(sn pkg.IP4Net) {
-	r.mux.Lock()
-	defer r.mux.Unlock()
-
-	for i, re := range r.routes {
-		if re.sn.Equal(sn) {
-			r.routes[i] = r.routes[len(r.routes)-1]
-			r.routes = r.routes[:len(r.routes)-1]
-			return
-		}
-	}
-}
-
-func (r *Router) routePacket(pkt []byte, conn *net.UDPConn) {
-	if len(pkt) < minIP4HdrSize {
-		log.V(1).Infof("Packet too small (%d bytes), unable to route", len(pkt))
-		return
-	}
-
-	r.mux.Lock()
-	defer r.mux.Unlock()
-
-	dstIP := pkg.FromBytes(pkt[16:20])
-
-	for i, re := range r.routes {
-		if re.sn.Contains(dstIP) {
-			nbytes, err := conn.WriteToUDP(pkt, re.addr)
-			if err != nil || nbytes != len(pkt) {
-				if err != nil {
-					log.V(1).Info("UDP write failed with: ", err)
-				} else {
-					log.V(1).Infof("Was only able to send %d out of %d bytes to %s: ", nbytes, len(pkt), re.addr.IP)
-				}
-			}
-
-			// packets for same dest tend to come in burst. swap to front make it faster for subsequent ones
-			if i != 0 {
-				r.routes[0], r.routes[i] = r.routes[i], r.routes[0]
-			}
-			return
-		}
-	}
-
-	log.V(1).Info("No route found for ", dstIP)
-}
-

+ 8 - 68
udp/run.go

@@ -1,17 +1,16 @@
 package udp
 
 import (
-	"os"
+	"encoding/json"
 	"net"
 	"time"
-	"encoding/json"
 
 	"github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/docker/libcontainer/netlink"
 	log "github.com/coreos-inc/kolach/Godeps/_workspace/src/github.com/golang/glog"
 
+	"github.com/coreos-inc/kolach/backend"
 	"github.com/coreos-inc/kolach/pkg"
 	"github.com/coreos-inc/kolach/subnet"
-	"github.com/coreos-inc/kolach/backend"
 )
 
 const (
@@ -48,36 +47,6 @@ func configureIface(ifname string, ipn pkg.IP4Net, mtu int) error {
 	return nil
 }
 
-func proxyTunToUdp(r *Router, tun *os.File, conn *net.UDPConn) {
-	pkt := make([]byte, 1600)
-	for {
-		nbytes, err := tun.Read(pkt)
-		if err != nil {
-			log.V(1).Info("Error reading from TUN device: ", err)
-		} else {
-			r.routePacket(pkt[:nbytes], conn)
-		}
-	}
-}
-
-func proxyUdpToTun(conn *net.UDPConn, tun *os.File) {
-	pkt := make([]byte, 1600)
-	for {
-		nrecv, err := conn.Read(pkt)
-		if err != nil {
-			log.V(1).Info("Error reading from socket: ", err)
-		} else {
-			nsent, err := tun.Write(pkt[:nrecv])
-			switch {
-			case err != nil:
-				log.V(1).Info("Error writing to TUN device: ", err)
-			case nsent != nrecv:
-				log.V(1).Infof("Was only able to write %d out of %d bytes to TUN device: ", nsent, nrecv)
-			}
-		}
-	}
-}
-
 func acquireLease(sm *subnet.SubnetManager, pubIP net.IP) (pkg.IP4Net, error) {
 	attrs := subnet.BaseAttrs{
 		PublicIP: pkg.FromIP(pubIP),
@@ -101,33 +70,7 @@ func acquireLease(sm *subnet.SubnetManager, pubIP net.IP) (pkg.IP4Net, error) {
 	return sn, nil
 }
 
-func monitorEvents(sm *subnet.SubnetManager, rtr *Router) {
-	evts := make(chan subnet.EventBatch)
-	sm.Start(evts)
-
-	for evtBatch := range evts {
-		for _, evt := range evtBatch {
-			if evt.Type == subnet.SubnetAdded {
-				log.Info("Subnet added: ", evt.Lease.Network)
-				var attrs subnet.BaseAttrs
-				if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
-					log.Error("Error decoding subnet lease JSON: ", err)
-					continue
-				}
-				rtr.SetRoute(evt.Lease.Network, attrs.PublicIP)
-
-			} else if evt.Type == subnet.SubnetRemoved {
-				log.Info("Subnet removed: %v", evt.Lease.Network)
-				rtr.DelRoute(evt.Lease.Network)
-
-			} else {
-				log.Errorf("Internal error: unknown event type: %d", int(evt.Type))
-			}
-		}
-	}
-}
-
-func Run(sm *subnet.SubnetManager, iface *net.Interface, ip net.IP, port int, ready backend.ReadyFunc) {
+func Run(sm *subnet.SubnetManager, iface *net.Interface, ip net.IP, port int, fast bool, ready backend.ReadyFunc) {
 	sn, err := acquireLease(sm, ip)
 	if err != nil {
 		log.Error("Failed to acquire lease: ", err)
@@ -170,16 +113,13 @@ func Run(sm *subnet.SubnetManager, iface *net.Interface, ip net.IP, port int, re
 		return
 	}
 
-	rtr := NewRouter(port)
-
 	// all initialized and ready for business
 	log.Info("UDP encapsulation initialized")
 	ready(sn, mtu)
 
-	log.Info("Dispatching to run the proxy loop")
-	go proxyTunToUdp(rtr, tun, conn)
-	go proxyUdpToTun(conn, tun)
-
-	log.Info("Watching for new subnet leases")
-	monitorEvents(sm, rtr)
+	if fast {
+		fastProxy(sm, tun, conn, ipn.IP, port)
+	} else {
+		proxy(sm, tun, conn, port)
+	}
 }