Browse Source

Merge pull request #19 from bcwaldon/watch-error-handling

Refactor watch event handling; sleep for 1s on all errors
Eugene Yakubovich 10 years ago
parent
commit
703e95fa91
1 changed files with 58 additions and 33 deletions
  1. 58 33
      subnet/subnet.go

+ 58 - 33
subnet/subnet.go

@@ -3,6 +3,7 @@ package subnet
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"net"
 	"regexp"
 	"strconv"
@@ -22,6 +23,8 @@ const (
 
 // etcd error codes
 const (
+	etcdKeyNotFound       = 100
+	etcdKeyAlreadyExists  = 105
 	etcdEventIndexCleared = 401
 )
 
@@ -105,7 +108,7 @@ func (sm *SubnetManager) AcquireLease(ip pkg.IP4, data string) (pkg.IP4Net, erro
 			return sn, nil
 
 		// if etcd returned Key Already Exists, try again.
-		case err.(*etcd.EtcdError).ErrorCode == 105:
+		case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
 			continue
 
 		default:
@@ -221,7 +224,7 @@ func (sm *SubnetManager) getLeases() ([]SubnetLease, error) {
 		}
 		sm.lastIndex = resp.EtcdIndex
 
-	case err.(*etcd.EtcdError).ErrorCode == 100:
+	case err.(*etcd.EtcdError).ErrorCode == etcdKeyNotFound:
 		// key not found: treat it as empty set
 		sm.lastIndex = err.(*etcd.EtcdError).Index
 
@@ -344,46 +347,68 @@ func (sm *SubnetManager) watchLeases(receiver chan EventBatch) {
 	for {
 		resp, err := sm.registry.watchSubnets(sm.lastIndex+1, sm.stop)
 
+		// watchSubnets exited by stop chan being signaled
+		if err == nil && resp == nil {
+			return
+		}
+
+		var batch *EventBatch
 		if err == nil {
-			if resp == nil {
-				// watchSubnets exited by stop chan being signaled
-				return
-			}
-			sm.lastIndex = resp.EtcdIndex
+			batch, err = sm.parseSubnetWatchResponse(resp)
+		} else {
+			batch, err = sm.parseSubnetWatchError(err)
+		}
 
-			sn, err := parseSubnetKey(resp.Node.Key)
-			if err != nil {
-				log.Error("Error parsing subnet IP: ", resp.Node.Key)
-				time.Sleep(time.Second)
-				continue
-			}
+		if err != nil {
+			log.Errorf("%v", err)
+			time.Sleep(time.Second)
+			continue
+		}
 
-			// Don't process our own changes
-			if !sm.myLease.Network.Equal(sn) {
-				evt := sm.applySubnetChange(resp.Action, sn, resp.Node.Value)
-				receiver <- EventBatch{evt}
-			}
+		if batch != nil {
+			receiver <- *batch
+		}
+	}
+}
 
-		} else if etcdErr, ok := err.(*etcd.EtcdError); ok && etcdErr.ErrorCode == etcdEventIndexCleared {
-			// etcd maintains a history window for events and it's possible to fall behind.
-			// to recover, get the current state and then "diff" against our cache to generate
-			// events for the caller
-			log.Warning("Watch of subnet leases failed b/c index outside history window")
-			leases, err := sm.getLeases()
-			if err != nil {
-				log.Errorf("Failed to retrieve subnet leases: ", err)
-				time.Sleep(time.Second)
-				continue
-			}
+func (sm *SubnetManager) parseSubnetWatchResponse(resp *etcd.Response) (batch *EventBatch, err error) {
+	sm.lastIndex = resp.EtcdIndex
 
-			batch = sm.applyLeases(leases)
-			receiver <- batch
+	sn, err := parseSubnetKey(resp.Node.Key)
+	if err != nil {
+		err = fmt.Errorf("Error parsing subnet IP: %s", resp.Node.Key)
+		return
+	}
+
+	// Don't process our own changes
+	if !sm.myLease.Network.Equal(sn) {
+		evt := sm.applySubnetChange(resp.Action, sn, resp.Node.Value)
+		batch = &EventBatch{evt}
+	}
 
+	return
+}
+
+func (sm *SubnetManager) parseSubnetWatchError(err error) (batch *EventBatch, out error) {
+	etcdErr, ok := err.(*etcd.EtcdError)
+	if ok && etcdErr.ErrorCode == etcdEventIndexCleared {
+		// etcd maintains a history window for events and it's possible to fall behind.
+		// to recover, get the current state and then "diff" against our cache to generate
+		// events for the caller
+		log.Warning("Watch of subnet leases failed because etcd index outside history window")
+
+		leases, err := sm.getLeases()
+		if err == nil {
+			lb := sm.applyLeases(leases)
+			batch = &lb
 		} else {
-			log.Error("Watch of subnet leases failed: ", err)
-			continue
+			out = fmt.Errorf("Failed to retrieve subnet leases: %v", err)
 		}
+	} else {
+		out = fmt.Errorf("Watch of subnet leases failed: ", err)
 	}
+
+	return
 }
 
 func (sm *SubnetManager) leaseRenewer() {