watch.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "time"
  17. log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
  18. "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
  19. "github.com/coreos/flannel/pkg/ip"
  20. )
  21. // WatchLeases performs a long term watch of the given network's subnet leases
  22. // and communicates addition/deletion events on receiver channel. It takes care
  23. // of handling "fall-behind" logic where the history window has advanced too far
  24. // and it needs to diff the latest snapshot with its saved state and generate events
  25. func WatchLeases(ctx context.Context, sm Manager, network string, ownLease *Lease, receiver chan []Event) {
  26. lw := &leaseWatcher{
  27. ownLease: ownLease,
  28. }
  29. var cursor interface{}
  30. for {
  31. res, err := sm.WatchLeases(ctx, network, cursor)
  32. if err != nil {
  33. if err == context.Canceled || err == context.DeadlineExceeded {
  34. return
  35. }
  36. log.Errorf("Watch subnets: %v", err)
  37. time.Sleep(time.Second)
  38. continue
  39. }
  40. cursor = res.Cursor
  41. batch := []Event{}
  42. if len(res.Events) > 0 {
  43. batch = lw.update(res.Events)
  44. } else {
  45. batch = lw.reset(res.Snapshot)
  46. }
  47. if batch != nil {
  48. receiver <- batch
  49. }
  50. }
  51. }
  52. type leaseWatcher struct {
  53. ownLease *Lease
  54. leases []Lease
  55. }
  56. func (lw *leaseWatcher) reset(leases []Lease) []Event {
  57. batch := []Event{}
  58. for _, nl := range leases {
  59. if lw.ownLease != nil && nl.Subnet.Equal(lw.ownLease.Subnet) {
  60. continue
  61. }
  62. found := false
  63. for i, ol := range lw.leases {
  64. if ol.Subnet.Equal(nl.Subnet) {
  65. lw.leases = deleteLease(lw.leases, i)
  66. found = true
  67. break
  68. }
  69. }
  70. if !found {
  71. // new lease
  72. batch = append(batch, Event{EventAdded, nl, ""})
  73. }
  74. }
  75. // everything left in sm.leases has been deleted
  76. for _, l := range lw.leases {
  77. batch = append(batch, Event{EventRemoved, l, ""})
  78. }
  79. lw.leases = leases
  80. return batch
  81. }
  82. func (lw *leaseWatcher) update(events []Event) []Event {
  83. batch := []Event{}
  84. for _, e := range events {
  85. if lw.ownLease != nil && e.Lease.Subnet.Equal(lw.ownLease.Subnet) {
  86. continue
  87. }
  88. switch e.Type {
  89. case EventAdded:
  90. batch = append(batch, lw.add(&e.Lease))
  91. case EventRemoved:
  92. batch = append(batch, lw.remove(&e.Lease))
  93. }
  94. }
  95. return batch
  96. }
  97. func (lw *leaseWatcher) add(lease *Lease) Event {
  98. for i, l := range lw.leases {
  99. if l.Subnet.Equal(lease.Subnet) {
  100. lw.leases[i] = *lease
  101. return Event{EventAdded, lw.leases[i], ""}
  102. }
  103. }
  104. lw.leases = append(lw.leases, *lease)
  105. return Event{EventAdded, lw.leases[len(lw.leases)-1], ""}
  106. }
  107. func (lw *leaseWatcher) remove(lease *Lease) Event {
  108. for i, l := range lw.leases {
  109. if l.Subnet.Equal(lease.Subnet) {
  110. lw.leases = deleteLease(lw.leases, i)
  111. return Event{EventRemoved, l, ""}
  112. }
  113. }
  114. log.Errorf("Removed subnet (%s) was not found", lease.Subnet)
  115. return Event{EventRemoved, *lease, ""}
  116. }
  117. func deleteLease(l []Lease, i int) []Lease {
  118. l[i], l = l[len(l)-1], l[:len(l)-1]
  119. return l
  120. }
  121. // WatchNetworks performs a long term watch of flannel networks and communicates
  122. // addition/deletion events on receiver channel. It takes care of handling
  123. // "fall-behind" logic where the history window has advanced too far and it
  124. // needs to diff the latest snapshot with its saved state and generate events
  125. func WatchNetworks(ctx context.Context, sm Manager, receiver chan []Event) {
  126. nw := newNetWatcher()
  127. var cursor interface{}
  128. for {
  129. res, err := sm.WatchNetworks(ctx, cursor)
  130. if err != nil {
  131. if err == context.Canceled || err == context.DeadlineExceeded {
  132. return
  133. }
  134. log.Errorf("Watch networks: %v", err)
  135. time.Sleep(time.Second)
  136. continue
  137. }
  138. cursor = res.Cursor
  139. batch := []Event{}
  140. if len(res.Events) > 0 {
  141. batch = nw.update(res.Events)
  142. } else {
  143. batch = nw.reset(res.Snapshot)
  144. }
  145. if batch != nil {
  146. receiver <- batch
  147. }
  148. }
  149. }
  150. type netWatcher struct {
  151. networks map[string]bool
  152. }
  153. func newNetWatcher() *netWatcher {
  154. return &netWatcher{networks: make(map[string]bool)}
  155. }
  156. func (nw *netWatcher) reset(networks []string) []Event {
  157. batch := []Event{}
  158. newNetworks := make(map[string]bool)
  159. for _, netname := range networks {
  160. if nw.networks[netname] {
  161. delete(nw.networks, netname)
  162. } else {
  163. // new network
  164. batch = append(batch, Event{EventAdded, Lease{}, netname})
  165. }
  166. newNetworks[netname] = true
  167. }
  168. // everything left in sm.networks has been deleted
  169. for netname, _ := range nw.networks {
  170. batch = append(batch, Event{EventRemoved, Lease{}, netname})
  171. }
  172. nw.networks = newNetworks
  173. return batch
  174. }
  175. func (nw *netWatcher) update(events []Event) []Event {
  176. batch := []Event{}
  177. for _, e := range events {
  178. switch e.Type {
  179. case EventAdded:
  180. batch = append(batch, nw.add(e.Network))
  181. case EventRemoved:
  182. batch = append(batch, nw.remove(e.Network))
  183. }
  184. }
  185. return batch
  186. }
  187. func (nw *netWatcher) add(network string) Event {
  188. if _, ok := nw.networks[network]; !ok {
  189. nw.networks[network] = true
  190. }
  191. return Event{EventAdded, Lease{}, network}
  192. }
  193. func (nw *netWatcher) remove(network string) Event {
  194. if _, ok := nw.networks[network]; ok {
  195. delete(nw.networks, network)
  196. } else {
  197. log.Errorf("Removed network (%s) was not found", network)
  198. }
  199. return Event{EventRemoved, Lease{}, network}
  200. }
  201. // WatchLease performs a long term watch of the given network's subnet lease
  202. // and communicates addition/deletion events on receiver channel. It takes care
  203. // of handling "fall-behind" logic where the history window has advanced too far
  204. // and it needs to diff the latest snapshot with its saved state and generate events
  205. func WatchLease(ctx context.Context, sm Manager, network string, sn ip.IP4Net, receiver chan Event) {
  206. var cursor interface{}
  207. for {
  208. wr, err := sm.WatchLease(ctx, network, sn, cursor)
  209. if err != nil {
  210. if err == context.Canceled || err == context.DeadlineExceeded {
  211. return
  212. }
  213. log.Errorf("Subnet watch failed: %v", err)
  214. time.Sleep(time.Second)
  215. continue
  216. }
  217. if len(wr.Snapshot) > 0 {
  218. receiver <- Event{
  219. Type: EventAdded,
  220. Lease: wr.Snapshot[0],
  221. }
  222. } else {
  223. receiver <- wr.Events[0]
  224. }
  225. cursor = wr.Cursor
  226. }
  227. }