registry.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "path"
  20. "regexp"
  21. "sync"
  22. "time"
  23. etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
  24. "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport"
  25. log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
  26. "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
  27. "github.com/coreos/flannel/pkg/ip"
  28. )
  29. var (
  30. subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
  31. errTryAgain = errors.New("try again")
  32. )
  33. type Registry interface {
  34. getNetworkConfig(ctx context.Context, network string) (string, error)
  35. getSubnets(ctx context.Context, network string) ([]Lease, uint64, error)
  36. getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error)
  37. createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error)
  38. updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error)
  39. deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error
  40. watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error)
  41. watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error)
  42. getNetworks(ctx context.Context) ([]string, uint64, error)
  43. watchNetworks(ctx context.Context, since uint64) (Event, uint64, error)
  44. }
  45. type EtcdConfig struct {
  46. Endpoints []string
  47. Keyfile string
  48. Certfile string
  49. CAFile string
  50. Prefix string
  51. }
  52. type etcdSubnetRegistry struct {
  53. mux sync.Mutex
  54. cli etcd.KeysAPI
  55. etcdCfg *EtcdConfig
  56. networkRegex *regexp.Regexp
  57. }
  58. func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error) {
  59. tlsInfo := transport.TLSInfo{
  60. CertFile: c.Certfile,
  61. KeyFile: c.Keyfile,
  62. CAFile: c.CAFile,
  63. }
  64. t, err := transport.NewTransport(tlsInfo)
  65. if err != nil {
  66. return nil, err
  67. }
  68. cli, err := etcd.New(etcd.Config{
  69. Endpoints: c.Endpoints,
  70. Transport: t,
  71. })
  72. if err != nil {
  73. return nil, err
  74. }
  75. return etcd.NewKeysAPI(cli), nil
  76. }
  77. func newEtcdSubnetRegistry(config *EtcdConfig) (Registry, error) {
  78. r := &etcdSubnetRegistry{
  79. etcdCfg: config,
  80. networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)(/|/config)?$`),
  81. }
  82. var err error
  83. r.cli, err = newEtcdClient(config)
  84. if err != nil {
  85. return nil, err
  86. }
  87. return r, nil
  88. }
  89. func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) {
  90. key := path.Join(esr.etcdCfg.Prefix, network, "config")
  91. resp, err := esr.client().Get(ctx, key, nil)
  92. if err != nil {
  93. return "", err
  94. }
  95. return resp.Node.Value, nil
  96. }
  97. // getSubnets queries etcd to get a list of currently allocated leases for a given network.
  98. // It returns the leases along with the "as-of" etcd-index that can be used as the starting
  99. // point for etcd watch.
  100. func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) {
  101. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  102. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Recursive: true})
  103. if err != nil {
  104. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  105. // key not found: treat it as empty set
  106. return []Lease{}, etcdErr.Index, nil
  107. }
  108. return nil, 0, err
  109. }
  110. leases := []Lease{}
  111. for _, node := range resp.Node.Nodes {
  112. l, err := nodeToLease(node)
  113. if err != nil {
  114. log.Warningf("Ignoring bad subnet node: %v", err)
  115. continue
  116. }
  117. leases = append(leases, *l)
  118. }
  119. return leases, resp.Index, nil
  120. }
  121. func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
  122. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  123. resp, err := esr.client().Get(ctx, key, nil)
  124. if err != nil {
  125. return nil, 0, err
  126. }
  127. l, err := nodeToLease(resp.Node)
  128. return l, resp.Index, err
  129. }
  130. func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
  131. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  132. value, err := json.Marshal(attrs)
  133. if err != nil {
  134. return time.Time{}, err
  135. }
  136. opts := &etcd.SetOptions{
  137. PrevExist: etcd.PrevNoExist,
  138. TTL: ttl,
  139. }
  140. resp, err := esr.client().Set(ctx, key, string(value), opts)
  141. if err != nil {
  142. return time.Time{}, err
  143. }
  144. exp := time.Time{}
  145. if resp.Node.Expiration != nil {
  146. exp = *resp.Node.Expiration
  147. }
  148. return exp, nil
  149. }
  150. func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
  151. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  152. value, err := json.Marshal(attrs)
  153. if err != nil {
  154. return time.Time{}, err
  155. }
  156. resp, err := esr.client().Set(ctx, key, string(value), &etcd.SetOptions{
  157. PrevIndex: asof,
  158. TTL: ttl,
  159. })
  160. if err != nil {
  161. return time.Time{}, err
  162. }
  163. exp := time.Time{}
  164. if resp.Node.Expiration != nil {
  165. exp = *resp.Node.Expiration
  166. }
  167. return exp, nil
  168. }
  169. func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
  170. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  171. _, err := esr.client().Delete(ctx, key, nil)
  172. return err
  173. }
  174. func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) {
  175. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  176. opts := &etcd.WatcherOptions{
  177. AfterIndex: since,
  178. Recursive: true,
  179. }
  180. e, err := esr.client().Watcher(key, opts).Next(ctx)
  181. if err != nil {
  182. return Event{}, 0, err
  183. }
  184. evt, err := parseSubnetWatchResponse(e)
  185. return evt, e.Node.ModifiedIndex, err
  186. }
  187. func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
  188. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  189. opts := &etcd.WatcherOptions{
  190. AfterIndex: since,
  191. }
  192. e, err := esr.client().Watcher(key, opts).Next(ctx)
  193. if err != nil {
  194. return Event{}, 0, err
  195. }
  196. evt, err := parseSubnetWatchResponse(e)
  197. return evt, e.Node.ModifiedIndex, err
  198. }
  199. // getNetworks queries etcd to get a list of network names. It returns the
  200. // networks along with the 'as-of' etcd-index that can be used as the starting
  201. // point for etcd watch.
  202. func (esr *etcdSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) {
  203. resp, err := esr.client().Get(ctx, esr.etcdCfg.Prefix, &etcd.GetOptions{Recursive: true})
  204. networks := []string{}
  205. if err == nil {
  206. for _, node := range resp.Node.Nodes {
  207. // Look for '/config' on the child nodes
  208. for _, child := range node.Nodes {
  209. netname, isConfig := esr.parseNetworkKey(child.Key)
  210. if isConfig {
  211. networks = append(networks, netname)
  212. }
  213. }
  214. }
  215. return networks, resp.Index, nil
  216. }
  217. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  218. // key not found: treat it as empty set
  219. return networks, etcdErr.Index, nil
  220. }
  221. return nil, 0, err
  222. }
  223. func (esr *etcdSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) {
  224. key := esr.etcdCfg.Prefix
  225. opts := &etcd.WatcherOptions{
  226. AfterIndex: since,
  227. Recursive: true,
  228. }
  229. e, err := esr.client().Watcher(key, opts).Next(ctx)
  230. if err != nil {
  231. return Event{}, 0, err
  232. }
  233. return esr.parseNetworkWatchResponse(e)
  234. }
  235. func (esr *etcdSubnetRegistry) client() etcd.KeysAPI {
  236. esr.mux.Lock()
  237. defer esr.mux.Unlock()
  238. return esr.cli
  239. }
  240. func (esr *etcdSubnetRegistry) resetClient() {
  241. esr.mux.Lock()
  242. defer esr.mux.Unlock()
  243. var err error
  244. esr.cli, err = newEtcdClient(esr.etcdCfg)
  245. if err != nil {
  246. panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err))
  247. }
  248. }
  249. func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
  250. sn := ParseSubnetKey(resp.Node.Key)
  251. if sn == nil {
  252. return Event{}, fmt.Errorf("%v %q: not a subnet, skipping", resp.Action, resp.Node.Key)
  253. }
  254. switch resp.Action {
  255. case "delete", "expire":
  256. return Event{
  257. EventRemoved,
  258. Lease{Subnet: *sn},
  259. "",
  260. }, nil
  261. default:
  262. attrs := &LeaseAttrs{}
  263. err := json.Unmarshal([]byte(resp.Node.Value), attrs)
  264. if err != nil {
  265. return Event{}, err
  266. }
  267. exp := time.Time{}
  268. if resp.Node.Expiration != nil {
  269. exp = *resp.Node.Expiration
  270. }
  271. evt := Event{
  272. EventAdded,
  273. Lease{
  274. Subnet: *sn,
  275. Attrs: *attrs,
  276. Expiration: exp,
  277. },
  278. "",
  279. }
  280. return evt, nil
  281. }
  282. }
  283. func (esr *etcdSubnetRegistry) parseNetworkWatchResponse(resp *etcd.Response) (Event, uint64, error) {
  284. index := resp.Node.ModifiedIndex
  285. netname, isConfig := esr.parseNetworkKey(resp.Node.Key)
  286. if netname == "" {
  287. return Event{}, index, errTryAgain
  288. }
  289. evt := Event{}
  290. switch resp.Action {
  291. case "delete":
  292. evt = Event{
  293. EventRemoved,
  294. Lease{},
  295. netname,
  296. }
  297. default:
  298. if !isConfig {
  299. // Ignore non .../<netname>/config keys; tell caller to try again
  300. return Event{}, index, errTryAgain
  301. }
  302. _, err := ParseConfig(resp.Node.Value)
  303. if err != nil {
  304. return Event{}, index, err
  305. }
  306. evt = Event{
  307. EventAdded,
  308. Lease{},
  309. netname,
  310. }
  311. }
  312. return evt, index, nil
  313. }
  314. // Returns network name from config key (eg, /coreos.com/network/foobar/config),
  315. // if the 'config' key isn't present we don't consider the network valid
  316. func (esr *etcdSubnetRegistry) parseNetworkKey(s string) (string, bool) {
  317. if parts := esr.networkRegex.FindStringSubmatch(s); len(parts) == 3 {
  318. return parts[1], parts[2] != ""
  319. }
  320. return "", false
  321. }
  322. func nodeToLease(node *etcd.Node) (*Lease, error) {
  323. sn := ParseSubnetKey(node.Key)
  324. if sn == nil {
  325. return nil, fmt.Errorf("failed to parse subnet key %q", *sn)
  326. }
  327. attrs := &LeaseAttrs{}
  328. if err := json.Unmarshal([]byte(node.Value), attrs); err != nil {
  329. return nil, err
  330. }
  331. exp := time.Time{}
  332. if node.Expiration != nil {
  333. exp = *node.Expiration
  334. }
  335. lease := Lease{
  336. Subnet: *sn,
  337. Attrs: *attrs,
  338. Expiration: exp,
  339. asof: node.ModifiedIndex,
  340. }
  341. return &lease, nil
  342. }