123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
- // Copyright 2015 flannel authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package etcdv2
- import (
- "fmt"
- "strings"
- "sync"
- "time"
- etcd "github.com/coreos/etcd/client"
- "golang.org/x/net/context"
- )
// DEFAULT_TTL is the lifetime stamped on a node when no explicit TTL is
// supplied: 365 days of 24 hours, i.e. one year.
const DEFAULT_TTL time.Duration = 365 * 24 * time.Hour
- type mockEtcd struct {
- mux sync.Mutex
- nodes map[string]*etcd.Node
- watchers map[*watcher]struct{}
- // A given number of past events must be available for watchers, because
- // flannel always uses a new watcher instead of re-using old ones, and
- // the new watcher's index may be slightly in the past
- events []*etcd.Response
- index uint64
- }
- func newMockEtcd() *mockEtcd {
- me := &mockEtcd{
- index: 1000,
- nodes: make(map[string]*etcd.Node),
- watchers: make(map[*watcher]struct{}),
- events: make([]*etcd.Response, 0, 50),
- }
- me.nodes["/"] = me.newNode("/", "", true)
- return me
- }
- func (me *mockEtcd) newNode(key, value string, dir bool) *etcd.Node {
- exp := time.Now().Add(DEFAULT_TTL)
- if dir {
- value = ""
- }
- return &etcd.Node{
- Key: key,
- Value: value,
- CreatedIndex: me.index,
- ModifiedIndex: me.index,
- Dir: dir,
- Expiration: &exp,
- Nodes: make([]*etcd.Node, 0, 20)}
- }
- func (me *mockEtcd) newError(code int, format string, args ...interface{}) etcd.Error {
- msg := fmt.Sprintf(format, args...)
- return etcd.Error{
- Code: code,
- Message: msg,
- Cause: "",
- Index: me.index,
- }
- }
- func (me *mockEtcd) getKeyPath(key string) ([]string, error) {
- if !strings.HasPrefix(key, "/") {
- return []string{}, me.newError(etcd.ErrorCodeKeyNotFound, "Invalid key %s", key)
- }
- // Build up a list of each intermediate key's path
- path := []string{""}
- for i, p := range strings.Split(strings.Trim(key, "/"), "/") {
- if p == "" {
- return []string{}, me.newError(etcd.ErrorCodeKeyNotFound, "Invalid key %s", key)
- }
- path = append(path, fmt.Sprintf("%s/%s", path[i], p))
- }
- return path[1:], nil
- }
- // Returns the node and its parent respectively. Returns a nil node (but not
- // an error) if the requested node doest not exist.
- func (me *mockEtcd) findNode(key string) (*etcd.Node, *etcd.Node, error) {
- if key == "/" {
- return me.nodes["/"], nil, nil
- }
- path, err := me.getKeyPath(key)
- if err != nil {
- return nil, nil, err
- }
- var node *etcd.Node
- var parent *etcd.Node
- var ok bool
- for i, part := range path {
- parent = node
- node, ok = me.nodes[part]
- if !ok {
- return nil, nil, nil
- }
- // intermediates must be directories
- if i < len(path)-1 && node.Dir != true {
- return nil, nil, me.newError(etcd.ErrorCodeNotDir, "Intermediate node %s not a directory", part)
- }
- }
- return node, parent, nil
- }
- // Returns whether @child is a child of @node, and whether it is an immediate child respsectively
- func isChild(node *etcd.Node, child *etcd.Node) (bool, bool) {
- if !strings.HasPrefix(child.Key, fmt.Sprintf("%s/", node.Key)) {
- return false, false
- }
- nodeParts := strings.Split(node.Key, "/")
- childParts := strings.Split(child.Key, "/")
- return true, len(childParts) == len(nodeParts)+1
- }
- func (me *mockEtcd) copyNode(node *etcd.Node, recursive bool) *etcd.Node {
- n := *node
- n.Nodes = make([]*etcd.Node, 0)
- if recursive {
- for _, child := range me.nodes {
- if _, directChild := isChild(node, child); directChild {
- n.Nodes = append(n.Nodes, me.copyNode(child, true))
- }
- }
- }
- return &n
- }
- func (me *mockEtcd) Get(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
- me.mux.Lock()
- defer me.mux.Unlock()
- node, _, err := me.findNode(key)
- if err != nil {
- return nil, err
- }
- if node == nil {
- return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
- }
- if opts == nil {
- opts = &etcd.GetOptions{}
- }
- return &etcd.Response{
- Action: "get",
- Node: me.copyNode(node, opts.Recursive),
- Index: me.index,
- }, nil
- }
- func (me *mockEtcd) sendEvent(resp *etcd.Response) {
- // Add to history log
- if len(me.events) == cap(me.events) {
- me.events = me.events[1:]
- }
- me.events = append(me.events, resp)
- // and notify watchers
- for w, _ := range me.watchers {
- w.notifyEvent(resp)
- }
- }
- // Returns the node created and its creation response
- // Don't need to check for intermediate directories here as that was already done
- // by the thing calling makeNode()
- func (me *mockEtcd) makeNode(path []string, value string, isDir bool, ttl time.Duration) (*etcd.Node, *etcd.Response, error) {
- var child *etcd.Node
- var resp *etcd.Response
- var ok bool
- node := me.nodes["/"]
- for i, part := range path {
- node, ok = me.nodes[part]
- if !ok {
- me.index += 1
- if i < len(path)-1 {
- // intermediate node
- child = me.newNode(part, "", true)
- } else {
- // Final node
- exp := time.Now().Add(ttl)
- child = me.newNode(part, value, isDir)
- child.Expiration = &exp
- resp = &etcd.Response{
- Action: "create",
- Node: me.copyNode(child, false),
- Index: child.CreatedIndex,
- }
- me.sendEvent(resp)
- }
- me.nodes[child.Key] = child
- node = child
- }
- }
- return node, resp, nil
- }
- func (me *mockEtcd) set(ctx context.Context, key, value string, opts *etcd.SetOptions, action string) (*etcd.Response, error) {
- node, _, err := me.findNode(key)
- if err != nil {
- return nil, err
- }
- if opts.PrevExist == etcd.PrevExist && node == nil {
- return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
- } else if opts.PrevExist == etcd.PrevNoExist && node != nil {
- return nil, me.newError(etcd.ErrorCodeNodeExist, "Key %s already exists", key)
- }
- if opts.Dir {
- value = ""
- }
- var resp *etcd.Response
- if node != nil {
- if opts.PrevIndex > 0 && opts.PrevIndex < node.ModifiedIndex {
- return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevIndex %s less than node ModifiedIndex %d", key, opts.PrevIndex, node.ModifiedIndex)
- }
- if opts.Dir != node.Dir {
- if opts.Dir == true {
- return nil, me.newError(etcd.ErrorCodeNotDir, "Key %s is not a directory", key)
- } else {
- return nil, me.newError(etcd.ErrorCodeNotFile, "Key %s is not a file", key)
- }
- }
- if opts.PrevValue != "" && opts.PrevValue != node.Value {
- return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevValue did not match", key)
- }
- prevNode := me.copyNode(node, false)
- node.Value = value
- me.index += 1
- node.ModifiedIndex = me.index
- if opts.TTL > 0 {
- exp := time.Now().Add(opts.TTL)
- node.Expiration = &exp
- }
- resp = &etcd.Response{
- Action: action,
- Node: me.copyNode(node, false),
- PrevNode: prevNode,
- Index: me.index,
- }
- me.sendEvent(resp)
- } else {
- // Create the node and its parents
- path, err := me.getKeyPath(key)
- if err != nil {
- return nil, err
- }
- _, resp, err = me.makeNode(path, value, opts.Dir, opts.TTL)
- if err != nil {
- return nil, err
- }
- }
- return resp, nil
- }
- func (me *mockEtcd) Set(ctx context.Context, key, value string, opts *etcd.SetOptions) (*etcd.Response, error) {
- me.mux.Lock()
- defer me.mux.Unlock()
- return me.set(ctx, key, value, opts, "set")
- }
- // Removes a node and all children
- func (me *mockEtcd) deleteNode(node *etcd.Node, parent *etcd.Node, recursive bool) (*etcd.Response, error) {
- for _, child := range me.nodes {
- if isChild, directChild := isChild(node, child); isChild {
- if recursive == false {
- return nil, me.newError(etcd.ErrorCodeDirNotEmpty, "Key %s not empty", node.Key)
- }
- if directChild {
- me.deleteNode(child, node, true)
- me.index += 1
- node.ModifiedIndex = me.index
- }
- }
- }
- me.index += 1
- resp := &etcd.Response{
- Action: "delete",
- Node: me.copyNode(node, false),
- Index: me.index,
- }
- me.sendEvent(resp)
- delete(me.nodes, node.Key)
- return resp, nil
- }
- func (me *mockEtcd) Delete(ctx context.Context, key string, opts *etcd.DeleteOptions) (*etcd.Response, error) {
- me.mux.Lock()
- defer me.mux.Unlock()
- node, parent, err := me.findNode(key)
- if err != nil {
- return nil, err
- }
- if node == nil {
- return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
- }
- if opts == nil {
- opts = &etcd.DeleteOptions{}
- }
- if opts.PrevIndex > 0 && opts.PrevIndex < node.ModifiedIndex {
- return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevIndex %s less than node ModifiedIndex %d", key, opts.PrevIndex, node.ModifiedIndex)
- }
- if opts.PrevValue != "" && opts.PrevValue != node.Value {
- return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevValue did not match", key)
- }
- if opts.Dir != node.Dir {
- if opts.Dir == true {
- return nil, me.newError(etcd.ErrorCodeNotDir, "Key %s is not a directory", key)
- } else {
- return nil, me.newError(etcd.ErrorCodeNotFile, "Key %s is not a file", key)
- }
- }
- return me.deleteNode(node, parent, opts.Recursive)
- }
- func (me *mockEtcd) Create(ctx context.Context, key, value string) (*etcd.Response, error) {
- me.mux.Lock()
- defer me.mux.Unlock()
- return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevNoExist}, "create")
- }
- func (me *mockEtcd) CreateInOrder(ctx context.Context, dir, value string, opts *etcd.CreateInOrderOptions) (*etcd.Response, error) {
- panic(fmt.Errorf("Not implemented!"))
- }
- func (me *mockEtcd) Update(ctx context.Context, key, value string) (*etcd.Response, error) {
- me.mux.Lock()
- defer me.mux.Unlock()
- return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevExist}, "update")
- }
- type watcher struct {
- parent *mockEtcd
- key string
- childMatch string
- events chan *etcd.Response
- after uint64
- recursive bool
- }
- func (me *mockEtcd) Watcher(key string, opts *etcd.WatcherOptions) etcd.Watcher {
- watcher := &watcher{
- parent: me,
- key: key,
- childMatch: fmt.Sprintf("%s/", key),
- events: make(chan *etcd.Response, 25),
- recursive: opts.Recursive,
- }
- if opts.AfterIndex > 0 {
- watcher.after = opts.AfterIndex
- }
- return watcher
- }
- func (w *watcher) shouldGrabEvent(resp *etcd.Response) bool {
- return (resp.Index > w.after) && ((resp.Node.Key == w.key) || (w.recursive && strings.HasPrefix(resp.Node.Key, w.childMatch)))
- }
- func (w *watcher) notifyEvent(resp *etcd.Response) {
- if w.shouldGrabEvent(resp) {
- w.events <- resp
- }
- }
- func (w *watcher) Next(ctx context.Context) (*etcd.Response, error) {
- w.parent.mux.Lock()
- // If the event is already in the history log return it from there
- for _, e := range w.parent.events {
- if e.Index > w.after && w.shouldGrabEvent(e) {
- w.after = e.Index
- w.parent.mux.Unlock()
- return e, nil
- }
- }
- // Watch must handle adding and removing itself from the parent when
- // it's done to ensure it can be garbage collected correctly
- w.parent.watchers[w] = struct{}{}
- w.parent.mux.Unlock()
- // Otherwise wait for new events
- for {
- select {
- case e := <-w.events:
- // Might have already been grabbed through the history log
- if e.Index <= w.after {
- continue
- }
- w.after = e.Index
- w.parent.mux.Lock()
- delete(w.parent.watchers, w)
- w.parent.mux.Unlock()
- return e, nil
- case <-ctx.Done():
- w.parent.mux.Lock()
- delete(w.parent.watchers, w)
- w.parent.mux.Unlock()
- return nil, context.Canceled
- }
- }
- }
|