registry.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package subnet
  2. import (
  3. "sync"
  4. "github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
  5. )
  6. type subnetRegistry interface {
  7. getConfig() (*etcd.Response, error)
  8. getSubnets() (*etcd.Response, error)
  9. createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  10. updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  11. watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
  12. }
  13. type etcdSubnetRegistry struct {
  14. mux sync.Mutex
  15. cli *etcd.Client
  16. endpoint string
  17. prefix string
  18. }
  19. func newEtcdSubnetRegistry(endpoint, prefix string) subnetRegistry {
  20. return &etcdSubnetRegistry{
  21. cli: etcd.NewClient([]string{endpoint}),
  22. endpoint: endpoint,
  23. prefix: prefix,
  24. }
  25. }
  26. func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
  27. resp, err := esr.client().Get(esr.prefix+"/config", false, false)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return resp, nil
  32. }
  33. func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
  34. return esr.client().Get(esr.prefix+"/subnets", false, true)
  35. }
  36. func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  37. return esr.client().Create(esr.prefix+"/subnets/"+sn, data, ttl)
  38. }
  39. func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  40. return esr.client().Set(esr.prefix+"/subnets/"+sn, data, ttl)
  41. }
  42. func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
  43. for {
  44. resp, err := esr.client().RawWatch(esr.prefix+"/subnets", since, true, nil, stop)
  45. if err != nil {
  46. return nil, err
  47. }
  48. if len(resp.Body) == 0 {
  49. // etcd timed out, go back but recreate the client as the underlying
  50. // http transport gets hosed (http://code.google.com/p/go/issues/detail?id=8648)
  51. esr.resetClient()
  52. continue
  53. }
  54. return resp.Unmarshal()
  55. }
  56. }
  57. func (esr *etcdSubnetRegistry) client() *etcd.Client {
  58. esr.mux.Lock()
  59. defer esr.mux.Unlock()
  60. return esr.cli
  61. }
  62. func (esr *etcdSubnetRegistry) resetClient() {
  63. esr.mux.Lock()
  64. defer esr.mux.Unlock()
  65. esr.cli = etcd.NewClient([]string{esr.endpoint})
  66. }