registry.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdv2
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "path"
  20. "regexp"
  21. "sync"
  22. "time"
  23. etcd "github.com/coreos/etcd/client"
  24. "github.com/coreos/etcd/pkg/transport"
  25. log "github.com/golang/glog"
  26. "golang.org/x/net/context"
  27. "github.com/coreos/flannel/pkg/ip"
  28. . "github.com/coreos/flannel/subnet"
  29. )
  30. var (
  31. errTryAgain = errors.New("try again")
  32. )
  33. type Registry interface {
  34. getNetworkConfig(ctx context.Context) (string, error)
  35. getSubnets(ctx context.Context) ([]Lease, uint64, error)
  36. getSubnet(ctx context.Context, sn ip.IP4Net) (*Lease, uint64, error)
  37. createSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error)
  38. updateSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error)
  39. deleteSubnet(ctx context.Context, sn ip.IP4Net) error
  40. watchSubnets(ctx context.Context, since uint64) (Event, uint64, error)
  41. watchSubnet(ctx context.Context, since uint64, sn ip.IP4Net) (Event, uint64, error)
  42. }
  43. type EtcdConfig struct {
  44. Endpoints []string
  45. Keyfile string
  46. Certfile string
  47. CAFile string
  48. Prefix string
  49. Username string
  50. Password string
  51. }
  52. type etcdNewFunc func(c *EtcdConfig) (etcd.KeysAPI, error)
  53. type etcdSubnetRegistry struct {
  54. cliNewFunc etcdNewFunc
  55. mux sync.Mutex
  56. cli etcd.KeysAPI
  57. etcdCfg *EtcdConfig
  58. networkRegex *regexp.Regexp
  59. }
  60. func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error) {
  61. tlsInfo := transport.TLSInfo{
  62. CertFile: c.Certfile,
  63. KeyFile: c.Keyfile,
  64. CAFile: c.CAFile,
  65. }
  66. t, err := transport.NewTransport(tlsInfo, time.Second)
  67. if err != nil {
  68. return nil, err
  69. }
  70. cli, err := etcd.New(etcd.Config{
  71. Endpoints: c.Endpoints,
  72. Transport: t,
  73. Username: c.Username,
  74. Password: c.Password,
  75. })
  76. if err != nil {
  77. return nil, err
  78. }
  79. return etcd.NewKeysAPI(cli), nil
  80. }
  81. func newEtcdSubnetRegistry(config *EtcdConfig, cliNewFunc etcdNewFunc) (Registry, error) {
  82. r := &etcdSubnetRegistry{
  83. etcdCfg: config,
  84. networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)(/|/config)?$`),
  85. }
  86. if cliNewFunc != nil {
  87. r.cliNewFunc = cliNewFunc
  88. } else {
  89. r.cliNewFunc = newEtcdClient
  90. }
  91. var err error
  92. r.cli, err = r.cliNewFunc(config)
  93. if err != nil {
  94. return nil, err
  95. }
  96. return r, nil
  97. }
  98. func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context) (string, error) {
  99. key := path.Join(esr.etcdCfg.Prefix, "config")
  100. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Quorum: true})
  101. if err != nil {
  102. return "", err
  103. }
  104. return resp.Node.Value, nil
  105. }
  106. // getSubnets queries etcd to get a list of currently allocated leases for a given network.
  107. // It returns the leases along with the "as-of" etcd-index that can be used as the starting
  108. // point for etcd watch.
  109. func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context) ([]Lease, uint64, error) {
  110. key := path.Join(esr.etcdCfg.Prefix, "subnets")
  111. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Recursive: true, Quorum: true})
  112. if err != nil {
  113. if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
  114. // key not found: treat it as empty set
  115. return []Lease{}, etcdErr.Index, nil
  116. }
  117. return nil, 0, err
  118. }
  119. leases := []Lease{}
  120. for _, node := range resp.Node.Nodes {
  121. l, err := nodeToLease(node)
  122. if err != nil {
  123. log.Warningf("Ignoring bad subnet node: %v", err)
  124. continue
  125. }
  126. leases = append(leases, *l)
  127. }
  128. return leases, resp.Index, nil
  129. }
  130. func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, sn ip.IP4Net) (*Lease, uint64, error) {
  131. key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
  132. resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Quorum: true})
  133. if err != nil {
  134. return nil, 0, err
  135. }
  136. l, err := nodeToLease(resp.Node)
  137. return l, resp.Index, err
  138. }
  139. func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
  140. key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
  141. value, err := json.Marshal(attrs)
  142. if err != nil {
  143. return time.Time{}, err
  144. }
  145. opts := &etcd.SetOptions{
  146. PrevExist: etcd.PrevNoExist,
  147. TTL: ttl,
  148. }
  149. resp, err := esr.client().Set(ctx, key, string(value), opts)
  150. if err != nil {
  151. return time.Time{}, err
  152. }
  153. exp := time.Time{}
  154. if resp.Node.Expiration != nil {
  155. exp = *resp.Node.Expiration
  156. }
  157. return exp, nil
  158. }
  159. func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
  160. key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
  161. value, err := json.Marshal(attrs)
  162. if err != nil {
  163. return time.Time{}, err
  164. }
  165. resp, err := esr.client().Set(ctx, key, string(value), &etcd.SetOptions{
  166. PrevIndex: asof,
  167. TTL: ttl,
  168. })
  169. if err != nil {
  170. return time.Time{}, err
  171. }
  172. exp := time.Time{}
  173. if resp.Node.Expiration != nil {
  174. exp = *resp.Node.Expiration
  175. }
  176. return exp, nil
  177. }
  178. func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, sn ip.IP4Net) error {
  179. key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
  180. _, err := esr.client().Delete(ctx, key, nil)
  181. return err
  182. }
  183. func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, since uint64) (Event, uint64, error) {
  184. key := path.Join(esr.etcdCfg.Prefix, "subnets")
  185. opts := &etcd.WatcherOptions{
  186. AfterIndex: since,
  187. Recursive: true,
  188. }
  189. e, err := esr.client().Watcher(key, opts).Next(ctx)
  190. if err != nil {
  191. return Event{}, 0, err
  192. }
  193. evt, err := parseSubnetWatchResponse(e)
  194. return evt, e.Node.ModifiedIndex, err
  195. }
  196. func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, since uint64, sn ip.IP4Net) (Event, uint64, error) {
  197. key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
  198. opts := &etcd.WatcherOptions{
  199. AfterIndex: since,
  200. }
  201. e, err := esr.client().Watcher(key, opts).Next(ctx)
  202. if err != nil {
  203. return Event{}, 0, err
  204. }
  205. evt, err := parseSubnetWatchResponse(e)
  206. return evt, e.Node.ModifiedIndex, err
  207. }
  208. func (esr *etcdSubnetRegistry) client() etcd.KeysAPI {
  209. esr.mux.Lock()
  210. defer esr.mux.Unlock()
  211. return esr.cli
  212. }
  213. func (esr *etcdSubnetRegistry) resetClient() {
  214. esr.mux.Lock()
  215. defer esr.mux.Unlock()
  216. var err error
  217. esr.cli, err = newEtcdClient(esr.etcdCfg)
  218. if err != nil {
  219. panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err))
  220. }
  221. }
  222. func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
  223. sn := ParseSubnetKey(resp.Node.Key)
  224. if sn == nil {
  225. return Event{}, fmt.Errorf("%v %q: not a subnet, skipping", resp.Action, resp.Node.Key)
  226. }
  227. switch resp.Action {
  228. case "delete", "expire":
  229. return Event{
  230. EventRemoved,
  231. Lease{Subnet: *sn},
  232. }, nil
  233. default:
  234. attrs := &LeaseAttrs{}
  235. err := json.Unmarshal([]byte(resp.Node.Value), attrs)
  236. if err != nil {
  237. return Event{}, err
  238. }
  239. exp := time.Time{}
  240. if resp.Node.Expiration != nil {
  241. exp = *resp.Node.Expiration
  242. }
  243. evt := Event{
  244. EventAdded,
  245. Lease{
  246. Subnet: *sn,
  247. Attrs: *attrs,
  248. Expiration: exp,
  249. },
  250. }
  251. return evt, nil
  252. }
  253. }
  254. func (esr *etcdSubnetRegistry) createBackendData(ctx context.Context, network, data string) error {
  255. key := path.Join(esr.etcdCfg.Prefix, network, "backend-data")
  256. _, err := esr.client().Create(ctx, key, data)
  257. if err != nil {
  258. return err
  259. }
  260. return nil
  261. }
  262. func (esr *etcdSubnetRegistry) getBackendData(ctx context.Context, network string) (string, error) {
  263. key := path.Join(esr.etcdCfg.Prefix, network, "backend-data")
  264. etcdResponse, err := esr.client().Get(ctx, key, nil)
  265. if err != nil {
  266. return "", err
  267. }
  268. return etcdResponse.Node.Value, nil
  269. }
  270. func nodeToLease(node *etcd.Node) (*Lease, error) {
  271. sn := ParseSubnetKey(node.Key)
  272. if sn == nil {
  273. return nil, fmt.Errorf("failed to parse subnet key %s", node.Key)
  274. }
  275. attrs := &LeaseAttrs{}
  276. if err := json.Unmarshal([]byte(node.Value), attrs); err != nil {
  277. return nil, err
  278. }
  279. exp := time.Time{}
  280. if node.Expiration != nil {
  281. exp = *node.Expiration
  282. }
  283. lease := Lease{
  284. Subnet: *sn,
  285. Attrs: *attrs,
  286. Expiration: exp,
  287. Asof: node.ModifiedIndex,
  288. }
  289. return &lease, nil
  290. }