registry.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package subnet
  2. import (
  3. "sync"
  4. "github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
  5. )
  6. type subnetRegistry interface {
  7. getConfig() (*etcd.Response, error)
  8. getSubnets() (*etcd.Response, error)
  9. createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  10. updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  11. watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
  12. }
  13. type etcdSubnetRegistry struct {
  14. mux sync.Mutex
  15. cli *etcd.Client
  16. endpoint string
  17. prefix string
  18. }
  19. func newEtcdSubnetRegistry(endpoint, prefix string) subnetRegistry {
  20. return &etcdSubnetRegistry{
  21. cli: etcd.NewClient([]string{endpoint}),
  22. endpoint: endpoint,
  23. prefix: prefix,
  24. }
  25. }
  26. func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
  27. resp, err := esr.client().Get(esr.prefix+"/config", false, false)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return resp, nil
  32. }
  33. func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
  34. return esr.client().Get(esr.prefix+"/subnets", false, true)
  35. }
  36. func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  37. return esr.client().Create(esr.prefix+"/subnets/"+sn, data, ttl)
  38. }
  39. func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  40. return esr.client().Set(esr.prefix+"/subnets/"+sn, data, ttl)
  41. }
  42. func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
  43. for {
  44. resp, err := esr.client().RawWatch(esr.prefix+"/subnets", since, true, nil, stop)
  45. if err != nil {
  46. if err == etcd.ErrWatchStoppedByUser {
  47. return nil, nil
  48. } else {
  49. return nil, err
  50. }
  51. }
  52. if len(resp.Body) == 0 {
  53. // etcd timed out, go back but recreate the client as the underlying
  54. // http transport gets hosed (http://code.google.com/p/go/issues/detail?id=8648)
  55. esr.resetClient()
  56. continue
  57. }
  58. return resp.Unmarshal()
  59. }
  60. }
  61. func (esr *etcdSubnetRegistry) client() *etcd.Client {
  62. esr.mux.Lock()
  63. defer esr.mux.Unlock()
  64. return esr.cli
  65. }
  66. func (esr *etcdSubnetRegistry) resetClient() {
  67. esr.mux.Lock()
  68. defer esr.mux.Unlock()
  69. esr.cli = etcd.NewClient([]string{esr.endpoint})
  70. }