local_manager.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "errors"
  17. "fmt"
  18. "strconv"
  19. "time"
  20. etcd "github.com/coreos/etcd/client"
  21. "github.com/coreos/flannel/pkg/ip"
  22. log "github.com/golang/glog"
  23. "golang.org/x/net/context"
  24. )
  25. const (
  26. raceRetries = 10
  27. subnetTTL = 24 * time.Hour
  28. )
  29. type LocalManager struct {
  30. registry Registry
  31. }
  32. type watchCursor struct {
  33. index uint64
  34. }
  35. func isErrEtcdTestFailed(e error) bool {
  36. if e == nil {
  37. return false
  38. }
  39. etcdErr, ok := e.(etcd.Error)
  40. return ok && etcdErr.Code == etcd.ErrorCodeTestFailed
  41. }
  42. func isErrEtcdNodeExist(e error) bool {
  43. if e == nil {
  44. return false
  45. }
  46. etcdErr, ok := e.(etcd.Error)
  47. return ok || etcdErr.Code == etcd.ErrorCodeNodeExist
  48. }
  49. func isErrEtcdKeyNotFound(e error) bool {
  50. if e == nil {
  51. return false
  52. }
  53. etcdErr, ok := e.(etcd.Error)
  54. return ok || etcdErr.Code == etcd.ErrorCodeKeyNotFound
  55. }
  56. func (c watchCursor) String() string {
  57. return strconv.FormatUint(c.index, 10)
  58. }
  59. func NewLocalManager(config *EtcdConfig) (Manager, error) {
  60. r, err := newEtcdSubnetRegistry(config, nil)
  61. if err != nil {
  62. return nil, err
  63. }
  64. return newLocalManager(r), nil
  65. }
  66. func newLocalManager(r Registry) Manager {
  67. return &LocalManager{
  68. registry: r,
  69. }
  70. }
  71. func (m *LocalManager) GetNetworkConfig(ctx context.Context, network string) (*Config, error) {
  72. cfg, err := m.registry.getNetworkConfig(ctx, network)
  73. if err != nil {
  74. return nil, err
  75. }
  76. return ParseConfig(cfg)
  77. }
  78. func (m *LocalManager) AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error) {
  79. config, err := m.GetNetworkConfig(ctx, network)
  80. if err != nil {
  81. return nil, err
  82. }
  83. for i := 0; i < raceRetries; i++ {
  84. l, err := m.tryAcquireLease(ctx, network, config, attrs.PublicIP, attrs)
  85. switch err {
  86. case nil:
  87. return l, nil
  88. case errTryAgain:
  89. continue
  90. default:
  91. return nil, err
  92. }
  93. }
  94. return nil, errors.New("Max retries reached trying to acquire a subnet")
  95. }
  96. func findLeaseByIP(leases []Lease, pubIP ip.IP4) *Lease {
  97. for _, l := range leases {
  98. if pubIP == l.Attrs.PublicIP {
  99. return &l
  100. }
  101. }
  102. return nil
  103. }
  104. func (m *LocalManager) tryAcquireLease(ctx context.Context, network string, config *Config, extIaddr ip.IP4, attrs *LeaseAttrs) (*Lease, error) {
  105. leases, _, err := m.registry.getSubnets(ctx, network)
  106. if err != nil {
  107. return nil, err
  108. }
  109. // try to reuse a subnet if there's one that matches our IP
  110. if l := findLeaseByIP(leases, extIaddr); l != nil {
  111. // make sure the existing subnet is still within the configured network
  112. if isSubnetConfigCompat(config, l.Subnet) {
  113. log.Infof("Found lease (%v) for current IP (%v), reusing", l.Subnet, extIaddr)
  114. ttl := time.Duration(0)
  115. if !l.Expiration.IsZero() {
  116. // Not a reservation
  117. ttl = subnetTTL
  118. }
  119. exp, err := m.registry.updateSubnet(ctx, network, l.Subnet, attrs, ttl, 0)
  120. if err != nil {
  121. return nil, err
  122. }
  123. l.Attrs = *attrs
  124. l.Expiration = exp
  125. return l, nil
  126. } else {
  127. log.Infof("Found lease (%v) for current IP (%v) but not compatible with current config, deleting", l.Subnet, extIaddr)
  128. if err := m.registry.deleteSubnet(ctx, network, l.Subnet); err != nil {
  129. return nil, err
  130. }
  131. }
  132. }
  133. // no existing match, grab a new one
  134. sn, err := m.allocateSubnet(config, leases)
  135. if err != nil {
  136. return nil, err
  137. }
  138. exp, err := m.registry.createSubnet(ctx, network, sn, attrs, subnetTTL)
  139. switch {
  140. case err == nil:
  141. return &Lease{
  142. Subnet: sn,
  143. Attrs: *attrs,
  144. Expiration: exp,
  145. }, nil
  146. case isErrEtcdNodeExist(err):
  147. return nil, errTryAgain
  148. default:
  149. return nil, err
  150. }
  151. }
  152. func (m *LocalManager) allocateSubnet(config *Config, leases []Lease) (ip.IP4Net, error) {
  153. log.Infof("Picking subnet in range %s ... %s", config.SubnetMin, config.SubnetMax)
  154. var bag []ip.IP4
  155. sn := ip.IP4Net{IP: config.SubnetMin, PrefixLen: config.SubnetLen}
  156. OuterLoop:
  157. for ; sn.IP <= config.SubnetMax && len(bag) < 100; sn = sn.Next() {
  158. for _, l := range leases {
  159. if sn.Overlaps(l.Subnet) {
  160. continue OuterLoop
  161. }
  162. }
  163. bag = append(bag, sn.IP)
  164. }
  165. if len(bag) == 0 {
  166. return ip.IP4Net{}, errors.New("out of subnets")
  167. } else {
  168. i := randInt(0, len(bag))
  169. return ip.IP4Net{IP: bag[i], PrefixLen: config.SubnetLen}, nil
  170. }
  171. }
  172. func (m *LocalManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error {
  173. return m.registry.deleteSubnet(ctx, network, sn)
  174. }
  175. func (m *LocalManager) RenewLease(ctx context.Context, network string, lease *Lease) error {
  176. exp, err := m.registry.updateSubnet(ctx, network, lease.Subnet, &lease.Attrs, subnetTTL, 0)
  177. if err != nil {
  178. return err
  179. }
  180. lease.Expiration = exp
  181. return nil
  182. }
  183. func getNextIndex(cursor interface{}) (uint64, error) {
  184. nextIndex := uint64(0)
  185. if wc, ok := cursor.(watchCursor); ok {
  186. nextIndex = wc.index
  187. } else if s, ok := cursor.(string); ok {
  188. var err error
  189. nextIndex, err = strconv.ParseUint(s, 10, 64)
  190. if err != nil {
  191. return 0, fmt.Errorf("failed to parse cursor: %v", err)
  192. }
  193. } else {
  194. return 0, fmt.Errorf("internal error: watch cursor is of unknown type")
  195. }
  196. return nextIndex, nil
  197. }
  198. func (m *LocalManager) leaseWatchReset(ctx context.Context, network string, sn ip.IP4Net) (LeaseWatchResult, error) {
  199. l, index, err := m.registry.getSubnet(ctx, network, sn)
  200. if err != nil {
  201. return LeaseWatchResult{}, err
  202. }
  203. return LeaseWatchResult{
  204. Snapshot: []Lease{*l},
  205. Cursor: watchCursor{index},
  206. }, nil
  207. }
  208. func (m *LocalManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error) {
  209. if cursor == nil {
  210. return m.leaseWatchReset(ctx, network, sn)
  211. }
  212. nextIndex, err := getNextIndex(cursor)
  213. if err != nil {
  214. return LeaseWatchResult{}, err
  215. }
  216. evt, index, err := m.registry.watchSubnet(ctx, network, nextIndex, sn)
  217. switch {
  218. case err == nil:
  219. return LeaseWatchResult{
  220. Events: []Event{evt},
  221. Cursor: watchCursor{index},
  222. }, nil
  223. case isIndexTooSmall(err):
  224. log.Warning("Watch of subnet leases failed because etcd index outside history window")
  225. return m.leaseWatchReset(ctx, network, sn)
  226. default:
  227. return LeaseWatchResult{}, err
  228. }
  229. }
  230. func (m *LocalManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error) {
  231. if cursor == nil {
  232. return m.leasesWatchReset(ctx, network)
  233. }
  234. nextIndex, err := getNextIndex(cursor)
  235. if err != nil {
  236. return LeaseWatchResult{}, err
  237. }
  238. evt, index, err := m.registry.watchSubnets(ctx, network, nextIndex)
  239. switch {
  240. case err == nil:
  241. return LeaseWatchResult{
  242. Events: []Event{evt},
  243. Cursor: watchCursor{index},
  244. }, nil
  245. case isIndexTooSmall(err):
  246. log.Warning("Watch of subnet leases failed because etcd index outside history window")
  247. return m.leasesWatchReset(ctx, network)
  248. default:
  249. return LeaseWatchResult{}, err
  250. }
  251. }
  252. func (m *LocalManager) WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error) {
  253. if cursor == nil {
  254. return m.networkWatchReset(ctx)
  255. }
  256. nextIndex, err := getNextIndex(cursor)
  257. if err != nil {
  258. return NetworkWatchResult{}, err
  259. }
  260. for {
  261. evt, index, err := m.registry.watchNetworks(ctx, nextIndex)
  262. switch {
  263. case err == nil:
  264. return NetworkWatchResult{
  265. Events: []Event{evt},
  266. Cursor: watchCursor{index},
  267. }, nil
  268. case err == errTryAgain:
  269. nextIndex = index
  270. case isIndexTooSmall(err):
  271. log.Warning("Watch of networks failed because etcd index outside history window")
  272. return m.networkWatchReset(ctx)
  273. default:
  274. return NetworkWatchResult{}, err
  275. }
  276. }
  277. }
  278. func isIndexTooSmall(err error) bool {
  279. etcdErr, ok := err.(etcd.Error)
  280. return ok && etcdErr.Code == etcd.ErrorCodeEventIndexCleared
  281. }
  282. // leasesWatchReset is called when incremental lease watch failed and we need to grab a snapshot
  283. func (m *LocalManager) leasesWatchReset(ctx context.Context, network string) (LeaseWatchResult, error) {
  284. wr := LeaseWatchResult{}
  285. leases, index, err := m.registry.getSubnets(ctx, network)
  286. if err != nil {
  287. return wr, fmt.Errorf("failed to retrieve subnet leases: %v", err)
  288. }
  289. wr.Cursor = watchCursor{index}
  290. wr.Snapshot = leases
  291. return wr, nil
  292. }
  293. // networkWatchReset is called when incremental network watch failed and we need to grab a snapshot
  294. func (m *LocalManager) networkWatchReset(ctx context.Context) (NetworkWatchResult, error) {
  295. wr := NetworkWatchResult{}
  296. networks, index, err := m.registry.getNetworks(ctx)
  297. if err != nil {
  298. return wr, fmt.Errorf("failed to retrieve networks: %v", err)
  299. }
  300. wr.Cursor = watchCursor{index}
  301. wr.Snapshot = networks
  302. return wr, nil
  303. }
  304. func isSubnetConfigCompat(config *Config, sn ip.IP4Net) bool {
  305. if sn.IP < config.SubnetMin || sn.IP > config.SubnetMax {
  306. return false
  307. }
  308. return sn.PrefixLen == config.SubnetLen
  309. }
  310. func (m *LocalManager) tryAddReservation(ctx context.Context, network string, r *Reservation) error {
  311. attrs := &LeaseAttrs{
  312. PublicIP: r.PublicIP,
  313. }
  314. _, err := m.registry.createSubnet(ctx, network, r.Subnet, attrs, 0)
  315. switch {
  316. case err == nil:
  317. return nil
  318. case !isErrEtcdNodeExist(err):
  319. return err
  320. }
  321. // This subnet or its reservation already exists.
  322. // Get what's there and
  323. // - if PublicIP matches, remove the TTL make it a reservation
  324. // - otherwise, error out
  325. sub, asof, err := m.registry.getSubnet(ctx, network, r.Subnet)
  326. switch {
  327. case err == nil:
  328. case isErrEtcdKeyNotFound(err):
  329. // Subnet just got expired or was deleted
  330. return errTryAgain
  331. default:
  332. return err
  333. }
  334. if sub.Attrs.PublicIP != r.PublicIP {
  335. // Subnet already taken
  336. return ErrLeaseTaken
  337. }
  338. // remove TTL
  339. _, err = m.registry.updateSubnet(ctx, network, r.Subnet, &sub.Attrs, 0, asof)
  340. if isErrEtcdTestFailed(err) {
  341. return errTryAgain
  342. }
  343. return err
  344. }
  345. func (m *LocalManager) AddReservation(ctx context.Context, network string, r *Reservation) error {
  346. config, err := m.GetNetworkConfig(ctx, network)
  347. if err != nil {
  348. return err
  349. }
  350. if config.SubnetLen != r.Subnet.PrefixLen {
  351. return fmt.Errorf("reservation subnet has mask incompatible with network config")
  352. }
  353. if !config.Network.Overlaps(r.Subnet) {
  354. return fmt.Errorf("reservation subnet is outside of flannel network")
  355. }
  356. for i := 0; i < raceRetries; i++ {
  357. err := m.tryAddReservation(ctx, network, r)
  358. switch {
  359. case err == nil:
  360. return nil
  361. case err == errTryAgain:
  362. continue
  363. default:
  364. return err
  365. }
  366. }
  367. return ErrNoMoreTries
  368. }
  369. func (m *LocalManager) tryRemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error {
  370. sub, asof, err := m.registry.getSubnet(ctx, network, subnet)
  371. if err != nil {
  372. return err
  373. }
  374. // add back the TTL
  375. _, err = m.registry.updateSubnet(ctx, network, subnet, &sub.Attrs, subnetTTL, asof)
  376. if isErrEtcdTestFailed(err) {
  377. return errTryAgain
  378. }
  379. return err
  380. }
  381. //RemoveReservation removes the subnet by setting TTL back to subnetTTL (24hours)
  382. func (m *LocalManager) RemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error {
  383. for i := 0; i < raceRetries; i++ {
  384. err := m.tryRemoveReservation(ctx, network, subnet)
  385. switch {
  386. case err == nil:
  387. return nil
  388. case err == errTryAgain:
  389. continue
  390. default:
  391. return err
  392. }
  393. }
  394. return ErrNoMoreTries
  395. }
  396. func (m *LocalManager) ListReservations(ctx context.Context, network string) ([]Reservation, error) {
  397. subnets, _, err := m.registry.getSubnets(ctx, network)
  398. if err != nil {
  399. return nil, err
  400. }
  401. rsvs := []Reservation{}
  402. for _, sub := range subnets {
  403. // Reservations don't have TTL and so no expiration
  404. if !sub.Expiration.IsZero() {
  405. continue
  406. }
  407. r := Reservation{
  408. Subnet: sub.Subnet,
  409. PublicIP: sub.Attrs.PublicIP,
  410. }
  411. rsvs = append(rsvs, r)
  412. }
  413. return rsvs, nil
  414. }