mock_etcd.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "fmt"
  17. "strings"
  18. "time"
  19. etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
  20. "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
  21. )
  22. const DEFAULT_TTL time.Duration = 8760 * time.Hour // one year
  23. type mockEtcd struct {
  24. nodes map[string]*etcd.Node
  25. watchers map[*watcher]*watcher
  26. // A given number of past events must be available for watchers, because
  27. // flannel always uses a new watcher instead of re-using old ones, and
  28. // the new watcher's index may be slightly in the past
  29. events []*etcd.Response
  30. index uint64
  31. }
  32. func newMockEtcd() *mockEtcd {
  33. me := &mockEtcd{
  34. index: 1000,
  35. nodes: make(map[string]*etcd.Node),
  36. watchers: make(map[*watcher]*watcher),
  37. events: make([]*etcd.Response, 0, 50),
  38. }
  39. me.nodes["/"] = me.newNode("/", "", true)
  40. return me
  41. }
  42. func (me *mockEtcd) newNode(key, value string, dir bool) *etcd.Node {
  43. exp := time.Now().Add(DEFAULT_TTL)
  44. if dir {
  45. value = ""
  46. }
  47. return &etcd.Node{
  48. Key: key,
  49. Value: value,
  50. CreatedIndex: me.index,
  51. ModifiedIndex: me.index,
  52. Dir: dir,
  53. Expiration: &exp,
  54. Nodes: make([]*etcd.Node, 0, 20)}
  55. }
  56. func (me *mockEtcd) newError(code int, format string, args ...interface{}) etcd.Error {
  57. msg := fmt.Sprintf(format, args...)
  58. return etcd.Error{
  59. Code: code,
  60. Message: msg,
  61. Cause: "",
  62. Index: me.index,
  63. }
  64. }
  65. func (me *mockEtcd) getKeyPath(key string) ([]string, error) {
  66. if !strings.HasPrefix(key, "/") {
  67. return []string{}, me.newError(etcd.ErrorCodeKeyNotFound, "Invalid key %s", key)
  68. }
  69. // Build up a list of each intermediate key's path
  70. path := []string{""}
  71. for i, p := range strings.Split(strings.Trim(key, "/"), "/") {
  72. if p == "" {
  73. return []string{}, me.newError(etcd.ErrorCodeKeyNotFound, "Invalid key %s", key)
  74. }
  75. path = append(path, fmt.Sprintf("%s/%s", path[i], p))
  76. }
  77. return path[1:], nil
  78. }
  79. // Returns the node and its parent respectively. Returns a nil node (but not
  80. // an error) if the requested node doest not exist.
  81. func (me *mockEtcd) findNode(key string) (*etcd.Node, *etcd.Node, error) {
  82. if key == "/" {
  83. return me.nodes["/"], nil, nil
  84. }
  85. path, err := me.getKeyPath(key)
  86. if err != nil {
  87. return nil, nil, err
  88. }
  89. var node *etcd.Node
  90. var parent *etcd.Node
  91. var ok bool
  92. for i, part := range path {
  93. parent = node
  94. node, ok = me.nodes[part]
  95. if !ok {
  96. return nil, nil, nil
  97. }
  98. // intermediates must be directories
  99. if i < len(path)-1 && node.Dir != true {
  100. return nil, nil, me.newError(etcd.ErrorCodeNotDir, "Intermediate node %s not a directory", part)
  101. }
  102. }
  103. return node, parent, nil
  104. }
  105. // Returns whether @child is a child of @node, and whether it is an immediate child respsectively
  106. func isChild(node *etcd.Node, child *etcd.Node) (bool, bool) {
  107. if !strings.HasPrefix(child.Key, fmt.Sprintf("%s/", node.Key)) {
  108. return false, false
  109. }
  110. nodeParts := strings.Split(node.Key, "/")
  111. childParts := strings.Split(child.Key, "/")
  112. return true, len(childParts) == len(nodeParts)+1
  113. }
  114. func (me *mockEtcd) copyNode(node *etcd.Node, recursive bool) *etcd.Node {
  115. n := *node
  116. n.Nodes = make([]*etcd.Node, 0)
  117. if recursive {
  118. for _, child := range me.nodes {
  119. if _, directChild := isChild(node, child); directChild {
  120. n.Nodes = append(n.Nodes, me.copyNode(child, true))
  121. }
  122. }
  123. }
  124. return &n
  125. }
  126. func (me *mockEtcd) Get(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
  127. node, _, err := me.findNode(key)
  128. if err != nil {
  129. return nil, err
  130. }
  131. if node == nil {
  132. return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
  133. }
  134. if opts == nil {
  135. opts = &etcd.GetOptions{}
  136. }
  137. return &etcd.Response{
  138. Action: "get",
  139. Node: me.copyNode(node, opts.Recursive),
  140. Index: me.index,
  141. }, nil
  142. }
  143. func (me *mockEtcd) sendEvent(resp *etcd.Response) {
  144. // Add to history log
  145. if len(me.events) == cap(me.events) {
  146. me.events = me.events[1:]
  147. }
  148. me.events = append(me.events, resp)
  149. // and notify watchers
  150. for w := range me.watchers {
  151. w.notifyEvent(resp)
  152. }
  153. }
  154. // Returns the node created and its creation response
  155. // Don't need to check for intermediate directories here as that was already done
  156. // by the thing calling makeNode()
  157. func (me *mockEtcd) makeNode(path []string, value string, isDir bool, ttl time.Duration) (*etcd.Node, *etcd.Response, error) {
  158. var child *etcd.Node
  159. var resp *etcd.Response
  160. var ok bool
  161. node := me.nodes["/"]
  162. for i, part := range path {
  163. node, ok = me.nodes[part]
  164. if !ok {
  165. me.index += 1
  166. if i < len(path)-1 {
  167. // intermediate node
  168. child = me.newNode(part, "", true)
  169. } else {
  170. // Final node
  171. exp := time.Now().Add(ttl)
  172. child = me.newNode(part, value, isDir)
  173. child.Expiration = &exp
  174. resp = &etcd.Response{
  175. Action: "create",
  176. Node: me.copyNode(child, false),
  177. Index: child.CreatedIndex,
  178. }
  179. me.sendEvent(resp)
  180. }
  181. me.nodes[child.Key] = child
  182. node = child
  183. }
  184. }
  185. return node, resp, nil
  186. }
  187. func (me *mockEtcd) set(ctx context.Context, key, value string, opts *etcd.SetOptions, action string) (*etcd.Response, error) {
  188. node, _, err := me.findNode(key)
  189. if err != nil {
  190. return nil, err
  191. }
  192. if opts.PrevExist == etcd.PrevExist && node == nil {
  193. return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
  194. } else if opts.PrevExist == etcd.PrevNoExist && node != nil {
  195. return nil, me.newError(etcd.ErrorCodeNodeExist, "Key %s already exists", key)
  196. }
  197. if opts.Dir {
  198. value = ""
  199. }
  200. var resp *etcd.Response
  201. if node != nil {
  202. if opts.PrevIndex > 0 && opts.PrevIndex < node.ModifiedIndex {
  203. return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevIndex %s less than node ModifiedIndex %d", key, opts.PrevIndex, node.ModifiedIndex)
  204. }
  205. if opts.Dir != node.Dir {
  206. if opts.Dir == true {
  207. return nil, me.newError(etcd.ErrorCodeNotDir, "Key %s is not a directory", key)
  208. } else {
  209. return nil, me.newError(etcd.ErrorCodeNotFile, "Key %s is not a file", key)
  210. }
  211. }
  212. if opts.PrevValue != "" && opts.PrevValue != node.Value {
  213. return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevValue did not match", key)
  214. }
  215. prevNode := me.copyNode(node, false)
  216. node.Value = value
  217. me.index += 1
  218. node.ModifiedIndex = me.index
  219. if opts.TTL > 0 {
  220. exp := time.Now().Add(opts.TTL)
  221. node.Expiration = &exp
  222. }
  223. resp = &etcd.Response{
  224. Action: action,
  225. Node: me.copyNode(node, false),
  226. PrevNode: prevNode,
  227. Index: me.index,
  228. }
  229. me.sendEvent(resp)
  230. } else {
  231. // Create the node and its parents
  232. path, err := me.getKeyPath(key)
  233. if err != nil {
  234. return nil, err
  235. }
  236. _, resp, err = me.makeNode(path, value, opts.Dir, opts.TTL)
  237. if err != nil {
  238. return nil, err
  239. }
  240. }
  241. return resp, nil
  242. }
  243. func (me *mockEtcd) Set(ctx context.Context, key, value string, opts *etcd.SetOptions) (*etcd.Response, error) {
  244. return me.set(ctx, key, value, opts, "set")
  245. }
  246. // Removes a node and all children
  247. func (me *mockEtcd) deleteNode(node *etcd.Node, parent *etcd.Node, recursive bool) (*etcd.Response, error) {
  248. for _, child := range me.nodes {
  249. if isChild, directChild := isChild(node, child); isChild {
  250. if recursive == false {
  251. return nil, me.newError(etcd.ErrorCodeDirNotEmpty, "Key %s not empty", node.Key)
  252. }
  253. if directChild {
  254. me.deleteNode(child, node, true)
  255. me.index += 1
  256. node.ModifiedIndex = me.index
  257. }
  258. }
  259. }
  260. me.index += 1
  261. resp := &etcd.Response{
  262. Action: "delete",
  263. Node: me.copyNode(node, false),
  264. Index: me.index,
  265. }
  266. me.sendEvent(resp)
  267. delete(me.nodes, node.Key)
  268. return resp, nil
  269. }
  270. func (me *mockEtcd) Delete(ctx context.Context, key string, opts *etcd.DeleteOptions) (*etcd.Response, error) {
  271. node, parent, err := me.findNode(key)
  272. if err != nil {
  273. return nil, err
  274. }
  275. if node == nil {
  276. return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
  277. }
  278. if opts == nil {
  279. opts = &etcd.DeleteOptions{}
  280. }
  281. if opts.PrevIndex > 0 && opts.PrevIndex < node.ModifiedIndex {
  282. return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevIndex %s less than node ModifiedIndex %d", key, opts.PrevIndex, node.ModifiedIndex)
  283. }
  284. if opts.PrevValue != "" && opts.PrevValue != node.Value {
  285. return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevValue did not match", key)
  286. }
  287. if opts.Dir != node.Dir {
  288. if opts.Dir == true {
  289. return nil, me.newError(etcd.ErrorCodeNotDir, "Key %s is not a directory", key)
  290. } else {
  291. return nil, me.newError(etcd.ErrorCodeNotFile, "Key %s is not a file", key)
  292. }
  293. }
  294. return me.deleteNode(node, parent, opts.Recursive)
  295. }
  296. func (me *mockEtcd) Create(ctx context.Context, key, value string) (*etcd.Response, error) {
  297. return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevNoExist}, "create")
  298. }
  299. func (me *mockEtcd) CreateInOrder(ctx context.Context, dir, value string, opts *etcd.CreateInOrderOptions) (*etcd.Response, error) {
  300. panic(fmt.Errorf("Not implemented!"))
  301. }
  302. func (me *mockEtcd) Update(ctx context.Context, key, value string) (*etcd.Response, error) {
  303. return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevExist}, "update")
  304. }
  305. type watcher struct {
  306. parent *mockEtcd
  307. key string
  308. childMatch string
  309. events chan *etcd.Response
  310. after uint64
  311. recursive bool
  312. }
  313. func (me *mockEtcd) Watcher(key string, opts *etcd.WatcherOptions) etcd.Watcher {
  314. watcher := &watcher{
  315. parent: me,
  316. key: key,
  317. childMatch: fmt.Sprintf("%s/", key),
  318. events: make(chan *etcd.Response, 25),
  319. recursive: opts.Recursive,
  320. }
  321. if opts.AfterIndex > 0 {
  322. watcher.after = opts.AfterIndex
  323. }
  324. return watcher
  325. }
  326. func (w *watcher) shouldGrabEvent(resp *etcd.Response) bool {
  327. return (resp.Index > w.after) && ((resp.Node.Key == w.key) || (w.recursive && strings.HasPrefix(resp.Node.Key, w.childMatch)))
  328. }
  329. func (w *watcher) notifyEvent(resp *etcd.Response) {
  330. if w.shouldGrabEvent(resp) {
  331. w.events <- resp
  332. }
  333. }
  334. func (w *watcher) Next(ctx context.Context) (*etcd.Response, error) {
  335. // If the event is already in the history log return it from there
  336. for _, e := range w.parent.events {
  337. if e.Index > w.after && w.shouldGrabEvent(e) {
  338. w.after = e.Index
  339. return e, nil
  340. }
  341. }
  342. // Watch must handle adding and removing itself from the parent when
  343. // it's done to ensure it can be garbage collected correctly
  344. w.parent.watchers[w] = w
  345. // Otherwise wait for new events
  346. for {
  347. select {
  348. case e := <-w.events:
  349. // Might have already been grabbed through the history log
  350. if e.Index <= w.after {
  351. continue
  352. }
  353. w.after = e.Index
  354. delete(w.parent.watchers, w)
  355. return e, nil
  356. case <-ctx.Done():
  357. delete(w.parent.watchers, w)
  358. return nil, context.Canceled
  359. }
  360. }
  361. }