mock_registry.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "time"
  20. etcd "github.com/coreos/etcd/client"
  21. "golang.org/x/net/context"
  22. "github.com/coreos/flannel/pkg/ip"
  23. )
  24. type netwk struct {
  25. config string
  26. backendData string
  27. subnets []Lease
  28. subnetsEvents chan event
  29. mux sync.Mutex
  30. subnetEvents map[ip.IP4Net]chan event
  31. }
  32. func (n *netwk) sendSubnetEvent(sn ip.IP4Net, e event) {
  33. n.subnetsEvents <- e
  34. n.mux.Lock()
  35. c, ok := n.subnetEvents[sn]
  36. if !ok {
  37. c = make(chan event, 10)
  38. n.subnetEvents[sn] = c
  39. }
  40. n.mux.Unlock()
  41. c <- e
  42. }
  43. func (n *netwk) subnetEventsChan(sn ip.IP4Net) chan event {
  44. n.mux.Lock()
  45. c, ok := n.subnetEvents[sn]
  46. if !ok {
  47. c = make(chan event, 10)
  48. n.subnetEvents[sn] = c
  49. }
  50. n.mux.Unlock()
  51. return c
  52. }
  53. type event struct {
  54. evt Event
  55. index uint64
  56. }
  57. type MockSubnetRegistry struct {
  58. mux sync.Mutex
  59. networks map[string]*netwk
  60. networkEvents chan event
  61. index uint64
  62. }
  63. func NewMockRegistry(network, config string, initialSubnets []Lease) *MockSubnetRegistry {
  64. msr := &MockSubnetRegistry{
  65. networkEvents: make(chan event, 1000),
  66. index: 1000,
  67. networks: make(map[string]*netwk),
  68. }
  69. msr.networks[network] = &netwk{
  70. config: config,
  71. subnets: initialSubnets,
  72. subnetsEvents: make(chan event, 1000),
  73. subnetEvents: make(map[ip.IP4Net]chan event),
  74. }
  75. return msr
  76. }
  77. func (msr *MockSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) {
  78. msr.mux.Lock()
  79. defer msr.mux.Unlock()
  80. n, ok := msr.networks[network]
  81. if !ok {
  82. return "", fmt.Errorf("Network %s not found", network)
  83. }
  84. return n.config, nil
  85. }
  86. func (msr *MockSubnetRegistry) setConfig(network, config string) error {
  87. msr.mux.Lock()
  88. defer msr.mux.Unlock()
  89. n, ok := msr.networks[network]
  90. if !ok {
  91. return fmt.Errorf("Network %s not found", network)
  92. }
  93. n.config = config
  94. return nil
  95. }
  96. func (msr *MockSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) {
  97. msr.mux.Lock()
  98. defer msr.mux.Unlock()
  99. n, ok := msr.networks[network]
  100. if !ok {
  101. return nil, 0, fmt.Errorf("Network %s not found", network)
  102. }
  103. subs := make([]Lease, len(n.subnets))
  104. copy(subs, n.subnets)
  105. return subs, msr.index, nil
  106. }
  107. func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
  108. msr.mux.Lock()
  109. defer msr.mux.Unlock()
  110. n, ok := msr.networks[network]
  111. if !ok {
  112. return nil, 0, fmt.Errorf("Network %s not found", network)
  113. }
  114. for _, l := range n.subnets {
  115. if l.Subnet.Equal(sn) {
  116. return &l, msr.index, nil
  117. }
  118. }
  119. return nil, msr.index, fmt.Errorf("subnet %s not found", sn)
  120. }
  121. func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
  122. msr.mux.Lock()
  123. defer msr.mux.Unlock()
  124. n, ok := msr.networks[network]
  125. if !ok {
  126. return time.Time{}, fmt.Errorf("Network %s not found", network)
  127. }
  128. // check for existing
  129. if _, _, err := n.findSubnet(sn); err == nil {
  130. return time.Time{}, etcd.Error{
  131. Code: etcd.ErrorCodeNodeExist,
  132. Index: msr.index,
  133. }
  134. }
  135. msr.index += 1
  136. exp := time.Time{}
  137. if ttl != 0 {
  138. exp = clock.Now().Add(ttl)
  139. }
  140. l := Lease{
  141. Subnet: sn,
  142. Attrs: *attrs,
  143. Expiration: exp,
  144. asof: msr.index,
  145. }
  146. n.subnets = append(n.subnets, l)
  147. evt := Event{
  148. Type: EventAdded,
  149. Lease: l,
  150. Network: network,
  151. }
  152. n.sendSubnetEvent(sn, event{evt, msr.index})
  153. return exp, nil
  154. }
  155. func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
  156. msr.mux.Lock()
  157. defer msr.mux.Unlock()
  158. n, ok := msr.networks[network]
  159. if !ok {
  160. return time.Time{}, fmt.Errorf("Network %s not found", network)
  161. }
  162. msr.index += 1
  163. exp := time.Time{}
  164. if ttl != 0 {
  165. exp = clock.Now().Add(ttl)
  166. }
  167. sub, i, err := n.findSubnet(sn)
  168. if err != nil {
  169. return time.Time{}, err
  170. }
  171. sub.Attrs = *attrs
  172. sub.asof = msr.index
  173. sub.Expiration = exp
  174. n.subnets[i] = sub
  175. n.sendSubnetEvent(sn, event{
  176. Event{
  177. Type: EventAdded,
  178. Lease: sub,
  179. Network: network,
  180. }, msr.index,
  181. })
  182. return sub.Expiration, nil
  183. }
  184. func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
  185. msr.mux.Lock()
  186. defer msr.mux.Unlock()
  187. n, ok := msr.networks[network]
  188. if !ok {
  189. return fmt.Errorf("Network %s not found", network)
  190. }
  191. msr.index += 1
  192. sub, i, err := n.findSubnet(sn)
  193. if err != nil {
  194. return err
  195. }
  196. n.subnets[i] = n.subnets[len(n.subnets)-1]
  197. n.subnets = n.subnets[:len(n.subnets)-1]
  198. sub.asof = msr.index
  199. n.sendSubnetEvent(sn, event{
  200. Event{
  201. Type: EventRemoved,
  202. Lease: sub,
  203. Network: network,
  204. }, msr.index,
  205. })
  206. return nil
  207. }
  208. func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) {
  209. msr.mux.Lock()
  210. n, ok := msr.networks[network]
  211. msr.mux.Unlock()
  212. if !ok {
  213. return Event{}, 0, fmt.Errorf("Network %s not found", network)
  214. }
  215. for {
  216. msr.mux.Lock()
  217. index := msr.index
  218. msr.mux.Unlock()
  219. if since < index {
  220. return Event{}, 0, etcd.Error{
  221. Code: etcd.ErrorCodeEventIndexCleared,
  222. Cause: "out of date",
  223. Message: "cursor is out of date",
  224. Index: index,
  225. }
  226. }
  227. select {
  228. case <-ctx.Done():
  229. return Event{}, 0, ctx.Err()
  230. case e := <-n.subnetsEvents:
  231. if e.index > since {
  232. return e.evt, e.index, nil
  233. }
  234. }
  235. }
  236. }
  237. func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
  238. msr.mux.Lock()
  239. n, ok := msr.networks[network]
  240. msr.mux.Unlock()
  241. if !ok {
  242. return Event{}, 0, fmt.Errorf("Network %s not found", network)
  243. }
  244. for {
  245. msr.mux.Lock()
  246. index := msr.index
  247. msr.mux.Unlock()
  248. if since < index {
  249. return Event{}, msr.index, etcd.Error{
  250. Code: etcd.ErrorCodeEventIndexCleared,
  251. Cause: "out of date",
  252. Message: "cursor is out of date",
  253. Index: index,
  254. }
  255. }
  256. select {
  257. case <-ctx.Done():
  258. return Event{}, index, ctx.Err()
  259. case e := <-n.subnetEventsChan(sn):
  260. if e.index > since {
  261. return e.evt, index, nil
  262. }
  263. }
  264. }
  265. }
  266. func (msr *MockSubnetRegistry) expireSubnet(network string, sn ip.IP4Net) {
  267. msr.mux.Lock()
  268. defer msr.mux.Unlock()
  269. n, ok := msr.networks[network]
  270. if !ok {
  271. return
  272. }
  273. if sub, i, err := n.findSubnet(sn); err == nil {
  274. msr.index += 1
  275. n.subnets[i] = n.subnets[len(n.subnets)-1]
  276. n.subnets = n.subnets[:len(n.subnets)-1]
  277. sub.asof = msr.index
  278. n.sendSubnetEvent(sn, event{
  279. Event{
  280. Type: EventRemoved,
  281. Lease: sub,
  282. }, msr.index,
  283. })
  284. }
  285. }
  286. func configKeyToNetworkKey(configKey string) string {
  287. if !strings.HasSuffix(configKey, "/config") {
  288. return ""
  289. }
  290. return strings.TrimSuffix(configKey, "/config")
  291. }
  292. func (msr *MockSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) {
  293. msr.mux.Lock()
  294. defer msr.mux.Unlock()
  295. ns := []string{}
  296. for n, _ := range msr.networks {
  297. ns = append(ns, n)
  298. }
  299. return ns, msr.index, nil
  300. }
  301. func (msr *MockSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) {
  302. msr.mux.Lock()
  303. index := msr.index
  304. msr.mux.Unlock()
  305. for {
  306. if since < index {
  307. return Event{}, 0, etcd.Error{
  308. Code: etcd.ErrorCodeEventIndexCleared,
  309. Cause: "out of date",
  310. Message: "cursor is out of date",
  311. Index: index,
  312. }
  313. }
  314. select {
  315. case <-ctx.Done():
  316. return Event{}, 0, ctx.Err()
  317. case e := <-msr.networkEvents:
  318. if e.index > since {
  319. return e.evt, e.index, nil
  320. }
  321. }
  322. }
  323. }
  324. func (msr *MockSubnetRegistry) getNetwork(ctx context.Context, network string) (*netwk, error) {
  325. msr.mux.Lock()
  326. defer msr.mux.Unlock()
  327. n, ok := msr.networks[network]
  328. if !ok {
  329. return nil, fmt.Errorf("Network %s not found", network)
  330. }
  331. return n, nil
  332. }
  333. func (msr *MockSubnetRegistry) CreateNetwork(ctx context.Context, network, config string) error {
  334. msr.mux.Lock()
  335. defer msr.mux.Unlock()
  336. _, ok := msr.networks[network]
  337. if ok {
  338. return fmt.Errorf("Network %s already exists", network)
  339. }
  340. msr.index += 1
  341. n := &netwk{
  342. config: network,
  343. subnetsEvents: make(chan event, 1000),
  344. subnetEvents: make(map[ip.IP4Net]chan event),
  345. }
  346. msr.networks[network] = n
  347. msr.networkEvents <- event{
  348. Event{
  349. Type: EventAdded,
  350. Network: network,
  351. }, msr.index,
  352. }
  353. return nil
  354. }
  355. func (msr *MockSubnetRegistry) DeleteNetwork(ctx context.Context, network string) error {
  356. msr.mux.Lock()
  357. defer msr.mux.Unlock()
  358. _, ok := msr.networks[network]
  359. if !ok {
  360. return fmt.Errorf("Network %s not found", network)
  361. }
  362. delete(msr.networks, network)
  363. msr.index += 1
  364. msr.networkEvents <- event{
  365. Event{
  366. Type: EventRemoved,
  367. Network: network,
  368. }, msr.index,
  369. }
  370. return nil
  371. }
  372. func (n *netwk) findSubnet(sn ip.IP4Net) (Lease, int, error) {
  373. for i, sub := range n.subnets {
  374. if sub.Subnet.Equal(sn) {
  375. return sub, i, nil
  376. }
  377. }
  378. return Lease{}, 0, fmt.Errorf("subnet not found")
  379. }
  380. func (msr *MockSubnetRegistry) createBackendData(ctx context.Context, network, data string) error {
  381. n, ok := msr.networks[network]
  382. if !ok {
  383. return fmt.Errorf("network %s not found", network)
  384. }
  385. if n.backendData == "" {
  386. n.backendData = data
  387. }
  388. return nil
  389. }
  390. func (msr *MockSubnetRegistry) getBackendData(ctx context.Context, network string) (string, error) {
  391. n, ok := msr.networks[network]
  392. if !ok {
  393. return "", fmt.Errorf("network %s not found", network)
  394. }
  395. if n.backendData == "" {
  396. return "", fmt.Errorf("backendData not set for %s network", network)
  397. }
  398. return n.backendData, nil
  399. }