registry.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "path"
  20. "regexp"
  21. "sync"
  22. "time"
  23. etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
  24. "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport"
  25. log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
  26. "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
  27. "github.com/coreos/flannel/pkg/ip"
  28. )
  29. var (
  30. subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
  31. ErrTryAgain = errors.New("try again")
  32. )
  33. type Registry interface {
  34. getNetworkConfig(ctx context.Context, network string) (string, error)
  35. getSubnets(ctx context.Context, network string) ([]Lease, uint64, error)
  36. getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error)
  37. createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error)
  38. updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error)
  39. deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error
  40. watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error)
  41. watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error)
  42. getNetworks(ctx context.Context) ([]string, uint64, error)
  43. watchNetworks(ctx context.Context, since uint64) (Event, uint64, error)
  44. }
  45. type EtcdConfig struct {
  46. Endpoints []string
  47. Keyfile string
  48. Certfile string
  49. CAFile string
  50. Prefix string
  51. }
  52. type etcdSubnetRegistry struct {
  53. mux sync.Mutex
  54. cli etcd.KeysAPI
  55. etcdCfg *EtcdConfig
  56. networkRegex *regexp.Regexp
  57. }
  58. func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error) {
  59. tlsInfo := transport.TLSInfo{
  60. CertFile: c.Certfile,
  61. KeyFile: c.Keyfile,
  62. CAFile: c.CAFile,
  63. }
  64. t, err := transport.NewTransport(tlsInfo)
  65. if err != nil {
  66. return nil, err
  67. }
  68. cli, err := etcd.New(etcd.Config{
  69. Endpoints: c.Endpoints,
  70. Transport: t,
  71. })
  72. if err != nil {
  73. return nil, err
  74. }
  75. return etcd.NewKeysAPI(cli), nil
  76. }
  77. func newEtcdSubnetRegistry(config *EtcdConfig) (Registry, error) {
  78. r := &etcdSubnetRegistry{
  79. etcdCfg: config,
  80. networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)(/|/config)?$`),
  81. }
  82. var err error
  83. r.cli, err = newEtcdClient(config)
  84. if err != nil {
  85. return nil, err
  86. }
  87. return r, nil
  88. }
  89. func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) {
  90. key := path.Join(esr.etcdCfg.Prefix, network, "config")
  91. resp, err := esr.client().Get(ctx, key, nil)
  92. if err != nil {
  93. return "", err
  94. }
  95. return resp.Node.Value, nil
  96. }
  97. // getSubnets queries etcd to get a list of currently allocated leases for a given network.
  98. // It returns the leases along with the "as-of" etcd-index that can be used as the starting
  99. // point for etcd watch.
  100. func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) {
  101. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  102. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Recursive: true})
  103. if err != nil {
  104. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  105. // key not found: treat it as empty set
  106. return []Lease{}, etcdErr.Index, nil
  107. }
  108. return nil, 0, err
  109. }
  110. leases := []Lease{}
  111. for _, node := range resp.Node.Nodes {
  112. l, err := nodeToLease(node)
  113. if err != nil {
  114. log.Warningf("Ignoring bad subnet node: %v", err)
  115. continue
  116. }
  117. leases = append(leases, *l)
  118. }
  119. return leases, resp.Index, nil
  120. }
  121. func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
  122. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  123. resp, err := esr.client().Get(ctx, key, nil)
  124. if err != nil {
  125. return nil, 0, err
  126. }
  127. l, err := nodeToLease(resp.Node)
  128. return l, resp.Index, err
  129. }
  130. func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
  131. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  132. value, err := json.Marshal(attrs)
  133. if err != nil {
  134. return time.Time{}, err
  135. }
  136. opts := &etcd.SetOptions{
  137. PrevExist: etcd.PrevNoExist,
  138. TTL: ttl,
  139. }
  140. resp, err := esr.client().Set(ctx, key, string(value), opts)
  141. if err != nil {
  142. return time.Time{}, err
  143. }
  144. ensureExpiration(resp, ttl)
  145. return *resp.Node.Expiration, nil
  146. }
  147. func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
  148. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  149. value, err := json.Marshal(attrs)
  150. if err != nil {
  151. return time.Time{}, err
  152. }
  153. resp, err := esr.client().Set(ctx, key, string(value), &etcd.SetOptions{
  154. PrevIndex: asof,
  155. TTL: ttl,
  156. })
  157. if err != nil {
  158. return time.Time{}, err
  159. }
  160. ensureExpiration(resp, ttl)
  161. return *resp.Node.Expiration, nil
  162. }
  163. func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
  164. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  165. _, err := esr.client().Delete(ctx, key, nil)
  166. return err
  167. }
  168. func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) {
  169. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  170. opts := &etcd.WatcherOptions{
  171. AfterIndex: since,
  172. Recursive: true,
  173. }
  174. e, err := esr.client().Watcher(key, opts).Next(ctx)
  175. if err != nil {
  176. return Event{}, 0, err
  177. }
  178. evt, err := parseSubnetWatchResponse(e)
  179. return evt, e.Node.ModifiedIndex, err
  180. }
  181. func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
  182. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  183. opts := &etcd.WatcherOptions{
  184. AfterIndex: since,
  185. }
  186. e, err := esr.client().Watcher(key, opts).Next(ctx)
  187. if err != nil {
  188. return Event{}, 0, err
  189. }
  190. evt, err := parseSubnetWatchResponse(e)
  191. return evt, e.Index, err
  192. }
  193. // getNetworks queries etcd to get a list of network names. It returns the
  194. // networks along with the 'as-of' etcd-index that can be used as the starting
  195. // point for etcd watch.
  196. func (esr *etcdSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) {
  197. resp, err := esr.client().Get(ctx, esr.etcdCfg.Prefix, &etcd.GetOptions{Recursive: true})
  198. networks := []string{}
  199. if err == nil {
  200. for _, node := range resp.Node.Nodes {
  201. // Look for '/config' on the child nodes
  202. for _, child := range node.Nodes {
  203. netname, isConfig := esr.parseNetworkKey(child.Key)
  204. if isConfig {
  205. networks = append(networks, netname)
  206. }
  207. }
  208. }
  209. return networks, resp.Index, nil
  210. }
  211. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  212. // key not found: treat it as empty set
  213. return networks, etcdErr.Index, nil
  214. }
  215. return nil, 0, err
  216. }
  217. func (esr *etcdSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) {
  218. key := esr.etcdCfg.Prefix
  219. opts := &etcd.WatcherOptions{
  220. AfterIndex: since,
  221. Recursive: true,
  222. }
  223. e, err := esr.client().Watcher(key, opts).Next(ctx)
  224. if err != nil {
  225. return Event{}, 0, err
  226. }
  227. return esr.parseNetworkWatchResponse(e)
  228. }
  229. func (esr *etcdSubnetRegistry) client() etcd.KeysAPI {
  230. esr.mux.Lock()
  231. defer esr.mux.Unlock()
  232. return esr.cli
  233. }
  234. func (esr *etcdSubnetRegistry) resetClient() {
  235. esr.mux.Lock()
  236. defer esr.mux.Unlock()
  237. var err error
  238. esr.cli, err = newEtcdClient(esr.etcdCfg)
  239. if err != nil {
  240. panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err))
  241. }
  242. }
  243. func ensureExpiration(resp *etcd.Response, ttl time.Duration) {
  244. if resp.Node.Expiration == nil {
  245. // should not be but calc it ourselves in this case
  246. log.Info("Expiration field missing on etcd response, calculating locally")
  247. exp := clock.Now().Add(time.Duration(ttl) * time.Second)
  248. resp.Node.Expiration = &exp
  249. }
  250. }
  251. func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
  252. sn := ParseSubnetKey(resp.Node.Key)
  253. if sn == nil {
  254. return Event{}, fmt.Errorf("%v %q: not a subnet, skipping", resp.Action, resp.Node.Key)
  255. }
  256. switch resp.Action {
  257. case "delete", "expire":
  258. return Event{
  259. EventRemoved,
  260. Lease{Subnet: *sn},
  261. "",
  262. }, nil
  263. default:
  264. attrs := &LeaseAttrs{}
  265. err := json.Unmarshal([]byte(resp.Node.Value), attrs)
  266. if err != nil {
  267. return Event{}, err
  268. }
  269. exp := time.Time{}
  270. if resp.Node.Expiration != nil {
  271. exp = *resp.Node.Expiration
  272. }
  273. evt := Event{
  274. EventAdded,
  275. Lease{
  276. Subnet: *sn,
  277. Attrs: *attrs,
  278. Expiration: exp,
  279. },
  280. "",
  281. }
  282. return evt, nil
  283. }
  284. }
  285. func (esr *etcdSubnetRegistry) parseNetworkWatchResponse(resp *etcd.Response) (Event, uint64, error) {
  286. index := resp.Node.ModifiedIndex
  287. netname, isConfig := esr.parseNetworkKey(resp.Node.Key)
  288. if netname == "" {
  289. return Event{}, index, ErrTryAgain
  290. }
  291. evt := Event{}
  292. switch resp.Action {
  293. case "delete":
  294. evt = Event{
  295. EventRemoved,
  296. Lease{},
  297. netname,
  298. }
  299. default:
  300. if !isConfig {
  301. // Ignore non .../<netname>/config keys; tell caller to try again
  302. return Event{}, index, ErrTryAgain
  303. }
  304. _, err := ParseConfig(resp.Node.Value)
  305. if err != nil {
  306. return Event{}, index, err
  307. }
  308. evt = Event{
  309. EventAdded,
  310. Lease{},
  311. netname,
  312. }
  313. }
  314. return evt, index, nil
  315. }
  316. // Returns network name from config key (eg, /coreos.com/network/foobar/config),
  317. // if the 'config' key isn't present we don't consider the network valid
  318. func (esr *etcdSubnetRegistry) parseNetworkKey(s string) (string, bool) {
  319. if parts := esr.networkRegex.FindStringSubmatch(s); len(parts) == 3 {
  320. return parts[1], parts[2] != ""
  321. }
  322. return "", false
  323. }
  324. func nodeToLease(node *etcd.Node) (*Lease, error) {
  325. sn := ParseSubnetKey(node.Key)
  326. if sn == nil {
  327. return nil, fmt.Errorf("failed to parse subnet key %q", *sn)
  328. }
  329. attrs := &LeaseAttrs{}
  330. if err := json.Unmarshal([]byte(node.Value), attrs); err != nil {
  331. return nil, err
  332. }
  333. exp := time.Time{}
  334. if node.Expiration != nil {
  335. exp = *node.Expiration
  336. }
  337. lease := Lease{
  338. Subnet: *sn,
  339. Attrs: *attrs,
  340. Expiration: exp,
  341. asof: node.ModifiedIndex,
  342. }
  343. return &lease, nil
  344. }