浏览代码

Merge pull request #31 from eyakubovich/master

Bug fixes and updated README.md with diagram of packet traversing the network.
Eugene Yakubovich 10 年之前
父节点
当前提交
29d66fe3ab
共有 6 个文件被更改,包括 95 次插入47 次删除
  1. 5 0
      README.md
  2. 1 4
      main.go
  3. 二进制
      packet-01.png
  4. 81 0
      subnet/registry.go
  5. 4 42
      subnet/subnet.go
  6. 4 1
      udp/proxy.c

+ 5 - 0
README.md

@@ -16,6 +16,11 @@ providers. Rudder uses the Universal TUN/TAP device and creates an overlay netwo
 using UDP to encapsulate IP packets. The subnet allocation is done with the help
 of etcd which maintains the overlay to actual IP mappings.
 
+The following diagram demonstrates the path a packet takes as it traverses the
+overlay network:
+
+![Life of a packet](./packet-01.png)
+
 ## Building Rudder
 
 * Step 1: Make sure you have Linux headers installed on your machine. On Ubuntu, run ```sudo apt-get install linux-libc-dev```. On Fedora/Redhat, run ```sudo yum install kernel-headers```.

+ 1 - 4
main.go

@@ -8,7 +8,6 @@ import (
 	"path"
 	"time"
 
-	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
 	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
 	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
 
@@ -105,10 +104,8 @@ func lookupIface() (*net.Interface, net.IP) {
 }
 
 func makeSubnetManager() *subnet.SubnetManager {
-	etcdCli := etcd.NewClient([]string{opts.etcdEndpoint})
-
 	for {
-		sm, err := subnet.NewSubnetManager(etcdCli, opts.etcdPrefix)
+		sm, err := subnet.NewSubnetManager(opts.etcdEndpoint, opts.etcdPrefix)
 		if err == nil {
 			return sm
 		}

二进制
packet-01.png


+ 81 - 0
subnet/registry.go

@@ -0,0 +1,81 @@
+package subnet
+
+import (
+	"sync"
+
+	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+)
+
+type subnetRegistry interface {
+	getConfig() (*etcd.Response, error)
+	getSubnets() (*etcd.Response, error)
+	createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
+	updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
+	watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
+}
+
+type etcdSubnetRegistry struct {
+	mux      sync.Mutex
+	cli      *etcd.Client
+	endpoint string
+	prefix   string
+}
+
+func newEtcdSubnetRegistry(endpoint, prefix string) subnetRegistry {
+	return &etcdSubnetRegistry{
+		cli:      etcd.NewClient([]string{endpoint}),
+		endpoint: endpoint,
+		prefix:   prefix,
+	}
+}
+
+func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
+	resp, err := esr.client().Get(esr.prefix+"/config", false, false)
+	if err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
+
+func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
+	return esr.client().Get(esr.prefix+"/subnets", false, true)
+}
+
+func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
+	return esr.client().Create(esr.prefix+"/subnets/"+sn, data, ttl)
+}
+
+func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
+	return esr.client().Set(esr.prefix+"/subnets/"+sn, data, ttl)
+}
+
+func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
+	for {
+		resp, err := esr.client().RawWatch(esr.prefix+"/subnets", since, true, nil, stop)
+
+		if err != nil {
+			return nil, err
+		}
+
+		if len(resp.Body) == 0 {
+			// etcd timed out, go back but recreate the client as the underlying
+			// http transport gets hosed (http://code.google.com/p/go/issues/detail?id=8648)
+			esr.resetClient()
+			continue
+		}
+
+		return resp.Unmarshal()
+	}
+}
+
+func (esr *etcdSubnetRegistry) client() *etcd.Client {
+	esr.mux.Lock()
+	defer esr.mux.Unlock()
+	return esr.cli
+}
+
+func (esr *etcdSubnetRegistry) resetClient() {
+	esr.mux.Lock()
+	defer esr.mux.Unlock()
+	esr.cli = etcd.NewClient([]string{esr.endpoint})
+}

+ 4 - 42
subnet/subnet.go

@@ -61,9 +61,9 @@ type Event struct {
 
 type EventBatch []Event
 
-func NewSubnetManager(etcdCli *etcd.Client, prefix string) (*SubnetManager, error) {
-	esr := etcdSubnetRegistry{etcdCli, prefix}
-	return newSubnetManager(&esr)
+func NewSubnetManager(etcdEndpoint, prefix string) (*SubnetManager, error) {
+	esr := newEtcdSubnetRegistry(etcdEndpoint, prefix)
+	return newSubnetManager(esr)
 }
 
 func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string) (ip.IP4Net, error) {
@@ -141,7 +141,6 @@ func (sm *SubnetManager) GetConfig() *Config {
 }
 
 /// Implementation
-
 func parseSubnetKey(s string) (ip.IP4Net, error) {
 	if parts := subnetRegex.FindStringSubmatch(s); len(parts) == 3 {
 		snIp := net.ParseIP(parts[1]).To4()
@@ -154,43 +153,6 @@ func parseSubnetKey(s string) (ip.IP4Net, error) {
 	return ip.IP4Net{}, errors.New("Error parsing IP Subnet")
 }
 
-type subnetRegistry interface {
-	getConfig() (*etcd.Response, error)
-	getSubnets() (*etcd.Response, error)
-	createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
-	updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
-	watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
-}
-
-type etcdSubnetRegistry struct {
-	cli    *etcd.Client
-	prefix string
-}
-
-func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
-	resp, err := esr.cli.Get(esr.prefix+"/config", false, false)
-	if err != nil {
-		return nil, err
-	}
-	return resp, nil
-}
-
-func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
-	return esr.cli.Get(esr.prefix+"/subnets", false, true)
-}
-
-func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	return esr.cli.Create(esr.prefix+"/subnets/"+sn, data, ttl)
-}
-
-func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	return esr.cli.Set(esr.prefix+"/subnets/"+sn, data, ttl)
-}
-
-func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
-	return esr.cli.Watch(esr.prefix+"/subnets", since, true, nil, stop)
-}
-
 func newSubnetManager(r subnetRegistry) (*SubnetManager, error) {
 	cfgResp, err := r.getConfig()
 	if err != nil {
@@ -424,7 +386,7 @@ func (sm *SubnetManager) leaseRenewer() {
 				continue
 			}
 
-			sm.leaseExp = *(resp.Node.Expiration)
+			sm.leaseExp = *resp.Node.Expiration
 			log.Info("Lease renewed, new expiration: ", sm.leaseExp)
 			dur = sm.leaseExp.Sub(time.Now()) - renewMargin
 

+ 4 - 1
udp/proxy.c

@@ -319,7 +319,7 @@ static void tun_to_udp(int tun, int sock, char *buf, size_t buflen) {
 static void udp_to_tun(int sock, int tun, char *buf, size_t buflen) {
 	struct iphdr *iph;
 
-	ssize_t pktlen = recv(sock, buf, buflen, 0);
+	ssize_t pktlen = sock_recv_packet(sock, buf, buflen);
 	if( pktlen < 0 ) {
 		return;
 	}
@@ -400,6 +400,9 @@ void run_proxy(int tun, int sock, int ctl, in_addr_t tun_ip, size_t tun_mtu, int
 	while( !exit_flag ) {
 		int nfds = poll(fds, 3, -1);
 		if( nfds < 0 ) {
+			if( errno == EINTR )
+				continue;
+
 			log_error("Poll failed: %s\n", strerror(errno));
 			exit(1);
 		}