registry.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package subnet
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
  6. log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
  7. )
  8. type subnetRegistry interface {
  9. getConfig() (*etcd.Response, error)
  10. getSubnets() (*etcd.Response, error)
  11. createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  12. updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  13. watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
  14. }
  15. type etcdSubnetRegistry struct {
  16. mux sync.Mutex
  17. cli *etcd.Client
  18. endpoint string
  19. prefix string
  20. }
  21. func newEtcdSubnetRegistry(endpoint, prefix string) subnetRegistry {
  22. return &etcdSubnetRegistry{
  23. cli: etcd.NewClient([]string{endpoint}),
  24. endpoint: endpoint,
  25. prefix: prefix,
  26. }
  27. }
  28. func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
  29. resp, err := esr.client().Get(esr.prefix+"/config", false, false)
  30. if err != nil {
  31. return nil, err
  32. }
  33. return resp, nil
  34. }
  35. func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
  36. return esr.client().Get(esr.prefix+"/subnets", false, true)
  37. }
  38. func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  39. resp, err := esr.client().Create(esr.prefix+"/subnets/"+sn, data, ttl)
  40. if err != nil {
  41. return nil, err
  42. }
  43. ensureExpiration(resp, ttl)
  44. return resp, nil
  45. }
  46. func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  47. resp, err := esr.client().Set(esr.prefix+"/subnets/"+sn, data, ttl)
  48. if err != nil {
  49. return nil, err
  50. }
  51. ensureExpiration(resp, ttl)
  52. return resp, nil
  53. }
  54. func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
  55. for {
  56. resp, err := esr.client().RawWatch(esr.prefix+"/subnets", since, true, nil, stop)
  57. if err != nil {
  58. if err == etcd.ErrWatchStoppedByUser {
  59. return nil, nil
  60. } else {
  61. return nil, err
  62. }
  63. }
  64. if len(resp.Body) == 0 {
  65. // etcd timed out, go back but recreate the client as the underlying
  66. // http transport gets hosed (http://code.google.com/p/go/issues/detail?id=8648)
  67. esr.resetClient()
  68. continue
  69. }
  70. return resp.Unmarshal()
  71. }
  72. }
  73. func (esr *etcdSubnetRegistry) client() *etcd.Client {
  74. esr.mux.Lock()
  75. defer esr.mux.Unlock()
  76. return esr.cli
  77. }
  78. func (esr *etcdSubnetRegistry) resetClient() {
  79. esr.mux.Lock()
  80. defer esr.mux.Unlock()
  81. esr.cli = etcd.NewClient([]string{esr.endpoint})
  82. }
  83. func ensureExpiration(resp *etcd.Response, ttl uint64) {
  84. if resp.Node.Expiration == nil {
  85. // should not be but calc it ourselves in this case
  86. log.Info("Expiration field missing on etcd response, calculating locally")
  87. exp := time.Now().Add(time.Duration(ttl) * time.Second)
  88. resp.Node.Expiration = &exp
  89. }
  90. }