mock_etcd.go 11 KB


  1. // Copyright 2015 flannel authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package subnet
  15. import (
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "time"
  20. etcd "github.com/coreos/etcd/client"
  21. "golang.org/x/net/context"
  22. )
  23. const DEFAULT_TTL time.Duration = 8760 * time.Hour // one year
  24. type mockEtcd struct {
  25. mux sync.Mutex
  26. nodes map[string]*etcd.Node
  27. watchers map[*watcher]struct{}
  28. // A given number of past events must be available for watchers, because
  29. // flannel always uses a new watcher instead of re-using old ones, and
  30. // the new watcher's index may be slightly in the past
  31. events []*etcd.Response
  32. index uint64
  33. }
  34. func newMockEtcd() *mockEtcd {
  35. me := &mockEtcd{
  36. index: 1000,
  37. nodes: make(map[string]*etcd.Node),
  38. watchers: make(map[*watcher]struct{}),
  39. events: make([]*etcd.Response, 0, 50),
  40. }
  41. me.nodes["/"] = me.newNode("/", "", true)
  42. return me
  43. }
  44. func (me *mockEtcd) newNode(key, value string, dir bool) *etcd.Node {
  45. exp := time.Now().Add(DEFAULT_TTL)
  46. if dir {
  47. value = ""
  48. }
  49. return &etcd.Node{
  50. Key: key,
  51. Value: value,
  52. CreatedIndex: me.index,
  53. ModifiedIndex: me.index,
  54. Dir: dir,
  55. Expiration: &exp,
  56. Nodes: make([]*etcd.Node, 0, 20)}
  57. }
  58. func (me *mockEtcd) newError(code int, format string, args ...interface{}) etcd.Error {
  59. msg := fmt.Sprintf(format, args...)
  60. return etcd.Error{
  61. Code: code,
  62. Message: msg,
  63. Cause: "",
  64. Index: me.index,
  65. }
  66. }
  67. func (me *mockEtcd) getKeyPath(key string) ([]string, error) {
  68. if !strings.HasPrefix(key, "/") {
  69. return []string{}, me.newError(etcd.ErrorCodeKeyNotFound, "Invalid key %s", key)
  70. }
  71. // Build up a list of each intermediate key's path
  72. path := []string{""}
  73. for i, p := range strings.Split(strings.Trim(key, "/"), "/") {
  74. if p == "" {
  75. return []string{}, me.newError(etcd.ErrorCodeKeyNotFound, "Invalid key %s", key)
  76. }
  77. path = append(path, fmt.Sprintf("%s/%s", path[i], p))
  78. }
  79. return path[1:], nil
  80. }
  81. // Returns the node and its parent respectively. Returns a nil node (but not
  82. // an error) if the requested node doest not exist.
  83. func (me *mockEtcd) findNode(key string) (*etcd.Node, *etcd.Node, error) {
  84. if key == "/" {
  85. return me.nodes["/"], nil, nil
  86. }
  87. path, err := me.getKeyPath(key)
  88. if err != nil {
  89. return nil, nil, err
  90. }
  91. var node *etcd.Node
  92. var parent *etcd.Node
  93. var ok bool
  94. for i, part := range path {
  95. parent = node
  96. node, ok = me.nodes[part]
  97. if !ok {
  98. return nil, nil, nil
  99. }
  100. // intermediates must be directories
  101. if i < len(path)-1 && node.Dir != true {
  102. return nil, nil, me.newError(etcd.ErrorCodeNotDir, "Intermediate node %s not a directory", part)
  103. }
  104. }
  105. return node, parent, nil
  106. }
  107. // Returns whether @child is a child of @node, and whether it is an immediate child respsectively
  108. func isChild(node *etcd.Node, child *etcd.Node) (bool, bool) {
  109. if !strings.HasPrefix(child.Key, fmt.Sprintf("%s/", node.Key)) {
  110. return false, false
  111. }
  112. nodeParts := strings.Split(node.Key, "/")
  113. childParts := strings.Split(child.Key, "/")
  114. return true, len(childParts) == len(nodeParts)+1
  115. }
  116. func (me *mockEtcd) copyNode(node *etcd.Node, recursive bool) *etcd.Node {
  117. n := *node
  118. n.Nodes = make([]*etcd.Node, 0)
  119. if recursive {
  120. for _, child := range me.nodes {
  121. if _, directChild := isChild(node, child); directChild {
  122. n.Nodes = append(n.Nodes, me.copyNode(child, true))
  123. }
  124. }
  125. }
  126. return &n
  127. }
  128. func (me *mockEtcd) Get(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
  129. me.mux.Lock()
  130. defer me.mux.Unlock()
  131. node, _, err := me.findNode(key)
  132. if err != nil {
  133. return nil, err
  134. }
  135. if node == nil {
  136. return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
  137. }
  138. if opts == nil {
  139. opts = &etcd.GetOptions{}
  140. }
  141. return &etcd.Response{
  142. Action: "get",
  143. Node: me.copyNode(node, opts.Recursive),
  144. Index: me.index,
  145. }, nil
  146. }
  147. func (me *mockEtcd) sendEvent(resp *etcd.Response) {
  148. // Add to history log
  149. if len(me.events) == cap(me.events) {
  150. me.events = me.events[1:]
  151. }
  152. me.events = append(me.events, resp)
  153. // and notify watchers
  154. for w, _ := range me.watchers {
  155. w.notifyEvent(resp)
  156. }
  157. }
  158. // Returns the node created and its creation response
  159. // Don't need to check for intermediate directories here as that was already done
  160. // by the thing calling makeNode()
  161. func (me *mockEtcd) makeNode(path []string, value string, isDir bool, ttl time.Duration) (*etcd.Node, *etcd.Response, error) {
  162. var child *etcd.Node
  163. var resp *etcd.Response
  164. var ok bool
  165. node := me.nodes["/"]
  166. for i, part := range path {
  167. node, ok = me.nodes[part]
  168. if !ok {
  169. me.index += 1
  170. if i < len(path)-1 {
  171. // intermediate node
  172. child = me.newNode(part, "", true)
  173. } else {
  174. // Final node
  175. exp := time.Now().Add(ttl)
  176. child = me.newNode(part, value, isDir)
  177. child.Expiration = &exp
  178. resp = &etcd.Response{
  179. Action: "create",
  180. Node: me.copyNode(child, false),
  181. Index: child.CreatedIndex,
  182. }
  183. me.sendEvent(resp)
  184. }
  185. me.nodes[child.Key] = child
  186. node = child
  187. }
  188. }
  189. return node, resp, nil
  190. }
  191. func (me *mockEtcd) set(ctx context.Context, key, value string, opts *etcd.SetOptions, action string) (*etcd.Response, error) {
  192. node, _, err := me.findNode(key)
  193. if err != nil {
  194. return nil, err
  195. }
  196. if opts.PrevExist == etcd.PrevExist && node == nil {
  197. return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
  198. } else if opts.PrevExist == etcd.PrevNoExist && node != nil {
  199. return nil, me.newError(etcd.ErrorCodeNodeExist, "Key %s already exists", key)
  200. }
  201. if opts.Dir {
  202. value = ""
  203. }
  204. var resp *etcd.Response
  205. if node != nil {
  206. if opts.PrevIndex > 0 && opts.PrevIndex < node.ModifiedIndex {
  207. return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevIndex %s less than node ModifiedIndex %d", key, opts.PrevIndex, node.ModifiedIndex)
  208. }
  209. if opts.Dir != node.Dir {
  210. if opts.Dir == true {
  211. return nil, me.newError(etcd.ErrorCodeNotDir, "Key %s is not a directory", key)
  212. } else {
  213. return nil, me.newError(etcd.ErrorCodeNotFile, "Key %s is not a file", key)
  214. }
  215. }
  216. if opts.PrevValue != "" && opts.PrevValue != node.Value {
  217. return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevValue did not match", key)
  218. }
  219. prevNode := me.copyNode(node, false)
  220. node.Value = value
  221. me.index += 1
  222. node.ModifiedIndex = me.index
  223. if opts.TTL > 0 {
  224. exp := time.Now().Add(opts.TTL)
  225. node.Expiration = &exp
  226. }
  227. resp = &etcd.Response{
  228. Action: action,
  229. Node: me.copyNode(node, false),
  230. PrevNode: prevNode,
  231. Index: me.index,
  232. }
  233. me.sendEvent(resp)
  234. } else {
  235. // Create the node and its parents
  236. path, err := me.getKeyPath(key)
  237. if err != nil {
  238. return nil, err
  239. }
  240. _, resp, err = me.makeNode(path, value, opts.Dir, opts.TTL)
  241. if err != nil {
  242. return nil, err
  243. }
  244. }
  245. return resp, nil
  246. }
  247. func (me *mockEtcd) Set(ctx context.Context, key, value string, opts *etcd.SetOptions) (*etcd.Response, error) {
  248. me.mux.Lock()
  249. defer me.mux.Unlock()
  250. return me.set(ctx, key, value, opts, "set")
  251. }
  252. // Removes a node and all children
  253. func (me *mockEtcd) deleteNode(node *etcd.Node, parent *etcd.Node, recursive bool) (*etcd.Response, error) {
  254. for _, child := range me.nodes {
  255. if isChild, directChild := isChild(node, child); isChild {
  256. if recursive == false {
  257. return nil, me.newError(etcd.ErrorCodeDirNotEmpty, "Key %s not empty", node.Key)
  258. }
  259. if directChild {
  260. me.deleteNode(child, node, true)
  261. me.index += 1
  262. node.ModifiedIndex = me.index
  263. }
  264. }
  265. }
  266. me.index += 1
  267. resp := &etcd.Response{
  268. Action: "delete",
  269. Node: me.copyNode(node, false),
  270. Index: me.index,
  271. }
  272. me.sendEvent(resp)
  273. delete(me.nodes, node.Key)
  274. return resp, nil
  275. }
  276. func (me *mockEtcd) Delete(ctx context.Context, key string, opts *etcd.DeleteOptions) (*etcd.Response, error) {
  277. me.mux.Lock()
  278. defer me.mux.Unlock()
  279. node, parent, err := me.findNode(key)
  280. if err != nil {
  281. return nil, err
  282. }
  283. if node == nil {
  284. return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
  285. }
  286. if opts == nil {
  287. opts = &etcd.DeleteOptions{}
  288. }
  289. if opts.PrevIndex > 0 && opts.PrevIndex < node.ModifiedIndex {
  290. return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevIndex %s less than node ModifiedIndex %d", key, opts.PrevIndex, node.ModifiedIndex)
  291. }
  292. if opts.PrevValue != "" && opts.PrevValue != node.Value {
  293. return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevValue did not match", key)
  294. }
  295. if opts.Dir != node.Dir {
  296. if opts.Dir == true {
  297. return nil, me.newError(etcd.ErrorCodeNotDir, "Key %s is not a directory", key)
  298. } else {
  299. return nil, me.newError(etcd.ErrorCodeNotFile, "Key %s is not a file", key)
  300. }
  301. }
  302. return me.deleteNode(node, parent, opts.Recursive)
  303. }
  304. func (me *mockEtcd) Create(ctx context.Context, key, value string) (*etcd.Response, error) {
  305. me.mux.Lock()
  306. defer me.mux.Unlock()
  307. return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevNoExist}, "create")
  308. }
  309. func (me *mockEtcd) CreateInOrder(ctx context.Context, dir, value string, opts *etcd.CreateInOrderOptions) (*etcd.Response, error) {
  310. panic(fmt.Errorf("Not implemented!"))
  311. }
  312. func (me *mockEtcd) Update(ctx context.Context, key, value string) (*etcd.Response, error) {
  313. me.mux.Lock()
  314. defer me.mux.Unlock()
  315. return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevExist}, "update")
  316. }
  317. type watcher struct {
  318. parent *mockEtcd
  319. key string
  320. childMatch string
  321. events chan *etcd.Response
  322. after uint64
  323. recursive bool
  324. }
  325. func (me *mockEtcd) Watcher(key string, opts *etcd.WatcherOptions) etcd.Watcher {
  326. watcher := &watcher{
  327. parent: me,
  328. key: key,
  329. childMatch: fmt.Sprintf("%s/", key),
  330. events: make(chan *etcd.Response, 25),
  331. recursive: opts.Recursive,
  332. }
  333. if opts.AfterIndex > 0 {
  334. watcher.after = opts.AfterIndex
  335. }
  336. return watcher
  337. }
  338. func (w *watcher) shouldGrabEvent(resp *etcd.Response) bool {
  339. return (resp.Index > w.after) && ((resp.Node.Key == w.key) || (w.recursive && strings.HasPrefix(resp.Node.Key, w.childMatch)))
  340. }
  341. func (w *watcher) notifyEvent(resp *etcd.Response) {
  342. if w.shouldGrabEvent(resp) {
  343. w.events <- resp
  344. }
  345. }
  346. func (w *watcher) Next(ctx context.Context) (*etcd.Response, error) {
  347. w.parent.mux.Lock()
  348. // If the event is already in the history log return it from there
  349. for _, e := range w.parent.events {
  350. if e.Index > w.after && w.shouldGrabEvent(e) {
  351. w.after = e.Index
  352. w.parent.mux.Unlock()
  353. return e, nil
  354. }
  355. }
  356. // Watch must handle adding and removing itself from the parent when
  357. // it's done to ensure it can be garbage collected correctly
  358. w.parent.watchers[w] = struct{}{}
  359. w.parent.mux.Unlock()
  360. // Otherwise wait for new events
  361. for {
  362. select {
  363. case e := <-w.events:
  364. // Might have already been grabbed through the history log
  365. if e.Index <= w.after {
  366. continue
  367. }
  368. w.after = e.Index
  369. w.parent.mux.Lock()
  370. delete(w.parent.watchers, w)
  371. w.parent.mux.Unlock()
  372. return e, nil
  373. case <-ctx.Done():
  374. w.parent.mux.Lock()
  375. delete(w.parent.watchers, w)
  376. w.parent.mux.Unlock()
  377. return nil, context.Canceled
  378. }
  379. }
  380. }