registry.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package subnet
  2. import (
  3. "fmt"
  4. "path"
  5. "sync"
  6. "time"
  7. "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
  8. log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
  9. )
  10. type subnetRegistry interface {
  11. getConfig() (*etcd.Response, error)
  12. getSubnets() (*etcd.Response, error)
  13. createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  14. updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  15. watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
  16. }
  // EtcdConfig holds the settings needed to create an etcd client and to
  // locate this package's keys inside etcd.
  type EtcdConfig struct {
  	// Endpoints is the list of etcd endpoints passed to the client constructor.
  	Endpoints []string

  	// Keyfile, Certfile and CAFile are the TLS settings. If any of them is
  	// non-empty, a TLS client is created instead of a plain one.
  	Keyfile  string
  	Certfile string
  	CAFile   string

  	// Prefix is the key prefix under which "config" and "subnets" live.
  	Prefix string
  }
  24. type etcdSubnetRegistry struct {
  25. mux sync.Mutex
  26. cli *etcd.Client
  27. etcdCfg *EtcdConfig
  28. }
  29. func newEtcdClient(c *EtcdConfig) (*etcd.Client, error) {
  30. if c.Keyfile != "" || c.Certfile != "" || c.CAFile != "" {
  31. return etcd.NewTLSClient(c.Endpoints, c.Certfile, c.Keyfile, c.CAFile)
  32. } else {
  33. return etcd.NewClient(c.Endpoints), nil
  34. }
  35. }
  36. func newEtcdSubnetRegistry(config *EtcdConfig) (subnetRegistry, error) {
  37. r := &etcdSubnetRegistry{
  38. etcdCfg: config,
  39. }
  40. var err error
  41. r.cli, err = newEtcdClient(config)
  42. if err != nil {
  43. return nil, err
  44. }
  45. return r, nil
  46. }
  47. func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
  48. key := path.Join(esr.etcdCfg.Prefix, "config")
  49. resp, err := esr.client().Get(key, false, false)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return resp, nil
  54. }
  55. func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
  56. key := path.Join(esr.etcdCfg.Prefix, "subnets")
  57. return esr.client().Get(key, false, true)
  58. }
  59. func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  60. key := path.Join(esr.etcdCfg.Prefix, "subnets", sn)
  61. resp, err := esr.client().Create(key, data, ttl)
  62. if err != nil {
  63. return nil, err
  64. }
  65. ensureExpiration(resp, ttl)
  66. return resp, nil
  67. }
  68. func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  69. key := path.Join(esr.etcdCfg.Prefix, "subnets", sn)
  70. resp, err := esr.client().Set(key, data, ttl)
  71. if err != nil {
  72. return nil, err
  73. }
  74. ensureExpiration(resp, ttl)
  75. return resp, nil
  76. }
  77. func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
  78. for {
  79. key := path.Join(esr.etcdCfg.Prefix, "subnets")
  80. resp, err := esr.client().RawWatch(key, since, true, nil, stop)
  81. if err != nil {
  82. if err == etcd.ErrWatchStoppedByUser {
  83. return nil, nil
  84. } else {
  85. return nil, err
  86. }
  87. }
  88. if len(resp.Body) == 0 {
  89. // etcd timed out, go back but recreate the client as the underlying
  90. // http transport gets hosed (http://code.google.com/p/go/issues/detail?id=8648)
  91. esr.resetClient()
  92. continue
  93. }
  94. return resp.Unmarshal()
  95. }
  96. }
  97. func (esr *etcdSubnetRegistry) client() *etcd.Client {
  98. esr.mux.Lock()
  99. defer esr.mux.Unlock()
  100. return esr.cli
  101. }
  102. func (esr *etcdSubnetRegistry) resetClient() {
  103. esr.mux.Lock()
  104. defer esr.mux.Unlock()
  105. var err error
  106. esr.cli, err = newEtcdClient(esr.etcdCfg)
  107. if err != nil {
  108. panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err))
  109. }
  110. }
  111. func ensureExpiration(resp *etcd.Response, ttl uint64) {
  112. if resp.Node.Expiration == nil {
  113. // should not be but calc it ourselves in this case
  114. log.Info("Expiration field missing on etcd response, calculating locally")
  115. exp := time.Now().Add(time.Duration(ttl) * time.Second)
  116. resp.Node.Expiration = &exp
  117. }
  118. }