registry.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "path"
  20. "regexp"
  21. "sync"
  22. "time"
  23. etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
  24. "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport"
  25. log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
  26. "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
  27. "github.com/coreos/flannel/pkg/ip"
  28. )
  29. var (
  30. subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
  31. errTryAgain = errors.New("try again")
  32. )
  33. type Registry interface {
  34. getNetworkConfig(ctx context.Context, network string) (string, error)
  35. getSubnets(ctx context.Context, network string) ([]Lease, uint64, error)
  36. getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error)
  37. createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error)
  38. updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error)
  39. deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error
  40. watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error)
  41. watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error)
  42. getNetworks(ctx context.Context) ([]string, uint64, error)
  43. watchNetworks(ctx context.Context, since uint64) (Event, uint64, error)
  44. }
  45. type EtcdConfig struct {
  46. Endpoints []string
  47. Keyfile string
  48. Certfile string
  49. CAFile string
  50. Prefix string
  51. }
  52. type etcdNewFunc func(c *EtcdConfig) (etcd.KeysAPI, error)
  53. type etcdSubnetRegistry struct {
  54. cliNewFunc etcdNewFunc
  55. mux sync.Mutex
  56. cli etcd.KeysAPI
  57. etcdCfg *EtcdConfig
  58. networkRegex *regexp.Regexp
  59. }
  60. func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error) {
  61. tlsInfo := transport.TLSInfo{
  62. CertFile: c.Certfile,
  63. KeyFile: c.Keyfile,
  64. CAFile: c.CAFile,
  65. }
  66. t, err := transport.NewTransport(tlsInfo)
  67. if err != nil {
  68. return nil, err
  69. }
  70. cli, err := etcd.New(etcd.Config{
  71. Endpoints: c.Endpoints,
  72. Transport: t,
  73. })
  74. if err != nil {
  75. return nil, err
  76. }
  77. return etcd.NewKeysAPI(cli), nil
  78. }
  79. func newEtcdSubnetRegistry(config *EtcdConfig, cliNewFunc etcdNewFunc) (Registry, error) {
  80. r := &etcdSubnetRegistry{
  81. etcdCfg: config,
  82. networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)(/|/config)?$`),
  83. }
  84. if cliNewFunc != nil {
  85. r.cliNewFunc = cliNewFunc
  86. } else {
  87. r.cliNewFunc = newEtcdClient
  88. }
  89. var err error
  90. r.cli, err = r.cliNewFunc(config)
  91. if err != nil {
  92. return nil, err
  93. }
  94. return r, nil
  95. }
  96. func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) {
  97. key := path.Join(esr.etcdCfg.Prefix, network, "config")
  98. resp, err := esr.client().Get(ctx, key, nil)
  99. if err != nil {
  100. return "", err
  101. }
  102. return resp.Node.Value, nil
  103. }
  104. // getSubnets queries etcd to get a list of currently allocated leases for a given network.
  105. // It returns the leases along with the "as-of" etcd-index that can be used as the starting
  106. // point for etcd watch.
  107. func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) {
  108. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  109. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Recursive: true})
  110. if err != nil {
  111. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  112. // key not found: treat it as empty set
  113. return []Lease{}, etcdErr.Index, nil
  114. }
  115. return nil, 0, err
  116. }
  117. leases := []Lease{}
  118. for _, node := range resp.Node.Nodes {
  119. l, err := nodeToLease(node)
  120. if err != nil {
  121. log.Warningf("Ignoring bad subnet node: %v", err)
  122. continue
  123. }
  124. leases = append(leases, *l)
  125. }
  126. return leases, resp.Index, nil
  127. }
  128. func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
  129. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  130. resp, err := esr.client().Get(ctx, key, nil)
  131. if err != nil {
  132. return nil, 0, err
  133. }
  134. l, err := nodeToLease(resp.Node)
  135. return l, resp.Index, err
  136. }
  137. func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
  138. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  139. value, err := json.Marshal(attrs)
  140. if err != nil {
  141. return time.Time{}, err
  142. }
  143. opts := &etcd.SetOptions{
  144. PrevExist: etcd.PrevNoExist,
  145. TTL: ttl,
  146. }
  147. resp, err := esr.client().Set(ctx, key, string(value), opts)
  148. if err != nil {
  149. return time.Time{}, err
  150. }
  151. exp := time.Time{}
  152. if resp.Node.Expiration != nil {
  153. exp = *resp.Node.Expiration
  154. }
  155. return exp, nil
  156. }
  157. func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
  158. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  159. value, err := json.Marshal(attrs)
  160. if err != nil {
  161. return time.Time{}, err
  162. }
  163. resp, err := esr.client().Set(ctx, key, string(value), &etcd.SetOptions{
  164. PrevIndex: asof,
  165. TTL: ttl,
  166. })
  167. if err != nil {
  168. return time.Time{}, err
  169. }
  170. exp := time.Time{}
  171. if resp.Node.Expiration != nil {
  172. exp = *resp.Node.Expiration
  173. }
  174. return exp, nil
  175. }
  176. func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
  177. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  178. _, err := esr.client().Delete(ctx, key, nil)
  179. return err
  180. }
  181. func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) {
  182. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  183. opts := &etcd.WatcherOptions{
  184. AfterIndex: since,
  185. Recursive: true,
  186. }
  187. e, err := esr.client().Watcher(key, opts).Next(ctx)
  188. if err != nil {
  189. return Event{}, 0, err
  190. }
  191. evt, err := parseSubnetWatchResponse(e)
  192. return evt, e.Node.ModifiedIndex, err
  193. }
  194. func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
  195. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  196. opts := &etcd.WatcherOptions{
  197. AfterIndex: since,
  198. }
  199. e, err := esr.client().Watcher(key, opts).Next(ctx)
  200. if err != nil {
  201. return Event{}, 0, err
  202. }
  203. evt, err := parseSubnetWatchResponse(e)
  204. return evt, e.Node.ModifiedIndex, err
  205. }
  206. // getNetworks queries etcd to get a list of network names. It returns the
  207. // networks along with the 'as-of' etcd-index that can be used as the starting
  208. // point for etcd watch.
  209. func (esr *etcdSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) {
  210. resp, err := esr.client().Get(ctx, esr.etcdCfg.Prefix, &etcd.GetOptions{Recursive: true})
  211. networks := []string{}
  212. if err == nil {
  213. for _, node := range resp.Node.Nodes {
  214. // Look for '/config' on the child nodes
  215. for _, child := range node.Nodes {
  216. netname, isConfig := esr.parseNetworkKey(child.Key)
  217. if isConfig {
  218. networks = append(networks, netname)
  219. }
  220. }
  221. }
  222. return networks, resp.Index, nil
  223. }
  224. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  225. // key not found: treat it as empty set
  226. return networks, etcdErr.Index, nil
  227. }
  228. return nil, 0, err
  229. }
  230. func (esr *etcdSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) {
  231. key := esr.etcdCfg.Prefix
  232. opts := &etcd.WatcherOptions{
  233. AfterIndex: since,
  234. Recursive: true,
  235. }
  236. e, err := esr.client().Watcher(key, opts).Next(ctx)
  237. if err != nil {
  238. return Event{}, 0, err
  239. }
  240. return esr.parseNetworkWatchResponse(e)
  241. }
  242. func (esr *etcdSubnetRegistry) client() etcd.KeysAPI {
  243. esr.mux.Lock()
  244. defer esr.mux.Unlock()
  245. return esr.cli
  246. }
  247. func (esr *etcdSubnetRegistry) resetClient() {
  248. esr.mux.Lock()
  249. defer esr.mux.Unlock()
  250. var err error
  251. esr.cli, err = newEtcdClient(esr.etcdCfg)
  252. if err != nil {
  253. panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err))
  254. }
  255. }
  256. func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
  257. sn := ParseSubnetKey(resp.Node.Key)
  258. if sn == nil {
  259. return Event{}, fmt.Errorf("%v %q: not a subnet, skipping", resp.Action, resp.Node.Key)
  260. }
  261. switch resp.Action {
  262. case "delete", "expire":
  263. return Event{
  264. EventRemoved,
  265. Lease{Subnet: *sn},
  266. "",
  267. }, nil
  268. default:
  269. attrs := &LeaseAttrs{}
  270. err := json.Unmarshal([]byte(resp.Node.Value), attrs)
  271. if err != nil {
  272. return Event{}, err
  273. }
  274. exp := time.Time{}
  275. if resp.Node.Expiration != nil {
  276. exp = *resp.Node.Expiration
  277. }
  278. evt := Event{
  279. EventAdded,
  280. Lease{
  281. Subnet: *sn,
  282. Attrs: *attrs,
  283. Expiration: exp,
  284. },
  285. "",
  286. }
  287. return evt, nil
  288. }
  289. }
  290. func (esr *etcdSubnetRegistry) parseNetworkWatchResponse(resp *etcd.Response) (Event, uint64, error) {
  291. index := resp.Node.ModifiedIndex
  292. netname, isConfig := esr.parseNetworkKey(resp.Node.Key)
  293. if netname == "" {
  294. return Event{}, index, errTryAgain
  295. }
  296. evt := Event{}
  297. switch resp.Action {
  298. case "delete":
  299. evt = Event{
  300. EventRemoved,
  301. Lease{},
  302. netname,
  303. }
  304. default:
  305. if !isConfig {
  306. // Ignore non .../<netname>/config keys; tell caller to try again
  307. return Event{}, index, errTryAgain
  308. }
  309. _, err := ParseConfig(resp.Node.Value)
  310. if err != nil {
  311. return Event{}, index, err
  312. }
  313. evt = Event{
  314. EventAdded,
  315. Lease{},
  316. netname,
  317. }
  318. }
  319. return evt, index, nil
  320. }
  321. // Returns network name from config key (eg, /coreos.com/network/foobar/config),
  322. // if the 'config' key isn't present we don't consider the network valid
  323. func (esr *etcdSubnetRegistry) parseNetworkKey(s string) (string, bool) {
  324. if parts := esr.networkRegex.FindStringSubmatch(s); len(parts) == 3 {
  325. return parts[1], parts[2] != ""
  326. }
  327. return "", false
  328. }
  329. func nodeToLease(node *etcd.Node) (*Lease, error) {
  330. sn := ParseSubnetKey(node.Key)
  331. if sn == nil {
  332. return nil, fmt.Errorf("failed to parse subnet key %q", *sn)
  333. }
  334. attrs := &LeaseAttrs{}
  335. if err := json.Unmarshal([]byte(node.Value), attrs); err != nil {
  336. return nil, err
  337. }
  338. exp := time.Time{}
  339. if node.Expiration != nil {
  340. exp = *node.Expiration
  341. }
  342. lease := Lease{
  343. Subnet: *sn,
  344. Attrs: *attrs,
  345. Expiration: exp,
  346. asof: node.ModifiedIndex,
  347. }
  348. return &lease, nil
  349. }