watch.go 7.8 KB


  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "time"
  17. "golang.org/x/net/context"
  18. "github.com/flannel-io/flannel/pkg/ip"
  19. log "k8s.io/klog"
  20. )
  21. // WatchLeases performs a long term watch of the given network's subnet leases
  22. // and communicates addition/deletion events on receiver channel. It takes care
  23. // of handling "fall-behind" logic where the history window has advanced too far
  24. // and it needs to diff the latest snapshot with its saved state and generate events
  25. func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan []Event) {
  26. lw := &leaseWatcher{
  27. ownLease: ownLease,
  28. }
  29. var cursor interface{}
  30. for {
  31. res, err := sm.WatchLeases(ctx, cursor)
  32. if err != nil {
  33. if err == context.Canceled || err == context.DeadlineExceeded {
  34. log.Infof("%v, close receiver chan", err)
  35. close(receiver)
  36. return
  37. }
  38. if res.Cursor != nil {
  39. cursor = res.Cursor
  40. }
  41. log.Errorf("Watch subnets: %v", err)
  42. time.Sleep(time.Second)
  43. continue
  44. }
  45. cursor = res.Cursor
  46. var batch []Event
  47. if len(res.Events) > 0 {
  48. batch = lw.update(res.Events)
  49. } else {
  50. batch = lw.reset(res.Snapshot)
  51. }
  52. if len(batch) > 0 {
  53. receiver <- batch
  54. }
  55. }
  56. }
  57. type leaseWatcher struct {
  58. ownLease *Lease
  59. leases []Lease
  60. }
  61. func (lw *leaseWatcher) reset(leases []Lease) []Event {
  62. batch := []Event{}
  63. for _, nl := range leases {
  64. if lw.ownLease != nil && nl.EnableIPv4 && !nl.EnableIPv6 &&
  65. nl.Subnet.Equal(lw.ownLease.Subnet) {
  66. continue
  67. } else if lw.ownLease != nil && !nl.EnableIPv4 && nl.EnableIPv6 &&
  68. nl.IPv6Subnet.Equal(lw.ownLease.IPv6Subnet) {
  69. continue
  70. } else if lw.ownLease != nil && nl.EnableIPv4 && nl.EnableIPv6 &&
  71. nl.Subnet.Equal(lw.ownLease.Subnet) &&
  72. nl.IPv6Subnet.Equal(lw.ownLease.IPv6Subnet) {
  73. continue
  74. } else if lw.ownLease != nil && !nl.EnableIPv4 && !nl.EnableIPv6 &&
  75. nl.Subnet.Equal(lw.ownLease.Subnet) {
  76. //TODO - temporarily compatible with etcd subnet manager
  77. continue
  78. }
  79. found := false
  80. for i, ol := range lw.leases {
  81. if ol.EnableIPv4 && !ol.EnableIPv6 && ol.Subnet.Equal(nl.Subnet) {
  82. lw.leases = deleteLease(lw.leases, i)
  83. found = true
  84. break
  85. } else if ol.EnableIPv4 && !ol.EnableIPv6 && ol.IPv6Subnet.Equal(nl.IPv6Subnet) {
  86. lw.leases = deleteLease(lw.leases, i)
  87. found = true
  88. break
  89. } else if ol.EnableIPv4 && ol.EnableIPv6 && ol.Subnet.Equal(nl.Subnet) &&
  90. ol.IPv6Subnet.Equal(nl.IPv6Subnet) {
  91. lw.leases = deleteLease(lw.leases, i)
  92. found = true
  93. break
  94. } else if !ol.EnableIPv4 && !ol.EnableIPv6 && ol.Subnet.Equal(nl.Subnet) {
  95. //TODO - temporarily compatible with etcd subnet manager
  96. lw.leases = deleteLease(lw.leases, i)
  97. found = true
  98. break
  99. }
  100. }
  101. if !found {
  102. // new lease
  103. batch = append(batch, Event{EventAdded, nl})
  104. }
  105. }
  106. // everything left in sm.leases has been deleted
  107. for _, l := range lw.leases {
  108. if lw.ownLease != nil && l.EnableIPv4 && !l.EnableIPv6 &&
  109. l.Subnet.Equal(lw.ownLease.Subnet) {
  110. continue
  111. } else if lw.ownLease != nil && !l.EnableIPv4 && l.EnableIPv6 &&
  112. l.IPv6Subnet.Equal(lw.ownLease.IPv6Subnet) {
  113. continue
  114. } else if lw.ownLease != nil && l.EnableIPv4 && l.EnableIPv6 &&
  115. l.Subnet.Equal(lw.ownLease.Subnet) &&
  116. l.IPv6Subnet.Equal(lw.ownLease.IPv6Subnet) {
  117. continue
  118. } else if lw.ownLease != nil && !l.EnableIPv4 && !l.EnableIPv6 &&
  119. l.Subnet.Equal(lw.ownLease.Subnet) {
  120. //TODO - temporarily compatible with etcd subnet manager
  121. continue
  122. }
  123. batch = append(batch, Event{EventRemoved, l})
  124. }
  125. // copy the leases over (caution: don't just assign a slice)
  126. lw.leases = make([]Lease, len(leases))
  127. copy(lw.leases, leases)
  128. return batch
  129. }
  130. func (lw *leaseWatcher) update(events []Event) []Event {
  131. batch := []Event{}
  132. for _, e := range events {
  133. if lw.ownLease != nil && e.Lease.EnableIPv4 && !e.Lease.EnableIPv6 &&
  134. e.Lease.Subnet.Equal(lw.ownLease.Subnet) {
  135. continue
  136. } else if lw.ownLease != nil && !e.Lease.EnableIPv4 && e.Lease.EnableIPv6 &&
  137. e.Lease.IPv6Subnet.Equal(lw.ownLease.IPv6Subnet) {
  138. continue
  139. } else if lw.ownLease != nil && e.Lease.EnableIPv4 && e.Lease.EnableIPv6 &&
  140. e.Lease.Subnet.Equal(lw.ownLease.Subnet) &&
  141. e.Lease.IPv6Subnet.Equal(lw.ownLease.IPv6Subnet) {
  142. continue
  143. } else if lw.ownLease != nil && !e.Lease.EnableIPv4 && !e.Lease.EnableIPv6 &&
  144. e.Lease.Subnet.Equal(lw.ownLease.Subnet) {
  145. //TODO - temporarily compatible with etcd subnet manager
  146. continue
  147. }
  148. switch e.Type {
  149. case EventAdded:
  150. batch = append(batch, lw.add(&e.Lease))
  151. case EventRemoved:
  152. batch = append(batch, lw.remove(&e.Lease))
  153. }
  154. }
  155. return batch
  156. }
  157. func (lw *leaseWatcher) add(lease *Lease) Event {
  158. for i, l := range lw.leases {
  159. if l.EnableIPv4 && !l.EnableIPv6 && l.Subnet.Equal(lease.Subnet) {
  160. lw.leases[i] = *lease
  161. return Event{EventAdded, lw.leases[i]}
  162. } else if !l.EnableIPv4 && l.EnableIPv6 && l.IPv6Subnet.Equal(lease.IPv6Subnet) {
  163. lw.leases[i] = *lease
  164. return Event{EventAdded, lw.leases[i]}
  165. } else if l.EnableIPv4 && l.EnableIPv6 && l.Subnet.Equal(lease.Subnet) &&
  166. l.IPv6Subnet.Equal(lease.IPv6Subnet) {
  167. lw.leases[i] = *lease
  168. return Event{EventAdded, lw.leases[i]}
  169. } else if !l.EnableIPv4 && !l.EnableIPv6 && l.Subnet.Equal(lease.Subnet) {
  170. //TODO - temporarily compatible with etcd subnet manager
  171. lw.leases[i] = *lease
  172. return Event{EventAdded, lw.leases[i]}
  173. }
  174. }
  175. lw.leases = append(lw.leases, *lease)
  176. return Event{EventAdded, lw.leases[len(lw.leases)-1]}
  177. }
  178. func (lw *leaseWatcher) remove(lease *Lease) Event {
  179. for i, l := range lw.leases {
  180. if l.EnableIPv4 && !l.EnableIPv6 && l.Subnet.Equal(lease.Subnet) {
  181. lw.leases = deleteLease(lw.leases, i)
  182. return Event{EventRemoved, l}
  183. } else if !l.EnableIPv4 && l.EnableIPv6 && l.IPv6Subnet.Equal(lease.IPv6Subnet) {
  184. lw.leases = deleteLease(lw.leases, i)
  185. return Event{EventRemoved, l}
  186. } else if l.EnableIPv4 && l.EnableIPv6 && l.Subnet.Equal(lease.Subnet) &&
  187. l.IPv6Subnet.Equal(lease.IPv6Subnet) {
  188. lw.leases = deleteLease(lw.leases, i)
  189. return Event{EventRemoved, l}
  190. } else if !l.EnableIPv4 && !l.EnableIPv6 && l.Subnet.Equal(lease.Subnet) {
  191. //TODO - temporarily compatible with etcd subnet manager
  192. lw.leases = deleteLease(lw.leases, i)
  193. return Event{EventRemoved, l}
  194. }
  195. }
  196. log.Errorf("Removed subnet (%s) and ipv6 subnet (%s) were not found", lease.Subnet, lease.IPv6Subnet)
  197. return Event{EventRemoved, *lease}
  198. }
  199. func deleteLease(l []Lease, i int) []Lease {
  200. l = append(l[:i], l[i+1:]...)
  201. return l
  202. }
  203. // WatchLease performs a long term watch of the given network's subnet lease
  204. // and communicates addition/deletion events on receiver channel. It takes care
  205. // of handling "fall-behind" logic where the history window has advanced too far
  206. // and it needs to diff the latest snapshot with its saved state and generate events
  207. func WatchLease(ctx context.Context, sm Manager, sn ip.IP4Net, receiver chan Event) {
  208. var cursor interface{}
  209. for {
  210. wr, err := sm.WatchLease(ctx, sn, cursor)
  211. if err != nil {
  212. if err == context.Canceled || err == context.DeadlineExceeded {
  213. log.Infof("%v, close receiver chan", err)
  214. close(receiver)
  215. return
  216. }
  217. log.Errorf("Subnet watch failed: %v", err)
  218. time.Sleep(time.Second)
  219. continue
  220. }
  221. if len(wr.Snapshot) > 0 {
  222. receiver <- Event{
  223. Type: EventAdded,
  224. Lease: wr.Snapshot[0],
  225. }
  226. } else {
  227. receiver <- wr.Events[0]
  228. }
  229. cursor = wr.Cursor
  230. }
  231. }