watch.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "time"
  17. "golang.org/x/net/context"
  18. "github.com/flannel-io/flannel/pkg/ip"
  19. log "k8s.io/klog"
  20. )
  21. // WatchLeases performs a long term watch of the given network's subnet leases
  22. // and communicates addition/deletion events on receiver channel. It takes care
  23. // of handling "fall-behind" logic where the history window has advanced too far
  24. // and it needs to diff the latest snapshot with its saved state and generate events
  25. func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan []Event) {
  26. lw := &leaseWatcher{
  27. ownLease: ownLease,
  28. }
  29. var cursor interface{}
  30. for {
  31. res, err := sm.WatchLeases(ctx, cursor)
  32. if err != nil {
  33. if err == context.Canceled || err == context.DeadlineExceeded {
  34. log.Infof("%v, close receiver chan", err)
  35. close(receiver)
  36. return
  37. }
  38. if res.Cursor != nil {
  39. cursor = res.Cursor
  40. }
  41. log.Errorf("Watch subnets: %v", err)
  42. time.Sleep(time.Second)
  43. continue
  44. }
  45. cursor = res.Cursor
  46. var batch []Event
  47. if len(res.Events) > 0 {
  48. batch = lw.update(res.Events)
  49. } else {
  50. batch = lw.reset(res.Snapshot)
  51. }
  52. if len(batch) > 0 {
  53. receiver <- batch
  54. }
  55. }
  56. }
  57. type leaseWatcher struct {
  58. ownLease *Lease
  59. leases []Lease
  60. }
  61. func (lw *leaseWatcher) reset(leases []Lease) []Event {
  62. batch := []Event{}
  63. for _, nl := range leases {
  64. if lw.ownLease != nil && nl.Subnet.Equal(lw.ownLease.Subnet) {
  65. continue
  66. }
  67. found := false
  68. for i, ol := range lw.leases {
  69. if ol.Subnet.Equal(nl.Subnet) {
  70. lw.leases = deleteLease(lw.leases, i)
  71. found = true
  72. break
  73. }
  74. }
  75. if !found {
  76. // new lease
  77. batch = append(batch, Event{EventAdded, nl})
  78. }
  79. }
  80. // everything left in sm.leases has been deleted
  81. for _, l := range lw.leases {
  82. if lw.ownLease != nil && l.Subnet.Equal(lw.ownLease.Subnet) {
  83. continue
  84. }
  85. batch = append(batch, Event{EventRemoved, l})
  86. }
  87. // copy the leases over (caution: don't just assign a slice)
  88. lw.leases = make([]Lease, len(leases))
  89. copy(lw.leases, leases)
  90. return batch
  91. }
  92. func (lw *leaseWatcher) update(events []Event) []Event {
  93. batch := []Event{}
  94. for _, e := range events {
  95. if lw.ownLease != nil && e.Lease.Subnet.Equal(lw.ownLease.Subnet) {
  96. continue
  97. }
  98. switch e.Type {
  99. case EventAdded:
  100. batch = append(batch, lw.add(&e.Lease))
  101. case EventRemoved:
  102. batch = append(batch, lw.remove(&e.Lease))
  103. }
  104. }
  105. return batch
  106. }
  107. func (lw *leaseWatcher) add(lease *Lease) Event {
  108. for i, l := range lw.leases {
  109. if l.Subnet.Equal(lease.Subnet) {
  110. lw.leases[i] = *lease
  111. return Event{EventAdded, lw.leases[i]}
  112. }
  113. }
  114. lw.leases = append(lw.leases, *lease)
  115. return Event{EventAdded, lw.leases[len(lw.leases)-1]}
  116. }
  117. func (lw *leaseWatcher) remove(lease *Lease) Event {
  118. for i, l := range lw.leases {
  119. if l.Subnet.Equal(lease.Subnet) {
  120. lw.leases = deleteLease(lw.leases, i)
  121. return Event{EventRemoved, l}
  122. }
  123. }
  124. log.Errorf("Removed subnet (%s) was not found", lease.Subnet)
  125. return Event{EventRemoved, *lease}
  126. }
  127. func deleteLease(l []Lease, i int) []Lease {
  128. l = append(l[:i], l[i+1:]...)
  129. return l
  130. }
  131. // WatchLease performs a long term watch of the given network's subnet lease
  132. // and communicates addition/deletion events on receiver channel. It takes care
  133. // of handling "fall-behind" logic where the history window has advanced too far
  134. // and it needs to diff the latest snapshot with its saved state and generate events
  135. func WatchLease(ctx context.Context, sm Manager, sn ip.IP4Net, receiver chan Event) {
  136. var cursor interface{}
  137. for {
  138. wr, err := sm.WatchLease(ctx, sn, cursor)
  139. if err != nil {
  140. if err == context.Canceled || err == context.DeadlineExceeded {
  141. log.Infof("%v, close receiver chan", err)
  142. close(receiver)
  143. return
  144. }
  145. log.Errorf("Subnet watch failed: %v", err)
  146. time.Sleep(time.Second)
  147. continue
  148. }
  149. if len(wr.Snapshot) > 0 {
  150. receiver <- Event{
  151. Type: EventAdded,
  152. Lease: wr.Snapshot[0],
  153. }
  154. } else {
  155. receiver <- wr.Events[0]
  156. }
  157. cursor = wr.Cursor
  158. }
  159. }