subnet.go 9.5 KB


  1. package subnet
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "net"
  6. "regexp"
  7. "strconv"
  8. "time"
  9. "github.com/coreos-inc/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
  10. log "github.com/coreos-inc/rudder/Godeps/_workspace/src/github.com/golang/glog"
  11. "github.com/coreos-inc/rudder/pkg"
  12. )
  // Lease acquisition and renewal tuning.
  const (
  	// registerRetries bounds how many times AcquireLease loops before
  	// giving up.
  	registerRetries = 10

  	// subnetTTL is the TTL handed to the registry whenever a lease is
  	// created or updated (one day if the unit is seconds — TODO confirm
  	// the unit against the etcd client).
  	subnetTTL = 24 * 60 * 60

  	// renewMargin is how far ahead of the recorded expiration the lease
  	// renewer wakes up.
  	renewMargin = time.Hour
  )

  // etcd error codes
  const (
  	etcdKeyNotFound       = 100
  	etcdKeyAlreadyExists  = 105
  	etcdEventIndexCleared = 401
  )

  // Values carried in Event.Type.
  const (
  	SubnetAdded = iota
  	SubnetRemoved
  )
  // subnetRegex picks a dotted-quad address and a decimal prefix length out
  // of a subnet key of the form "<a.b.c.d>-<len>" (the form produced by
  // StringSep(".", "-") elsewhere in this file). Submatch 1 is the address,
  // submatch 2 the prefix length. The pattern is unanchored, so the key may
  // carry a leading path.
  //
  // Every dot is escaped: an unescaped "." would match any character and
  // let malformed keys such as "10.1x2y3-24" through to the IP parser.
  var subnetRegex = regexp.MustCompile(`(\d+\.\d+\.\d+\.\d+)-(\d+)`)
  31. type SubnetLease struct {
  32. Network pkg.IP4Net
  33. Data string
  34. }
  35. type SubnetManager struct {
  36. registry subnetRegistry
  37. config *Config
  38. myLease SubnetLease
  39. leaseExp time.Time
  40. lastIndex uint64
  41. leases []SubnetLease
  42. stop chan bool
  43. }
  44. type EventType int
  45. type Event struct {
  46. Type EventType
  47. Lease SubnetLease
  48. }
  49. type EventBatch []Event
  50. func NewSubnetManager(etcdCli *etcd.Client, prefix string) (*SubnetManager, error) {
  51. esr := etcdSubnetRegistry{etcdCli, prefix}
  52. return newSubnetManager(&esr)
  53. }
  54. func (sm *SubnetManager) AcquireLease(ip pkg.IP4, data string) (pkg.IP4Net, error) {
  55. for i := 0; i < registerRetries; i++ {
  56. var err error
  57. sm.leases, err = sm.getLeases()
  58. if err != nil {
  59. return pkg.IP4Net{}, err
  60. }
  61. // try to reuse a subnet if there's one that match our IP
  62. for _, l := range sm.leases {
  63. var ba BaseAttrs
  64. err = json.Unmarshal([]byte(l.Data), &ba)
  65. if err != nil {
  66. log.Error("Error parsing subnet lease JSON: ", err)
  67. } else {
  68. if ip == ba.PublicIP {
  69. resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), data, subnetTTL)
  70. if err != nil {
  71. return pkg.IP4Net{}, nil
  72. }
  73. sm.myLease.Network = l.Network
  74. sm.leaseExp = *resp.Node.Expiration
  75. return l.Network, nil
  76. }
  77. }
  78. }
  79. // no existing match, grab a new one
  80. sn, err := sm.allocateSubnet()
  81. if err != nil {
  82. return pkg.IP4Net{}, err
  83. }
  84. resp, err := sm.registry.createSubnet(sn.StringSep(".", "-"), data, subnetTTL)
  85. switch {
  86. case err == nil:
  87. sm.myLease.Network = sn
  88. sm.leaseExp = *resp.Node.Expiration
  89. return sn, nil
  90. // if etcd returned Key Already Exists, try again.
  91. case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
  92. continue
  93. default:
  94. return pkg.IP4Net{}, err
  95. }
  96. }
  97. return pkg.IP4Net{}, errors.New("Max retries reached trying to acquire a subnet")
  98. }
  99. func (sm *SubnetManager) UpdateSubnet(data string) error {
  100. resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), data, subnetTTL)
  101. sm.leaseExp = *resp.Node.Expiration
  102. return err
  103. }
  104. func (sm *SubnetManager) Start(receiver chan EventBatch) {
  105. go sm.watchLeases(receiver)
  106. go sm.leaseRenewer()
  107. }
  108. func (sm *SubnetManager) Stop() {
  109. // once for each goroutine
  110. sm.stop <- true
  111. sm.stop <- true
  112. }
  113. func (sm *SubnetManager) GetConfig() *Config {
  114. return sm.config
  115. }
  116. /// Implementation
  117. func parseSubnetKey(s string) (pkg.IP4Net, error) {
  118. if parts := subnetRegex.FindStringSubmatch(s); len(parts) == 3 {
  119. ip := net.ParseIP(parts[1]).To4()
  120. prefixLen, err := strconv.ParseUint(parts[2], 10, 5)
  121. if ip != nil && err == nil {
  122. return pkg.IP4Net{pkg.FromIP(ip), uint(prefixLen)}, nil
  123. }
  124. }
  125. return pkg.IP4Net{}, errors.New("Error parsing IP Subnet")
  126. }
  127. type subnetRegistry interface {
  128. getConfig() (*etcd.Response, error)
  129. getSubnets() (*etcd.Response, error)
  130. createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  131. updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
  132. watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
  133. }
  134. type etcdSubnetRegistry struct {
  135. cli *etcd.Client
  136. prefix string
  137. }
  138. func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
  139. resp, err := esr.cli.Get(esr.prefix+"/config", false, false)
  140. if err != nil {
  141. return nil, err
  142. }
  143. return resp, nil
  144. }
  145. func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
  146. return esr.cli.Get(esr.prefix+"/subnets", false, true)
  147. }
  148. func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  149. return esr.cli.Create(esr.prefix+"/subnets/"+sn, data, ttl)
  150. }
  151. func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
  152. return esr.cli.Set(esr.prefix+"/subnets/"+sn, data, ttl)
  153. }
  154. func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
  155. return esr.cli.Watch(esr.prefix+"/subnets", since, true, nil, stop)
  156. }
  157. func newSubnetManager(r subnetRegistry) (*SubnetManager, error) {
  158. cfgResp, err := r.getConfig()
  159. if err != nil {
  160. return nil, err
  161. }
  162. cfg, err := ParseConfig(cfgResp.Node.Value)
  163. if err != nil {
  164. return nil, err
  165. }
  166. return &SubnetManager{
  167. registry: r,
  168. config: cfg,
  169. stop: make(chan bool, 2),
  170. }, nil
  171. }
  172. func (sm *SubnetManager) getLeases() ([]SubnetLease, error) {
  173. resp, err := sm.registry.getSubnets()
  174. var leases []SubnetLease
  175. switch {
  176. case err == nil:
  177. for _, node := range resp.Node.Nodes {
  178. sn, err := parseSubnetKey(node.Key)
  179. if err == nil {
  180. lease := SubnetLease{sn, node.Value}
  181. leases = append(leases, lease)
  182. }
  183. }
  184. sm.lastIndex = resp.EtcdIndex
  185. case err.(*etcd.EtcdError).ErrorCode == etcdKeyNotFound:
  186. // key not found: treat it as empty set
  187. sm.lastIndex = err.(*etcd.EtcdError).Index
  188. default:
  189. return nil, err
  190. }
  191. return leases, nil
  192. }
  193. func deleteLease(l []SubnetLease, i int) []SubnetLease {
  194. l[i], l = l[len(l)-1], l[:len(l)-1]
  195. return l
  196. }
  197. func (sm *SubnetManager) applyLeases(newLeases []SubnetLease) EventBatch {
  198. var batch EventBatch
  199. for _, l := range newLeases {
  200. // skip self
  201. if l.Network.Equal(sm.myLease.Network) {
  202. continue
  203. }
  204. found := false
  205. for i, c := range sm.leases {
  206. if c.Network.Equal(l.Network) {
  207. sm.leases = deleteLease(sm.leases, i)
  208. found = true
  209. break
  210. }
  211. }
  212. if !found {
  213. // new subnet
  214. batch = append(batch, Event{SubnetAdded, l})
  215. }
  216. }
  217. // everything left in sm.leases has been deleted
  218. for _, c := range sm.leases {
  219. batch = append(batch, Event{SubnetRemoved, c})
  220. }
  221. sm.leases = newLeases
  222. return batch
  223. }
  224. func (sm *SubnetManager) applySubnetChange(action string, ipn pkg.IP4Net, data string) Event {
  225. switch action {
  226. case "delete", "expire":
  227. for i, l := range sm.leases {
  228. if l.Network.Equal(ipn) {
  229. deleteLease(sm.leases, i)
  230. return Event{SubnetRemoved, l}
  231. }
  232. }
  233. log.Errorf("Removed subnet (%s) was not found", ipn)
  234. return Event{
  235. SubnetRemoved,
  236. SubnetLease{ipn, ""},
  237. }
  238. default:
  239. for i, l := range sm.leases {
  240. if l.Network.Equal(ipn) {
  241. sm.leases[i] = SubnetLease{ipn, data}
  242. return Event{SubnetAdded, sm.leases[i]}
  243. }
  244. }
  245. sm.leases = append(sm.leases, SubnetLease{ipn, data})
  246. return Event{SubnetAdded, sm.leases[len(sm.leases)-1]}
  247. }
  248. }
  249. type BaseAttrs struct {
  250. PublicIP pkg.IP4
  251. }
  252. func (sm *SubnetManager) allocateSubnet() (pkg.IP4Net, error) {
  253. log.Infof("Picking subnet in range %s ... %s", sm.config.SubnetMin, sm.config.SubnetMax)
  254. var bag []pkg.IP4
  255. sn := pkg.IP4Net{sm.config.SubnetMin, sm.config.SubnetLen}
  256. OuterLoop:
  257. for ; sn.IP <= sm.config.SubnetMax && len(bag) < 100; sn = sn.Next() {
  258. for _, l := range sm.leases {
  259. if sn.Overlaps(l.Network) {
  260. continue OuterLoop
  261. }
  262. }
  263. bag = append(bag, sn.IP)
  264. }
  265. if len(bag) == 0 {
  266. return pkg.IP4Net{}, errors.New("out of subnets")
  267. } else {
  268. i := pkg.RandInt(0, len(bag))
  269. return pkg.IP4Net{bag[i], sm.config.SubnetLen}, nil
  270. }
  271. }
  272. func (sm *SubnetManager) watchLeases(receiver chan EventBatch) {
  273. // "catch up" by replaying all the leases we discovered during
  274. // AcquireLease
  275. var batch EventBatch
  276. for _, l := range sm.leases {
  277. if !sm.myLease.Network.Equal(l.Network) {
  278. batch = append(batch, Event{SubnetAdded, l})
  279. }
  280. }
  281. if len(batch) > 0 {
  282. receiver <- batch
  283. }
  284. for {
  285. resp, err := sm.registry.watchSubnets(sm.lastIndex+1, sm.stop)
  286. if err == nil {
  287. if resp == nil {
  288. // watchSubnets exited by stop chan being signaled
  289. return
  290. }
  291. sm.lastIndex = resp.EtcdIndex
  292. sn, err := parseSubnetKey(resp.Node.Key)
  293. if err != nil {
  294. log.Error("Error parsing subnet IP: ", resp.Node.Key)
  295. time.Sleep(time.Second)
  296. continue
  297. }
  298. // Don't process our own changes
  299. if !sm.myLease.Network.Equal(sn) {
  300. evt := sm.applySubnetChange(resp.Action, sn, resp.Node.Value)
  301. receiver <- EventBatch{evt}
  302. }
  303. } else if etcdErr, ok := err.(*etcd.EtcdError); ok && etcdErr.ErrorCode == etcdEventIndexCleared {
  304. // etcd maintains a history window for events and it's possible to fall behind.
  305. // to recover, get the current state and then "diff" against our cache to generate
  306. // events for the caller
  307. log.Warning("Watch of subnet leases failed b/c index outside history window")
  308. leases, err := sm.getLeases()
  309. if err != nil {
  310. log.Errorf("Failed to retrieve subnet leases: ", err)
  311. time.Sleep(time.Second)
  312. continue
  313. }
  314. batch = sm.applyLeases(leases)
  315. receiver <- batch
  316. } else {
  317. log.Error("Watch of subnet leases failed: ", err)
  318. continue
  319. }
  320. }
  321. }
  322. func (sm *SubnetManager) leaseRenewer() {
  323. dur := sm.leaseExp.Sub(time.Now()) - renewMargin
  324. for {
  325. select {
  326. case <-time.After(dur):
  327. resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), sm.myLease.Data, subnetTTL)
  328. if err != nil {
  329. log.Error("Error renewing lease (trying again in 1 min): ", err)
  330. dur = time.Minute
  331. continue
  332. }
  333. sm.leaseExp = *(resp.Node.Expiration)
  334. log.Info("Lease renewed, new expiration: ", sm.leaseExp)
  335. dur = sm.leaseExp.Sub(time.Now()) - renewMargin
  336. case <-sm.stop:
  337. return
  338. }
  339. }
  340. }