|
@@ -43,6 +43,8 @@ import (
|
|
|
|
|
|
"github.com/joho/godotenv"
|
|
|
|
|
|
+ "sync"
|
|
|
+
|
|
|
// Backends need to be imported for their init() to get executed and them to register
|
|
|
"github.com/coreos/flannel/backend"
|
|
|
_ "github.com/coreos/flannel/backend/alivpc"
|
|
@@ -230,17 +232,30 @@ func main() {
|
|
|
sigs := make(chan os.Signal, 1)
|
|
|
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
|
|
|
|
|
|
+ // This is the main context that everything should run in.
|
|
|
+ // All spawned goroutines should exit when cancel is called on this context.
|
|
|
+ // Goroutines spawned from main.go coordinate using a WaitGroup. This provides a mechanism to allow the shutdownHandler goroutine
|
|
|
+ // to block until all the goroutines return. If those goroutines spawn other goroutines then they are responsible for
|
|
|
+ // blocking and returning only when cancel() is called.
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
- go shutdown(sigs, cancel)
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ shutdownHandler(ctx, sigs, cancel)
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
|
|
|
if opts.healthzPort > 0 {
|
|
|
+ // It's not super easy to shut down the HTTP server so don't attempt to stop it cleanly
|
|
|
go mustRunHealthz()
|
|
|
}
|
|
|
|
|
|
// Fetch the network config (i.e. what backend to use etc..).
|
|
|
config, err := getConfig(ctx, sm)
|
|
|
if err == errCanceled {
|
|
|
- exit()
|
|
|
+ wg.Wait()
|
|
|
+ os.Exit(0)
|
|
|
}
|
|
|
|
|
|
// Create a backend manager then use it to create the backend and register the network with it.
|
|
@@ -248,13 +263,17 @@ func main() {
|
|
|
be, err := bm.GetBackend(config.BackendType)
|
|
|
if err != nil {
|
|
|
log.Errorf("Error fetching backend: %s", err)
|
|
|
- exit()
|
|
|
+ cancel()
|
|
|
+ wg.Wait()
|
|
|
+ os.Exit(1)
|
|
|
}
|
|
|
|
|
|
bn, err := be.RegisterNetwork(ctx, config)
|
|
|
if err != nil {
|
|
|
log.Errorf("Error registering network: %s", err)
|
|
|
- exit()
|
|
|
+ cancel()
|
|
|
+ wg.Wait()
|
|
|
+ os.Exit(1)
|
|
|
}
|
|
|
|
|
|
// Set up ipMasq if needed
|
|
@@ -280,42 +299,44 @@ func main() {
|
|
|
}
|
|
|
|
|
|
// Start "Running" the backend network. This will block until the context is done so run in another goroutine.
|
|
|
- go bn.Run(ctx)
|
|
|
- log.Infof("Finished starting backend.")
|
|
|
+ log.Info("Running backend.")
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ bn.Run(ctx)
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
|
|
|
daemon.SdNotify(false, "READY=1")
|
|
|
|
|
|
// Kube subnet mgr doesn't lease the subnet for this node - it just uses the podCidr that's already assigned.
|
|
|
- if opts.kubeSubnetMgr {
|
|
|
- // Wait for the shutdown to be signalled
|
|
|
- <-ctx.Done()
|
|
|
- } else {
|
|
|
- // Block waiting to renew the lease
|
|
|
- _ = MonitorLease(ctx, sm, bn)
|
|
|
+ if !opts.kubeSubnetMgr {
|
|
|
+ err = MonitorLease(ctx, sm, bn, &wg)
|
|
|
+ if err == errInterrupted {
|
|
|
+ // The lease was "revoked" - shut everything down
|
|
|
+ cancel()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- // To get to here, the Cancel signal must have been received or the lease has been revoked.
|
|
|
- exit()
|
|
|
-}
|
|
|
-
|
|
|
-func exit() {
|
|
|
- // Wait just a second for the cancel signal to propagate everywhere, then just exit cleanly.
|
|
|
- log.Info("Waiting for cancel to propagate...")
|
|
|
- time.Sleep(time.Second)
|
|
|
- log.Info("Exiting...")
|
|
|
+ log.Info("Waiting for all goroutines to exit")
|
|
|
+ // Block waiting for all the goroutines to finish.
|
|
|
+ wg.Wait()
|
|
|
+ log.Info("Exiting cleanly...")
|
|
|
os.Exit(0)
|
|
|
}
|
|
|
|
|
|
-func shutdown(sigs chan os.Signal, cancel context.CancelFunc) {
|
|
|
- // Wait for the shutdown signal.
|
|
|
- <-sigs
|
|
|
+func shutdownHandler(ctx context.Context, sigs chan os.Signal, cancel context.CancelFunc) {
|
|
|
+ // Wait for the context to be Done or for the signal to come in to shut down.
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ log.Info("Stopping shutdownHandler...")
|
|
|
+ case <-sigs:
|
|
|
+ // Call cancel on the context to close everything down.
|
|
|
+ cancel()
|
|
|
+ log.Info("shutdownHandler sent cancel signal...")
|
|
|
+ }
|
|
|
+
|
|
|
// Unregister to get default OS nuke behaviour in case we don't exit cleanly
|
|
|
signal.Stop(sigs)
|
|
|
- log.Info("Starting shutdown...")
|
|
|
-
|
|
|
- // Call cancel on the context to close everything down.
|
|
|
- cancel()
|
|
|
- log.Info("Sent cancel signal...")
|
|
|
}
|
|
|
|
|
|
func getConfig(ctx context.Context, sm subnet.Manager) (*subnet.Config, error) {
|
|
@@ -339,10 +360,16 @@ func getConfig(ctx context.Context, sm subnet.Manager) (*subnet.Config, error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network) error {
|
|
|
+func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network, wg *sync.WaitGroup) error {
|
|
|
// Use the subnet manager to start watching leases.
|
|
|
evts := make(chan subnet.Event)
|
|
|
- go subnet.WatchLease(ctx, sm, bn.Lease().Subnet, evts)
|
|
|
+
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ subnet.WatchLease(ctx, sm, bn.Lease().Subnet, evts)
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+
|
|
|
renewMargin := time.Duration(opts.subnetLeaseRenewMargin) * time.Minute
|
|
|
dur := bn.Lease().Expiration.Sub(time.Now()) - renewMargin
|
|
|
|