Просмотр исходного кода

subnet: break apart response-handling in watchLeases

Brian Waldon 10 лет назад
Родитель
Сommit
fe10ea6326
1 измененных файлов с 55 добавлено и 31 удалено
  1. 55 31
      subnet/subnet.go

+ 55 - 31
subnet/subnet.go

@@ -3,6 +3,7 @@ package subnet
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"net"
 	"regexp"
 	"strconv"
@@ -346,46 +347,69 @@ func (sm *SubnetManager) watchLeases(receiver chan EventBatch) {
 	for {
 		resp, err := sm.registry.watchSubnets(sm.lastIndex+1, sm.stop)
 
+		// watchSubnets exited by stop chan being signaled
+		if err == nil && resp == nil {
+			return
+		}
+
+		var batch *EventBatch
 		if err == nil {
-			if resp == nil {
-				// watchSubnets exited by stop chan being signaled
-				return
-			}
-			sm.lastIndex = resp.EtcdIndex
+			batch, err = sm.parseSubnetWatchResponse(resp)
+		} else {
+			batch, err = sm.parseSubnetWatchError(err)
+		}
 
-			sn, err := parseSubnetKey(resp.Node.Key)
-			if err != nil {
-				log.Error("Error parsing subnet IP: ", resp.Node.Key)
-				time.Sleep(time.Second)
-				continue
-			}
+		if err != nil {
+			log.Errorf("%v", err)
+			continue
+		}
 
-			// Don't process our own changes
-			if !sm.myLease.Network.Equal(sn) {
-				evt := sm.applySubnetChange(resp.Action, sn, resp.Node.Value)
-				receiver <- EventBatch{evt}
-			}
+		if batch != nil {
+			receiver <- *batch
+		}
+	}
+}
 
-		} else if etcdErr, ok := err.(*etcd.EtcdError); ok && etcdErr.ErrorCode == etcdEventIndexCleared {
-			// etcd maintains a history window for events and it's possible to fall behind.
-			// to recover, get the current state and then "diff" against our cache to generate
-			// events for the caller
-			log.Warning("Watch of subnet leases failed b/c index outside history window")
-			leases, err := sm.getLeases()
-			if err != nil {
-				log.Errorf("Failed to retrieve subnet leases: ", err)
-				time.Sleep(time.Second)
-				continue
-			}
+func (sm *SubnetManager) parseSubnetWatchResponse(resp *etcd.Response) (batch *EventBatch, err error) {
+	sm.lastIndex = resp.EtcdIndex
 
-			batch = sm.applyLeases(leases)
-			receiver <- batch
+	sn, err := parseSubnetKey(resp.Node.Key)
+	if err != nil {
+		time.Sleep(time.Second)
+		err = fmt.Errorf("Error parsing subnet IP: %s", resp.Node.Key)
+		return
+	}
+
+	// Don't process our own changes
+	if !sm.myLease.Network.Equal(sn) {
+		evt := sm.applySubnetChange(resp.Action, sn, resp.Node.Value)
+		batch = &EventBatch{evt}
+	}
 
+	return
+}
+
+func (sm *SubnetManager) parseSubnetWatchError(err error) (batch *EventBatch, out error) {
+	etcdErr, ok := err.(*etcd.EtcdError)
+	if ok && etcdErr.ErrorCode == etcdEventIndexCleared {
+		// etcd maintains a history window for events and it's possible to fall behind.
+		// to recover, get the current state and then "diff" against our cache to generate
+		// events for the caller
+		log.Warning("Watch of subnet leases failed because etcd index outside history window")
+
+		leases, err := sm.getLeases()
+		if err == nil {
+			lb := sm.applyLeases(leases)
+			batch = &lb
 		} else {
-			log.Error("Watch of subnet leases failed: ", err)
-			continue
+			out = fmt.Errorf("Failed to retrieve subnet leases: %v", err)
+			time.Sleep(time.Second)
 		}
+	} else {
+		out = fmt.Errorf("Watch of subnet leases failed: ", err)
 	}
+
+	return
 }
 
 func (sm *SubnetManager) leaseRenewer() {