registry.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  // Copyright 2015 flannel authors
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  //     http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. package subnet
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "path"
  20. "regexp"
  21. "sync"
  22. "time"
  23. etcd "github.com/coreos/etcd/client"
  24. "github.com/coreos/etcd/pkg/transport"
  25. log "github.com/golang/glog"
  26. "golang.org/x/net/context"
  27. "github.com/coreos/flannel/pkg/ip"
  28. )
  var (
  	// subnetRegex extracts the dotted-quad address (group 1) and the prefix
  	// length (group 2) from a string containing "<a.b.c.d>-<len>".
  	// Every dot is escaped so that only a literal '.' is accepted between
  	// octets; an unescaped '.' would match any character.
  	subnetRegex = regexp.MustCompile(`(\d+\.\d+\.\d+\.\d+)-(\d+)`)

  	// errTryAgain is returned by parseNetworkWatchResponse for watch events
  	// that do not describe a network change; it tells the caller to try again.
  	errTryAgain = errors.New("try again")
  )
  33. type Registry interface {
  34. getNetworkConfig(ctx context.Context, network string) (string, error)
  35. getSubnets(ctx context.Context, network string) ([]Lease, uint64, error)
  36. getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error)
  37. createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error)
  38. updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error)
  39. deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error
  40. watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error)
  41. watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error)
  42. getNetworks(ctx context.Context) ([]string, uint64, error)
  43. watchNetworks(ctx context.Context, since uint64) (Event, uint64, error)
  44. createBackendData(ctx context.Context, network, data string) error
  45. getBackendData(ctx context.Context, network string) (string, error)
  46. }
  // EtcdConfig carries the settings needed to build an etcd client and to
  // locate the registry's keys.
  type EtcdConfig struct {
  	// Endpoints is handed to the etcd client as its list of endpoints.
  	Endpoints []string

  	// Keyfile, Certfile and CAFile are passed to the TLS transport setup
  	// as the key file, certificate file and CA file respectively.
  	Keyfile  string
  	Certfile string
  	CAFile   string

  	// Prefix is the key prefix under which every registry key is built.
  	Prefix string

  	// Username and Password are forwarded to the etcd client configuration.
  	Username string
  	Password string
  }
  56. type etcdNewFunc func(c *EtcdConfig) (etcd.KeysAPI, error)
  57. type etcdSubnetRegistry struct {
  58. cliNewFunc etcdNewFunc
  59. mux sync.Mutex
  60. cli etcd.KeysAPI
  61. etcdCfg *EtcdConfig
  62. networkRegex *regexp.Regexp
  63. }
  64. func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error) {
  65. tlsInfo := transport.TLSInfo{
  66. CertFile: c.Certfile,
  67. KeyFile: c.Keyfile,
  68. CAFile: c.CAFile,
  69. }
  70. t, err := transport.NewTransport(tlsInfo, time.Second)
  71. if err != nil {
  72. return nil, err
  73. }
  74. cli, err := etcd.New(etcd.Config{
  75. Endpoints: c.Endpoints,
  76. Transport: t,
  77. Username: c.Username,
  78. Password: c.Password,
  79. })
  80. if err != nil {
  81. return nil, err
  82. }
  83. return etcd.NewKeysAPI(cli), nil
  84. }
  85. func newEtcdSubnetRegistry(config *EtcdConfig, cliNewFunc etcdNewFunc) (Registry, error) {
  86. r := &etcdSubnetRegistry{
  87. etcdCfg: config,
  88. networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)(/|/config)?$`),
  89. }
  90. if cliNewFunc != nil {
  91. r.cliNewFunc = cliNewFunc
  92. } else {
  93. r.cliNewFunc = newEtcdClient
  94. }
  95. var err error
  96. r.cli, err = r.cliNewFunc(config)
  97. if err != nil {
  98. return nil, err
  99. }
  100. return r, nil
  101. }
  102. func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) {
  103. key := path.Join(esr.etcdCfg.Prefix, network, "config")
  104. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Quorum: true})
  105. if err != nil {
  106. return "", err
  107. }
  108. return resp.Node.Value, nil
  109. }
  110. // getSubnets queries etcd to get a list of currently allocated leases for a given network.
  111. // It returns the leases along with the "as-of" etcd-index that can be used as the starting
  112. // point for etcd watch.
  113. func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) {
  114. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  115. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Recursive: true, Quorum: true})
  116. if err != nil {
  117. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  118. // key not found: treat it as empty set
  119. return []Lease{}, etcdErr.Index, nil
  120. }
  121. return nil, 0, err
  122. }
  123. leases := []Lease{}
  124. for _, node := range resp.Node.Nodes {
  125. l, err := nodeToLease(node)
  126. if err != nil {
  127. log.Warningf("Ignoring bad subnet node: %v", err)
  128. continue
  129. }
  130. leases = append(leases, *l)
  131. }
  132. return leases, resp.Index, nil
  133. }
  134. func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
  135. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  136. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Quorum: true})
  137. if err != nil {
  138. return nil, 0, err
  139. }
  140. l, err := nodeToLease(resp.Node)
  141. return l, resp.Index, err
  142. }
  143. func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
  144. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  145. value, err := json.Marshal(attrs)
  146. if err != nil {
  147. return time.Time{}, err
  148. }
  149. opts := &etcd.SetOptions{
  150. PrevExist: etcd.PrevNoExist,
  151. TTL: ttl,
  152. }
  153. resp, err := esr.client().Set(ctx, key, string(value), opts)
  154. if err != nil {
  155. return time.Time{}, err
  156. }
  157. exp := time.Time{}
  158. if resp.Node.Expiration != nil {
  159. exp = *resp.Node.Expiration
  160. }
  161. return exp, nil
  162. }
  163. func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
  164. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  165. value, err := json.Marshal(attrs)
  166. if err != nil {
  167. return time.Time{}, err
  168. }
  169. resp, err := esr.client().Set(ctx, key, string(value), &etcd.SetOptions{
  170. PrevIndex: asof,
  171. TTL: ttl,
  172. })
  173. if err != nil {
  174. return time.Time{}, err
  175. }
  176. exp := time.Time{}
  177. if resp.Node.Expiration != nil {
  178. exp = *resp.Node.Expiration
  179. }
  180. return exp, nil
  181. }
  182. func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
  183. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  184. _, err := esr.client().Delete(ctx, key, nil)
  185. return err
  186. }
  187. func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) {
  188. key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
  189. opts := &etcd.WatcherOptions{
  190. AfterIndex: since,
  191. Recursive: true,
  192. }
  193. e, err := esr.client().Watcher(key, opts).Next(ctx)
  194. if err != nil {
  195. return Event{}, 0, err
  196. }
  197. evt, err := parseSubnetWatchResponse(e)
  198. return evt, e.Node.ModifiedIndex, err
  199. }
  200. func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
  201. key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
  202. opts := &etcd.WatcherOptions{
  203. AfterIndex: since,
  204. }
  205. e, err := esr.client().Watcher(key, opts).Next(ctx)
  206. if err != nil {
  207. return Event{}, 0, err
  208. }
  209. evt, err := parseSubnetWatchResponse(e)
  210. return evt, e.Node.ModifiedIndex, err
  211. }
  212. // getNetworks queries etcd to get a list of network names. It returns the
  213. // networks along with the 'as-of' etcd-index that can be used as the starting
  214. // point for etcd watch.
  215. func (esr *etcdSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) {
  216. resp, err := esr.client().Get(ctx, esr.etcdCfg.Prefix, &etcd.GetOptions{Recursive: true, Quorum: true})
  217. networks := []string{}
  218. if err == nil {
  219. for _, node := range resp.Node.Nodes {
  220. // Look for '/config' on the child nodes
  221. for _, child := range node.Nodes {
  222. netname, isConfig := esr.parseNetworkKey(child.Key)
  223. if isConfig {
  224. networks = append(networks, netname)
  225. }
  226. }
  227. }
  228. return networks, resp.Index, nil
  229. }
  230. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  231. // key not found: treat it as empty set
  232. return networks, etcdErr.Index, nil
  233. }
  234. return nil, 0, err
  235. }
  236. func (esr *etcdSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) {
  237. key := esr.etcdCfg.Prefix
  238. opts := &etcd.WatcherOptions{
  239. AfterIndex: since,
  240. Recursive: true,
  241. }
  242. e, err := esr.client().Watcher(key, opts).Next(ctx)
  243. if err != nil {
  244. return Event{}, 0, err
  245. }
  246. return esr.parseNetworkWatchResponse(e)
  247. }
  248. func (esr *etcdSubnetRegistry) client() etcd.KeysAPI {
  249. esr.mux.Lock()
  250. defer esr.mux.Unlock()
  251. return esr.cli
  252. }
  253. func (esr *etcdSubnetRegistry) resetClient() {
  254. esr.mux.Lock()
  255. defer esr.mux.Unlock()
  256. var err error
  257. esr.cli, err = newEtcdClient(esr.etcdCfg)
  258. if err != nil {
  259. panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err))
  260. }
  261. }
  262. func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
  263. sn := ParseSubnetKey(resp.Node.Key)
  264. if sn == nil {
  265. return Event{}, fmt.Errorf("%v %q: not a subnet, skipping", resp.Action, resp.Node.Key)
  266. }
  267. switch resp.Action {
  268. case "delete", "expire":
  269. return Event{
  270. EventRemoved,
  271. Lease{Subnet: *sn},
  272. "",
  273. }, nil
  274. default:
  275. attrs := &LeaseAttrs{}
  276. err := json.Unmarshal([]byte(resp.Node.Value), attrs)
  277. if err != nil {
  278. return Event{}, err
  279. }
  280. exp := time.Time{}
  281. if resp.Node.Expiration != nil {
  282. exp = *resp.Node.Expiration
  283. }
  284. evt := Event{
  285. EventAdded,
  286. Lease{
  287. Subnet: *sn,
  288. Attrs: *attrs,
  289. Expiration: exp,
  290. },
  291. "",
  292. }
  293. return evt, nil
  294. }
  295. }
  296. func (esr *etcdSubnetRegistry) parseNetworkWatchResponse(resp *etcd.Response) (Event, uint64, error) {
  297. index := resp.Node.ModifiedIndex
  298. netname, isConfig := esr.parseNetworkKey(resp.Node.Key)
  299. if netname == "" {
  300. return Event{}, index, errTryAgain
  301. }
  302. evt := Event{}
  303. switch resp.Action {
  304. case "delete":
  305. evt = Event{
  306. EventRemoved,
  307. Lease{},
  308. netname,
  309. }
  310. default:
  311. if !isConfig {
  312. // Ignore non .../<netname>/config keys; tell caller to try again
  313. return Event{}, index, errTryAgain
  314. }
  315. _, err := ParseConfig(resp.Node.Value)
  316. if err != nil {
  317. return Event{}, index, err
  318. }
  319. evt = Event{
  320. EventAdded,
  321. Lease{},
  322. netname,
  323. }
  324. }
  325. return evt, index, nil
  326. }
  327. // Returns network name from config key (eg, /coreos.com/network/foobar/config),
  328. // if the 'config' key isn't present we don't consider the network valid
  329. func (esr *etcdSubnetRegistry) parseNetworkKey(s string) (string, bool) {
  330. if parts := esr.networkRegex.FindStringSubmatch(s); len(parts) == 3 {
  331. return parts[1], parts[2] != ""
  332. }
  333. return "", false
  334. }
  335. func nodeToLease(node *etcd.Node) (*Lease, error) {
  336. sn := ParseSubnetKey(node.Key)
  337. if sn == nil {
  338. return nil, fmt.Errorf("failed to parse subnet key %q", *sn)
  339. }
  340. attrs := &LeaseAttrs{}
  341. if err := json.Unmarshal([]byte(node.Value), attrs); err != nil {
  342. return nil, err
  343. }
  344. exp := time.Time{}
  345. if node.Expiration != nil {
  346. exp = *node.Expiration
  347. }
  348. lease := Lease{
  349. Subnet: *sn,
  350. Attrs: *attrs,
  351. Expiration: exp,
  352. asof: node.ModifiedIndex,
  353. }
  354. return &lease, nil
  355. }
  356. func (esr *etcdSubnetRegistry) createBackendData(ctx context.Context, network, data string) error {
  357. key := path.Join(esr.etcdCfg.Prefix, network, "backend-data")
  358. _, err := esr.client().Create(ctx, key, data)
  359. if err != nil {
  360. return err
  361. }
  362. return nil
  363. }
  364. func (esr *etcdSubnetRegistry) getBackendData(ctx context.Context, network string) (string, error) {
  365. key := path.Join(esr.etcdCfg.Prefix, network, "backend-data")
  366. etcdResponse, err := esr.client().Get(ctx, key, nil)
  367. if err != nil {
  368. return "", err
  369. }
  370. return etcdResponse.Node.Value, nil
  371. }