Quellcode durchsuchen

Fix flannel hang if lease expired

Chun Chen vor 4 Jahren
Ursprung
Commit
78035d0c8c

+ 5 - 4
backend/extension/extension_network.go

@@ -61,11 +61,12 @@ func (n *network) Run(ctx context.Context) {
 
 	for {
 		select {
-		case evtBatch := <-evts:
+		case evtBatch, ok := <-evts:
+			if !ok {
+				log.Infof("evts chan closed")
+				return
+			}
 			n.handleSubnetEvents(evtBatch)
-
-		case <-ctx.Done():
-			return
 		}
 	}
 }

+ 5 - 4
backend/ipsec/ipsec_network.go

@@ -94,12 +94,13 @@ func (n *network) Run(ctx context.Context) {
 
 	for {
 		select {
-		case evtsBatch := <-evts:
+		case evtsBatch, ok := <-evts:
+			if !ok {
+				log.Infof("evts chan closed")
+				return
+			}
 			log.Info("Handling event")
 			n.handleSubnetEvents(evtsBatch)
-		case <-ctx.Done():
-			log.Info("Received DONE")
-			return
 		}
 	}
 }

+ 5 - 4
backend/route_network.go

@@ -69,11 +69,12 @@ func (n *RouteNetwork) Run(ctx context.Context) {
 
 	for {
 		select {
-		case evtBatch := <-evts:
+		case evtBatch, ok := <-evts:
+			if !ok {
+				log.Infof("evts chan closed")
+				return
+			}
 			n.handleSubnetEvents(evtBatch)
-
-		case <-ctx.Done():
-			return
 		}
 	}
 }

+ 5 - 4
backend/route_network_windows.go

@@ -67,11 +67,12 @@ func (n *RouteNetwork) Run(ctx context.Context) {
 
 	for {
 		select {
-		case evtBatch := <-evts:
+		case evtBatch, ok := <-evts:
+			if !ok {
+				log.Infof("evts chan closed")
+				return
+			}
 			n.handleSubnetEvents(evtBatch)
-
-		case <-ctx.Done():
-			return
 		}
 	}
 }

+ 6 - 5
backend/udp/udp_network_amd64.go

@@ -109,12 +109,13 @@ func (n *network) Run(ctx context.Context) {
 
 	for {
 		select {
-		case evtBatch := <-evts:
+		case evtBatch, ok := <-evts:
+			if !ok {
+				log.Infof("evts chan closed")
+				stopProxy(n.ctl)
+				return
+			}
 			n.processSubnetEvents(evtBatch)
-
-		case <-ctx.Done():
-			stopProxy(n.ctl)
-			return
 		}
 	}
 }

+ 5 - 4
backend/vxlan/vxlan_network.go

@@ -69,11 +69,12 @@ func (nw *network) Run(ctx context.Context) {
 
 	for {
 		select {
-		case evtBatch := <-events:
+		case evtBatch, ok := <-events:
+			if !ok {
+				log.Infof("evts chan closed")
+				return
+			}
 			nw.handleSubnetEvents(evtBatch)
-
-		case <-ctx.Done():
-			return
 		}
 	}
 }

+ 5 - 4
backend/vxlan/vxlan_network_windows.go

@@ -73,11 +73,12 @@ func (nw *network) Run(ctx context.Context) {
 
 	for {
 		select {
-		case evtBatch := <-events:
+		case evtBatch, ok := <-events:
+			if !ok {
+				log.Infof("evts chan closed")
+				return
+			}
 			nw.handleSubnetEvents(evtBatch)
-
-		case <-ctx.Done():
-			return
 		}
 	}
 }

+ 5 - 5
main.go

@@ -426,7 +426,11 @@ func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network, wg
 			log.Info("Lease renewed, new expiration: ", bn.Lease().Expiration)
 			dur = bn.Lease().Expiration.Sub(time.Now()) - renewMargin
 
-		case e := <-evts:
+		case e, ok := <-evts:
+			if !ok {
+				log.Infof("Stopped monitoring lease")
+				return errCanceled
+			}
 			switch e.Type {
 			case subnet.EventAdded:
 				bn.Lease().Expiration = e.Lease.Expiration
@@ -437,10 +441,6 @@ func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network, wg
 				log.Error("Lease has been revoked. Shutting down daemon.")
 				return errInterrupted
 			}
-
-		case <-ctx.Done():
-			log.Infof("Stopped monitoring lease")
-			return errCanceled
 		}
 	}
 }

+ 1 - 1
subnet/etcdv2/mock_etcd.go

@@ -178,7 +178,7 @@ func (me *mockEtcd) sendEvent(resp *etcd.Response) {
 	me.events = append(me.events, resp)
 
 	// and notify watchers
-	for w, _ := range me.watchers {
+	for w := range me.watchers {
 		w.notifyEvent(resp)
 	}
 }

+ 4 - 0
subnet/watch.go

@@ -37,6 +37,8 @@ func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan
 		res, err := sm.WatchLeases(ctx, cursor)
 		if err != nil {
 			if err == context.Canceled || err == context.DeadlineExceeded {
+				log.Infof("%v, close receiver chan", err)
+				close(receiver)
 				return
 			}
 
@@ -169,6 +171,8 @@ func WatchLease(ctx context.Context, sm Manager, sn ip.IP4Net, receiver chan Eve
 		wr, err := sm.WatchLease(ctx, sn, cursor)
 		if err != nil {
 			if err == context.Canceled || err == context.DeadlineExceeded {
+				log.Infof("%v, close receiver chan", err)
+				close(receiver)
 				return
 			}