registry.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdv2
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "path"
  20. "regexp"
  21. "sync"
  22. "time"
  23. etcd "github.com/coreos/etcd/client"
  24. "github.com/coreos/etcd/pkg/transport"
  25. log "github.com/golang/glog"
  26. "golang.org/x/net/context"
  27. "github.com/coreos/flannel/pkg/ip"
  28. . "github.com/coreos/flannel/subnet"
  29. )
  30. var (
  31. errTryAgain = errors.New("try again")
  32. )
  33. type Registry interface {
  34. getNetworkConfig(ctx context.Context, network string) (string, error)
  35. getSubnets(ctx context.Context, network string) ([]Lease, uint64, error)
  36. getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error)
  37. createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error)
  38. updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error)
  39. deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error
  40. watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error)
  41. watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error)
  42. getNetworks(ctx context.Context) ([]string, uint64, error)
  43. watchNetworks(ctx context.Context, since uint64) (Event, uint64, error)
  44. }
  45. type EtcdConfig struct {
  46. Endpoints []string
  47. Keyfile string
  48. Certfile string
  49. CAFile string
  50. Prefix string
  51. Username string
  52. Password string
  53. }
  54. type etcdNewFunc func(c *EtcdConfig) (etcd.KeysAPI, error)
  55. type etcdSubnetRegistry struct {
  56. cliNewFunc etcdNewFunc
  57. mux sync.Mutex
  58. cli etcd.KeysAPI
  59. etcdCfg *EtcdConfig
  60. networkRegex *regexp.Regexp
  61. }
  62. func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error) {
  63. tlsInfo := transport.TLSInfo{
  64. CertFile: c.Certfile,
  65. KeyFile: c.Keyfile,
  66. CAFile: c.CAFile,
  67. }
  68. t, err := transport.NewTransport(tlsInfo, time.Second)
  69. if err != nil {
  70. return nil, err
  71. }
  72. cli, err := etcd.New(etcd.Config{
  73. Endpoints: c.Endpoints,
  74. Transport: t,
  75. Username: c.Username,
  76. Password: c.Password,
  77. })
  78. if err != nil {
  79. return nil, err
  80. }
  81. return etcd.NewKeysAPI(cli), nil
  82. }
  83. func newEtcdSubnetRegistry(config *EtcdConfig, cliNewFunc etcdNewFunc) (Registry, error) {
  84. r := &etcdSubnetRegistry{
  85. etcdCfg: config,
  86. networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)(/|/config)?$`),
  87. }
  88. if cliNewFunc != nil {
  89. r.cliNewFunc = cliNewFunc
  90. } else {
  91. r.cliNewFunc = newEtcdClient
  92. }
  93. var err error
  94. r.cli, err = r.cliNewFunc(config)
  95. if err != nil {
  96. return nil, err
  97. }
  98. return r, nil
  99. }
  100. func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) {
  101. key := path.Join(esr.etcdCfg.Prefix, network, "config")
  102. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Quorum: true})
  103. if err != nil {
  104. return "", err
  105. }
  106. return resp.Node.Value, nil
  107. }
  108. // getSubnets queries etcd to get a list of currently allocated leases for a given network.
  109. // It returns the leases along with the "as-of" etcd-index that can be used as the starting
  110. // point for etcd watch.
  111. func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) {
  112. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  113. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Recursive: true, Quorum: true})
  114. if err != nil {
  115. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  116. // key not found: treat it as empty set
  117. return []Lease{}, etcdErr.Index, nil
  118. }
  119. return nil, 0, err
  120. }
  121. leases := []Lease{}
  122. for _, node := range resp.Node.Nodes {
  123. l, err := nodeToLease(node)
  124. if err != nil {
  125. log.Warningf("Ignoring bad subnet node: %v", err)
  126. continue
  127. }
  128. leases = append(leases, *l)
  129. }
  130. return leases, resp.Index, nil
  131. }
  132. func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
  133. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  134. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Quorum: true})
  135. if err != nil {
  136. return nil, 0, err
  137. }
  138. l, err := nodeToLease(resp.Node)
  139. return l, resp.Index, err
  140. }
  141. func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
  142. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  143. value, err := json.Marshal(attrs)
  144. if err != nil {
  145. return time.Time{}, err
  146. }
  147. opts := &etcd.SetOptions{
  148. PrevExist: etcd.PrevNoExist,
  149. TTL: ttl,
  150. }
  151. resp, err := esr.client().Set(ctx, key, string(value), opts)
  152. if err != nil {
  153. return time.Time{}, err
  154. }
  155. exp := time.Time{}
  156. if resp.Node.Expiration != nil {
  157. exp = *resp.Node.Expiration
  158. }
  159. return exp, nil
  160. }
  161. func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
  162. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  163. value, err := json.Marshal(attrs)
  164. if err != nil {
  165. return time.Time{}, err
  166. }
  167. resp, err := esr.client().Set(ctx, key, string(value), &etcd.SetOptions{
  168. PrevIndex: asof,
  169. TTL: ttl,
  170. })
  171. if err != nil {
  172. return time.Time{}, err
  173. }
  174. exp := time.Time{}
  175. if resp.Node.Expiration != nil {
  176. exp = *resp.Node.Expiration
  177. }
  178. return exp, nil
  179. }
  180. func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
  181. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  182. _, err := esr.client().Delete(ctx, key, nil)
  183. return err
  184. }
  185. func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) {
  186. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  187. opts := &etcd.WatcherOptions{
  188. AfterIndex: since,
  189. Recursive: true,
  190. }
  191. e, err := esr.client().Watcher(key, opts).Next(ctx)
  192. if err != nil {
  193. return Event{}, 0, err
  194. }
  195. evt, err := parseSubnetWatchResponse(e)
  196. return evt, e.Node.ModifiedIndex, err
  197. }
  198. func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
  199. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  200. opts := &etcd.WatcherOptions{
  201. AfterIndex: since,
  202. }
  203. e, err := esr.client().Watcher(key, opts).Next(ctx)
  204. if err != nil {
  205. return Event{}, 0, err
  206. }
  207. evt, err := parseSubnetWatchResponse(e)
  208. return evt, e.Node.ModifiedIndex, err
  209. }
  210. // getNetworks queries etcd to get a list of network names. It returns the
  211. // networks along with the 'as-of' etcd-index that can be used as the starting
  212. // point for etcd watch.
  213. func (esr *etcdSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) {
  214. resp, err := esr.client().Get(ctx, esr.etcdCfg.Prefix, &etcd.GetOptions{Recursive: true, Quorum: true})
  215. networks := []string{}
  216. if err == nil {
  217. for _, node := range resp.Node.Nodes {
  218. // Look for '/config' on the child nodes
  219. for _, child := range node.Nodes {
  220. netname, isConfig := esr.parseNetworkKey(child.Key)
  221. if isConfig {
  222. networks = append(networks, netname)
  223. }
  224. }
  225. }
  226. return networks, resp.Index, nil
  227. }
  228. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  229. // key not found: treat it as empty set
  230. return networks, etcdErr.Index, nil
  231. }
  232. return nil, 0, err
  233. }
  234. func (esr *etcdSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) {
  235. key := esr.etcdCfg.Prefix
  236. opts := &etcd.WatcherOptions{
  237. AfterIndex: since,
  238. Recursive: true,
  239. }
  240. e, err := esr.client().Watcher(key, opts).Next(ctx)
  241. if err != nil {
  242. return Event{}, 0, err
  243. }
  244. return esr.parseNetworkWatchResponse(e)
  245. }
  246. func (esr *etcdSubnetRegistry) client() etcd.KeysAPI {
  247. esr.mux.Lock()
  248. defer esr.mux.Unlock()
  249. return esr.cli
  250. }
  251. func (esr *etcdSubnetRegistry) resetClient() {
  252. esr.mux.Lock()
  253. defer esr.mux.Unlock()
  254. var err error
  255. esr.cli, err = newEtcdClient(esr.etcdCfg)
  256. if err != nil {
  257. panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err))
  258. }
  259. }
  260. func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
  261. sn := ParseSubnetKey(resp.Node.Key)
  262. if sn == nil {
  263. return Event{}, fmt.Errorf("%v %q: not a subnet, skipping", resp.Action, resp.Node.Key)
  264. }
  265. switch resp.Action {
  266. case "delete", "expire":
  267. return Event{
  268. EventRemoved,
  269. Lease{Subnet: *sn},
  270. "",
  271. }, nil
  272. default:
  273. attrs := &LeaseAttrs{}
  274. err := json.Unmarshal([]byte(resp.Node.Value), attrs)
  275. if err != nil {
  276. return Event{}, err
  277. }
  278. exp := time.Time{}
  279. if resp.Node.Expiration != nil {
  280. exp = *resp.Node.Expiration
  281. }
  282. evt := Event{
  283. EventAdded,
  284. Lease{
  285. Subnet: *sn,
  286. Attrs: *attrs,
  287. Expiration: exp,
  288. },
  289. "",
  290. }
  291. return evt, nil
  292. }
  293. }
  294. func (esr *etcdSubnetRegistry) parseNetworkWatchResponse(resp *etcd.Response) (Event, uint64, error) {
  295. index := resp.Node.ModifiedIndex
  296. netname, isConfig := esr.parseNetworkKey(resp.Node.Key)
  297. if netname == "" {
  298. return Event{}, index, errTryAgain
  299. }
  300. var evt Event
  301. switch resp.Action {
  302. case "delete":
  303. evt = Event{
  304. EventRemoved,
  305. Lease{},
  306. netname,
  307. }
  308. default:
  309. if !isConfig {
  310. // Ignore non .../<netname>/config keys; tell caller to try again
  311. return Event{}, index, errTryAgain
  312. }
  313. _, err := ParseConfig(resp.Node.Value)
  314. if err != nil {
  315. return Event{}, index, err
  316. }
  317. evt = Event{
  318. EventAdded,
  319. Lease{},
  320. netname,
  321. }
  322. }
  323. return evt, index, nil
  324. }
  325. // Returns network name from config key (eg, /coreos.com/network/foobar/config),
  326. // if the 'config' key isn't present we don't consider the network valid
  327. func (esr *etcdSubnetRegistry) parseNetworkKey(s string) (string, bool) {
  328. if parts := esr.networkRegex.FindStringSubmatch(s); len(parts) == 3 {
  329. return parts[1], parts[2] != ""
  330. }
  331. return "", false
  332. }
  333. func nodeToLease(node *etcd.Node) (*Lease, error) {
  334. sn := ParseSubnetKey(node.Key)
  335. if sn == nil {
  336. return nil, fmt.Errorf("failed to parse subnet key %s", node.Key)
  337. }
  338. attrs := &LeaseAttrs{}
  339. if err := json.Unmarshal([]byte(node.Value), attrs); err != nil {
  340. return nil, err
  341. }
  342. exp := time.Time{}
  343. if node.Expiration != nil {
  344. exp = *node.Expiration
  345. }
  346. lease := Lease{
  347. Subnet: *sn,
  348. Attrs: *attrs,
  349. Expiration: exp,
  350. Asof: node.ModifiedIndex,
  351. }
  352. return &lease, nil
  353. }