watch.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "time"
  17. "golang.org/x/net/context"
  18. "github.com/coreos/flannel/pkg/ip"
  19. log "k8s.io/klog"
  20. )
  21. // WatchLeases performs a long term watch of the given network's subnet leases
  22. // and communicates addition/deletion events on receiver channel. It takes care
  23. // of handling "fall-behind" logic where the history window has advanced too far
  24. // and it needs to diff the latest snapshot with its saved state and generate events
  25. func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan []Event) {
  26. lw := &leaseWatcher{
  27. ownLease: ownLease,
  28. }
  29. var cursor interface{}
  30. for {
  31. res, err := sm.WatchLeases(ctx, cursor)
  32. if err != nil {
  33. if err == context.Canceled || err == context.DeadlineExceeded {
  34. return
  35. }
  36. if res.Cursor != nil {
  37. cursor = res.Cursor
  38. }
  39. log.Errorf("Watch subnets: %v", err)
  40. time.Sleep(time.Second)
  41. continue
  42. }
  43. cursor = res.Cursor
  44. var batch []Event
  45. if len(res.Events) > 0 {
  46. batch = lw.update(res.Events)
  47. } else {
  48. batch = lw.reset(res.Snapshot)
  49. }
  50. if len(batch) > 0 {
  51. receiver <- batch
  52. }
  53. }
  54. }
  55. type leaseWatcher struct {
  56. ownLease *Lease
  57. leases []Lease
  58. }
  59. func (lw *leaseWatcher) reset(leases []Lease) []Event {
  60. batch := []Event{}
  61. for _, nl := range leases {
  62. if lw.ownLease != nil && nl.Subnet.Equal(lw.ownLease.Subnet) {
  63. continue
  64. }
  65. found := false
  66. for i, ol := range lw.leases {
  67. if ol.Subnet.Equal(nl.Subnet) {
  68. lw.leases = deleteLease(lw.leases, i)
  69. found = true
  70. break
  71. }
  72. }
  73. if !found {
  74. // new lease
  75. batch = append(batch, Event{EventAdded, nl})
  76. }
  77. }
  78. // everything left in sm.leases has been deleted
  79. for _, l := range lw.leases {
  80. if lw.ownLease != nil && l.Subnet.Equal(lw.ownLease.Subnet) {
  81. continue
  82. }
  83. batch = append(batch, Event{EventRemoved, l})
  84. }
  85. // copy the leases over (caution: don't just assign a slice)
  86. lw.leases = make([]Lease, len(leases))
  87. copy(lw.leases, leases)
  88. return batch
  89. }
  90. func (lw *leaseWatcher) update(events []Event) []Event {
  91. batch := []Event{}
  92. for _, e := range events {
  93. if lw.ownLease != nil && e.Lease.Subnet.Equal(lw.ownLease.Subnet) {
  94. continue
  95. }
  96. switch e.Type {
  97. case EventAdded:
  98. batch = append(batch, lw.add(&e.Lease))
  99. case EventRemoved:
  100. batch = append(batch, lw.remove(&e.Lease))
  101. }
  102. }
  103. return batch
  104. }
  105. func (lw *leaseWatcher) add(lease *Lease) Event {
  106. for i, l := range lw.leases {
  107. if l.Subnet.Equal(lease.Subnet) {
  108. lw.leases[i] = *lease
  109. return Event{EventAdded, lw.leases[i]}
  110. }
  111. }
  112. lw.leases = append(lw.leases, *lease)
  113. return Event{EventAdded, lw.leases[len(lw.leases)-1]}
  114. }
  115. func (lw *leaseWatcher) remove(lease *Lease) Event {
  116. for i, l := range lw.leases {
  117. if l.Subnet.Equal(lease.Subnet) {
  118. lw.leases = deleteLease(lw.leases, i)
  119. return Event{EventRemoved, l}
  120. }
  121. }
  122. log.Errorf("Removed subnet (%s) was not found", lease.Subnet)
  123. return Event{EventRemoved, *lease}
  124. }
  125. func deleteLease(l []Lease, i int) []Lease {
  126. l = append(l[:i], l[i+1:]...)
  127. return l
  128. }
  129. // WatchLease performs a long term watch of the given network's subnet lease
  130. // and communicates addition/deletion events on receiver channel. It takes care
  131. // of handling "fall-behind" logic where the history window has advanced too far
  132. // and it needs to diff the latest snapshot with its saved state and generate events
  133. func WatchLease(ctx context.Context, sm Manager, sn ip.IP4Net, receiver chan Event) {
  134. var cursor interface{}
  135. for {
  136. wr, err := sm.WatchLease(ctx, sn, cursor)
  137. if err != nil {
  138. if err == context.Canceled || err == context.DeadlineExceeded {
  139. return
  140. }
  141. log.Errorf("Subnet watch failed: %v", err)
  142. time.Sleep(time.Second)
  143. continue
  144. }
  145. if len(wr.Snapshot) > 0 {
  146. receiver <- Event{
  147. Type: EventAdded,
  148. Lease: wr.Snapshot[0],
  149. }
  150. } else {
  151. receiver <- wr.Events[0]
  152. }
  153. cursor = wr.Cursor
  154. }
  155. }