Browse Source

vendor: update etcd/client

We need at least https://github.com/coreos/etcd/commit/a1ef699aebbe7ebb4da2f90a42e0f612eec08384 to
fix the etcd/client bug that returned the context errors wrapped in ClusterError.  According
to https://github.com/coreos/etcd/tree/master/client#error-handling all context errors
should be returned as-is, but client.go::Do() was returning a ClusterError instead:

-			if err == context.DeadlineExceeded || err == context.Canceled {
-				return nil, nil, cerr
+			// mask previous errors with context error, which is controlled by user
+			if err == context.Canceled || err == context.DeadlineExceeded {
+				return nil, nil, err
 			}
Dan Williams 9 years ago
parent
commit
cf91333739

+ 7 - 2
Godeps/Godeps.json

@@ -27,8 +27,13 @@
 		},
 		{
 			"ImportPath": "github.com/coreos/etcd/client",
-			"Comment": "v2.1.1-85-gff0b872",
-			"Rev": "ff0b8723c747e76d2651bcddaff26563128ab216"
+			"Comment": "v2.1.1-135-ga1ef699",
+			"Rev": "a1ef699aebbe7ebb4da2f90a42e0f612eec08384"
+		},
+		{
+			"ImportPath": "github.com/coreos/etcd/pkg/pathutil",
+			"Comment": "v2.1.1-135-ga1ef699",
+			"Rev": "a1ef699aebbe7ebb4da2f90a42e0f612eec08384"
 		},
 		{
 			"ImportPath": "github.com/coreos/etcd/pkg/transport",

+ 20 - 0
Godeps/_workspace/src/github.com/coreos/etcd/client/cancelreq.go

@@ -0,0 +1,20 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// borrowed from golang/net/context/ctxhttp/cancelreq.go
+
+// +build go1.5
+
+package client
+
+import "net/http"
+
+func requestCanceler(tr CancelableTransport, req *http.Request) func() {
+	ch := make(chan struct{})
+	req.Cancel = ch
+
+	return func() {
+		close(ch)
+	}
+}

+ 17 - 0
Godeps/_workspace/src/github.com/coreos/etcd/client/cancelreq_go14.go

@@ -0,0 +1,17 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// borrowed from golang/net/context/ctxhttp/cancelreq_go14.go
+
+// +build !go1.5
+
+package client
+
+import "net/http"
+
+func requestCanceler(tr CancelableTransport, req *http.Request) func() {
+	return func() {
+		tr.CancelRequest(req)
+	}
+}

+ 101 - 17
Godeps/_workspace/src/github.com/coreos/etcd/client/client.go

@@ -22,6 +22,8 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"reflect"
+	"sort"
 	"sync"
 	"time"
 
@@ -85,6 +87,23 @@ type Config struct {
 	// Password is the password for the specified user to add as an authorization header
 	// to the request.
 	Password string
+
+	// HeaderTimeoutPerRequest specifies the time limit to wait for response
+	// header in a single request made by the Client. The timeout includes
+	// connection time, any redirects, and header wait time.
+	//
+	// For non-watch GET request, server returns the response body immediately.
+	// For PUT/POST/DELETE request, server will attempt to commit request
+	// before responding, which is expected to take `100ms + 2 * RTT`.
+	// For watch request, server returns the header immediately to notify Client
+	// watch start. But if server is behind some kind of proxy, the response
+	// header may be cached at proxy, and Client cannot rely on this behavior.
+	//
+	// One API call may send multiple requests to different etcd servers until it
+	// succeeds. Use context of the API to specify the overall timeout.
+	//
+	// A HeaderTimeoutPerRequest of zero means no timeout.
+	HeaderTimeoutPerRequest time.Duration
 }
 
 func (cfg *Config) transport() CancelableTransport {
@@ -122,6 +141,22 @@ type Client interface {
 	// Sync updates the internal cache of the etcd cluster's membership.
 	Sync(context.Context) error
 
+	// AutoSync periodically calls Sync() every given interval.
+	// The recommended sync interval is 10 seconds to 1 minute, which does
+	// not bring too much overhead to server and makes client catch up the
+	// cluster change in time.
+	//
+	// The example to use it:
+	//
+	//  for {
+	//      err := client.AutoSync(ctx, 10*time.Second)
+	//      if err == context.DeadlineExceeded || err == context.Canceled {
+	//          break
+	//      }
+	//      log.Print(err)
+	//  }
+	AutoSync(context.Context, time.Duration) error
+
 	// Endpoints returns a copy of the current set of API endpoints used
 	// by Client to resolve HTTP requests. If Sync has ever been called,
 	// this may differ from the initial Endpoints provided in the Config.
@@ -132,7 +167,7 @@ type Client interface {
 
 func New(cfg Config) (Client, error) {
 	c := &httpClusterClient{
-		clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect()),
+		clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest),
 		rand:          rand.New(rand.NewSource(int64(time.Now().Nanosecond()))),
 	}
 	if cfg.Username != "" {
@@ -151,13 +186,14 @@ type httpClient interface {
 	Do(context.Context, httpAction) (*http.Response, []byte, error)
 }
 
-func newHTTPClientFactory(tr CancelableTransport, cr CheckRedirectFunc) httpClientFactory {
+func newHTTPClientFactory(tr CancelableTransport, cr CheckRedirectFunc, headerTimeout time.Duration) httpClientFactory {
 	return func(ep url.URL) httpClient {
 		return &redirectFollowingHTTPClient{
 			checkRedirect: cr,
 			client: &simpleHTTPClient{
-				transport: tr,
-				endpoint:  ep,
+				transport:     tr,
+				endpoint:      ep,
+				headerTimeout: headerTimeout,
 			},
 		}
 	}
@@ -239,14 +275,20 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
 		resp, body, err = hc.Do(ctx, action)
 		if err != nil {
 			cerr.Errors = append(cerr.Errors, err)
-			if err == context.DeadlineExceeded || err == context.Canceled {
-				return nil, nil, cerr
+			// mask previous errors with context error, which is controlled by user
+			if err == context.Canceled || err == context.DeadlineExceeded {
+				return nil, nil, err
 			}
 			continue
 		}
 		if resp.StatusCode/100 == 5 {
-			// TODO: make sure this is a no leader response
-			cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", eps[k].String()))
+			switch resp.StatusCode {
+			case http.StatusInternalServerError, http.StatusServiceUnavailable:
+				// TODO: make sure this is a no leader response
+				cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", eps[k].String()))
+			default:
+				cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
+			}
 			continue
 		}
 		if k != pinned {
@@ -286,18 +328,47 @@ func (c *httpClusterClient) Sync(ctx context.Context) error {
 	for _, m := range ms {
 		eps = append(eps, m.ClientURLs...)
 	}
+	sort.Sort(sort.StringSlice(eps))
+
+	ceps := make([]string, len(c.endpoints))
+	for i, cep := range c.endpoints {
+		ceps[i] = cep.String()
+	}
+	sort.Sort(sort.StringSlice(ceps))
+	// fast path if no change happens
+	// this helps client to pin the endpoint when no cluster change
+	if reflect.DeepEqual(eps, ceps) {
+		return nil
+	}
 
 	return c.reset(eps)
 }
 
+func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		err := c.Sync(ctx)
+		if err != nil {
+			return err
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+}
+
 type roundTripResponse struct {
 	resp *http.Response
 	err  error
 }
 
 type simpleHTTPClient struct {
-	transport CancelableTransport
-	endpoint  url.URL
+	transport     CancelableTransport
+	endpoint      url.URL
+	headerTimeout time.Duration
 }
 
 func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
@@ -307,6 +378,14 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
 		return nil, nil, err
 	}
 
+	hctx, hcancel := context.WithCancel(ctx)
+	if c.headerTimeout > 0 {
+		hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
+	}
+	defer hcancel()
+
+	reqcancel := requestCanceler(c.transport, req)
+
 	rtchan := make(chan roundTripResponse, 1)
 	go func() {
 		resp, err := c.transport.RoundTrip(req)
@@ -320,12 +399,19 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
 	select {
 	case rtresp := <-rtchan:
 		resp, err = rtresp.resp, rtresp.err
-	case <-ctx.Done():
+	case <-hctx.Done():
 		// cancel and wait for request to actually exit before continuing
-		c.transport.CancelRequest(req)
+		reqcancel()
 		rtresp := <-rtchan
 		resp = rtresp.resp
-		err = ctx.Err()
+		switch {
+		case ctx.Err() != nil:
+			err = ctx.Err()
+		case hctx.Err() != nil:
+			err = fmt.Errorf("client: endpoint %s exceeded header timeout", c.endpoint.String())
+		default:
+			panic("failed to get error from context")
+		}
 	}
 
 	// always check for resp nil-ness to deal with possible
@@ -349,11 +435,9 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
 
 	select {
 	case <-ctx.Done():
-		err = resp.Body.Close()
+		resp.Body.Close()
 		<-done
-		if err == nil {
-			err = ctx.Err()
-		}
+		return nil, nil, ctx.Err()
 	case <-done:
 	}
 

+ 163 - 64
Godeps/_workspace/src/github.com/coreos/etcd/client/client_test.go

@@ -109,25 +109,6 @@ func newFakeTransport() *fakeTransport {
 	}
 }
 
-func (t *fakeTransport) RoundTrip(*http.Request) (*http.Response, error) {
-	select {
-	case resp := <-t.respchan:
-		return resp, nil
-	case err := <-t.errchan:
-		return nil, err
-	case <-t.startCancel:
-		select {
-		// this simulates that the request is finished before cancel effects
-		case resp := <-t.respchan:
-			return resp, nil
-		// wait on finishCancel to simulate taking some amount of
-		// time while calling CancelRequest
-		case <-t.finishCancel:
-			return nil, errors.New("cancelled")
-		}
-	}
-}
-
 func (t *fakeTransport) CancelRequest(*http.Request) {
 	t.startCancel <- struct{}{}
 }
@@ -257,8 +238,8 @@ func TestSimpleHTTPClientDoCancelContextResponseBodyClosedWithBlockingBody(t *te
 	}()
 
 	_, _, err := c.Do(ctx, &fakeAction{})
-	if err == nil {
-		t.Fatalf("expected non-nil error, got nil")
+	if err != context.Canceled {
+		t.Fatalf("expected %+v, got %+v", context.Canceled, err)
 	}
 
 	if !body.closed {
@@ -297,6 +278,27 @@ func TestSimpleHTTPClientDoCancelContextWaitForRoundTrip(t *testing.T) {
 	}
 }
 
+func TestSimpleHTTPClientDoHeaderTimeout(t *testing.T) {
+	tr := newFakeTransport()
+	tr.finishCancel <- struct{}{}
+	c := &simpleHTTPClient{transport: tr, headerTimeout: time.Millisecond}
+
+	errc := make(chan error)
+	go func() {
+		_, _, err := c.Do(context.Background(), &fakeAction{})
+		errc <- err
+	}()
+
+	select {
+	case err := <-errc:
+		if err == nil {
+			t.Fatalf("expected non-nil error, got nil")
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("unexpected timeout when waitting for the test to finish")
+	}
+}
+
 func TestHTTPClusterClientDo(t *testing.T) {
 	fakeErr := errors.New("fake!")
 	fakeURL := url.URL{}
@@ -312,8 +314,8 @@ func TestHTTPClusterClientDo(t *testing.T) {
 				endpoints: []url.URL{fakeURL, fakeURL},
 				clientFactory: newStaticHTTPClientFactory(
 					[]staticHTTPResponse{
-						staticHTTPResponse{resp: http.Response{StatusCode: http.StatusTeapot}},
-						staticHTTPResponse{err: fakeErr},
+						{resp: http.Response{StatusCode: http.StatusTeapot}},
+						{err: fakeErr},
 					},
 				),
 				rand: rand.New(rand.NewSource(0)),
@@ -327,8 +329,8 @@ func TestHTTPClusterClientDo(t *testing.T) {
 				endpoints: []url.URL{fakeURL, fakeURL},
 				clientFactory: newStaticHTTPClientFactory(
 					[]staticHTTPResponse{
-						staticHTTPResponse{err: fakeErr},
-						staticHTTPResponse{resp: http.Response{StatusCode: http.StatusTeapot}},
+						{err: fakeErr},
+						{resp: http.Response{StatusCode: http.StatusTeapot}},
 					},
 				),
 				rand: rand.New(rand.NewSource(0)),
@@ -337,41 +339,26 @@ func TestHTTPClusterClientDo(t *testing.T) {
 			wantPinned: 1,
 		},
 
-		// context.DeadlineExceeded short-circuits Do
-		{
-			client: &httpClusterClient{
-				endpoints: []url.URL{fakeURL, fakeURL},
-				clientFactory: newStaticHTTPClientFactory(
-					[]staticHTTPResponse{
-						staticHTTPResponse{err: context.DeadlineExceeded},
-						staticHTTPResponse{resp: http.Response{StatusCode: http.StatusTeapot}},
-					},
-				),
-				rand: rand.New(rand.NewSource(0)),
-			},
-			wantErr: &ClusterError{Errors: []error{context.DeadlineExceeded}},
-		},
-
 		// context.Canceled short-circuits Do
 		{
 			client: &httpClusterClient{
 				endpoints: []url.URL{fakeURL, fakeURL},
 				clientFactory: newStaticHTTPClientFactory(
 					[]staticHTTPResponse{
-						staticHTTPResponse{err: context.Canceled},
-						staticHTTPResponse{resp: http.Response{StatusCode: http.StatusTeapot}},
+						{err: context.Canceled},
+						{resp: http.Response{StatusCode: http.StatusTeapot}},
 					},
 				),
 				rand: rand.New(rand.NewSource(0)),
 			},
-			wantErr: &ClusterError{Errors: []error{context.Canceled}},
+			wantErr: context.Canceled,
 		},
 
 		// return err if there are no endpoints
 		{
 			client: &httpClusterClient{
 				endpoints:     []url.URL{},
-				clientFactory: newHTTPClientFactory(nil, nil),
+				clientFactory: newHTTPClientFactory(nil, nil, 0),
 				rand:          rand.New(rand.NewSource(0)),
 			},
 			wantErr: ErrNoEndpoints,
@@ -383,8 +370,8 @@ func TestHTTPClusterClientDo(t *testing.T) {
 				endpoints: []url.URL{fakeURL, fakeURL},
 				clientFactory: newStaticHTTPClientFactory(
 					[]staticHTTPResponse{
-						staticHTTPResponse{err: fakeErr},
-						staticHTTPResponse{err: fakeErr},
+						{err: fakeErr},
+						{err: fakeErr},
 					},
 				),
 				rand: rand.New(rand.NewSource(0)),
@@ -398,8 +385,8 @@ func TestHTTPClusterClientDo(t *testing.T) {
 				endpoints: []url.URL{fakeURL, fakeURL},
 				clientFactory: newStaticHTTPClientFactory(
 					[]staticHTTPResponse{
-						staticHTTPResponse{resp: http.Response{StatusCode: http.StatusBadGateway}},
-						staticHTTPResponse{resp: http.Response{StatusCode: http.StatusTeapot}},
+						{resp: http.Response{StatusCode: http.StatusBadGateway}},
+						{resp: http.Response{StatusCode: http.StatusTeapot}},
 					},
 				),
 				rand: rand.New(rand.NewSource(0)),
@@ -434,6 +421,33 @@ func TestHTTPClusterClientDo(t *testing.T) {
 	}
 }
 
+func TestHTTPClusterClientDoDeadlineExceedContext(t *testing.T) {
+	fakeURL := url.URL{}
+	tr := newFakeTransport()
+	tr.finishCancel <- struct{}{}
+	c := &httpClusterClient{
+		clientFactory: newHTTPClientFactory(tr, DefaultCheckRedirect, 0),
+		endpoints:     []url.URL{fakeURL},
+	}
+
+	errc := make(chan error)
+	go func() {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
+		defer cancel()
+		_, _, err := c.Do(ctx, &fakeAction{})
+		errc <- err
+	}()
+
+	select {
+	case err := <-errc:
+		if err != context.DeadlineExceeded {
+			t.Errorf("err = %+v, want %+v", err, context.DeadlineExceeded)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("unexpected timeout when waitting for request to deadline exceed")
+	}
+}
+
 func TestRedirectedHTTPAction(t *testing.T) {
 	act := &redirectedHTTPAction{
 		action: &staticHTTPAction{
@@ -480,7 +494,7 @@ func TestRedirectFollowingHTTPClient(t *testing.T) {
 			checkRedirect: func(int) error { return ErrTooManyRedirects },
 			client: &multiStaticHTTPClient{
 				responses: []staticHTTPResponse{
-					staticHTTPResponse{
+					{
 						err: errors.New("fail!"),
 					},
 				},
@@ -493,7 +507,7 @@ func TestRedirectFollowingHTTPClient(t *testing.T) {
 			checkRedirect: func(int) error { return ErrTooManyRedirects },
 			client: &multiStaticHTTPClient{
 				responses: []staticHTTPResponse{
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTeapot,
 						},
@@ -513,13 +527,13 @@ func TestRedirectFollowingHTTPClient(t *testing.T) {
 			},
 			client: &multiStaticHTTPClient{
 				responses: []staticHTTPResponse{
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTemporaryRedirect,
 							Header:     http.Header{"Location": []string{"http://example.com"}},
 						},
 					},
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTeapot,
 						},
@@ -539,19 +553,19 @@ func TestRedirectFollowingHTTPClient(t *testing.T) {
 			},
 			client: &multiStaticHTTPClient{
 				responses: []staticHTTPResponse{
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTemporaryRedirect,
 							Header:     http.Header{"Location": []string{"http://example.com"}},
 						},
 					},
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTemporaryRedirect,
 							Header:     http.Header{"Location": []string{"http://example.com"}},
 						},
 					},
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTeapot,
 						},
@@ -571,19 +585,19 @@ func TestRedirectFollowingHTTPClient(t *testing.T) {
 			},
 			client: &multiStaticHTTPClient{
 				responses: []staticHTTPResponse{
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTemporaryRedirect,
 							Header:     http.Header{"Location": []string{"http://example.com"}},
 						},
 					},
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTemporaryRedirect,
 							Header:     http.Header{"Location": []string{"http://example.com"}},
 						},
 					},
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTeapot,
 						},
@@ -598,7 +612,7 @@ func TestRedirectFollowingHTTPClient(t *testing.T) {
 			checkRedirect: func(int) error { return ErrTooManyRedirects },
 			client: &multiStaticHTTPClient{
 				responses: []staticHTTPResponse{
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTemporaryRedirect,
 						},
@@ -613,7 +627,7 @@ func TestRedirectFollowingHTTPClient(t *testing.T) {
 			checkRedirect: func(int) error { return ErrTooManyRedirects },
 			client: &multiStaticHTTPClient{
 				responses: []staticHTTPResponse{
-					staticHTTPResponse{
+					{
 						resp: http.Response{
 							StatusCode: http.StatusTemporaryRedirect,
 							Header:     http.Header{"Location": []string{":"}},
@@ -681,7 +695,7 @@ func TestDefaultCheckRedirect(t *testing.T) {
 
 func TestHTTPClusterClientSync(t *testing.T) {
 	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
-		staticHTTPResponse{
+		{
 			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
 			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
 		},
@@ -728,7 +742,7 @@ func TestHTTPClusterClientSync(t *testing.T) {
 
 func TestHTTPClusterClientSyncFail(t *testing.T) {
 	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
-		staticHTTPResponse{err: errors.New("fail!")},
+		{err: errors.New("fail!")},
 	})
 
 	hc := &httpClusterClient{
@@ -757,13 +771,98 @@ func TestHTTPClusterClientSyncFail(t *testing.T) {
 	}
 }
 
+func TestHTTPClusterClientAutoSyncCancelContext(t *testing.T) {
+	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+	})
+
+	hc := &httpClusterClient{
+		clientFactory: cf,
+		rand:          rand.New(rand.NewSource(0)),
+	}
+	err := hc.reset([]string{"http://127.0.0.1:2379"})
+	if err != nil {
+		t.Fatalf("unexpected error during setup: %#v", err)
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	err = hc.AutoSync(ctx, time.Hour)
+	if err != context.Canceled {
+		t.Fatalf("incorrect error value: want=%v got=%v", context.Canceled, err)
+	}
+}
+
+func TestHTTPClusterClientAutoSyncFail(t *testing.T) {
+	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
+		{err: errors.New("fail!")},
+	})
+
+	hc := &httpClusterClient{
+		clientFactory: cf,
+		rand:          rand.New(rand.NewSource(0)),
+	}
+	err := hc.reset([]string{"http://127.0.0.1:2379"})
+	if err != nil {
+		t.Fatalf("unexpected error during setup: %#v", err)
+	}
+
+	err = hc.AutoSync(context.Background(), time.Hour)
+	if err.Error() != ErrClusterUnavailable.Error() {
+		t.Fatalf("incorrect error value: want=%v got=%v", ErrClusterUnavailable, err)
+	}
+}
+
+// TestHTTPClusterClientSyncPinEndpoint tests that Sync() pins the endpoint when
+// it gets the exactly same member list as before.
+func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) {
+	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+	})
+
+	hc := &httpClusterClient{
+		clientFactory: cf,
+		rand:          rand.New(rand.NewSource(0)),
+	}
+	err := hc.reset([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
+	if err != nil {
+		t.Fatalf("unexpected error during setup: %#v", err)
+	}
+	pinnedEndpoint := hc.endpoints[hc.pinned]
+
+	for i := 0; i < 3; i++ {
+		err = hc.Sync(context.Background())
+		if err != nil {
+			t.Fatalf("#%d: unexpected error during Sync: %#v", i, err)
+		}
+
+		if g := hc.endpoints[hc.pinned]; g != pinnedEndpoint {
+			t.Errorf("#%d: pinned endpoint = %s, want %s", i, g, pinnedEndpoint)
+		}
+	}
+}
+
 func TestHTTPClusterClientResetFail(t *testing.T) {
 	tests := [][]string{
 		// need at least one endpoint
-		[]string{},
+		{},
 
 		// urls must be valid
-		[]string{":"},
+		{":"},
 	}
 
 	for i, tt := range tests {

+ 24 - 12
Godeps/_workspace/src/github.com/coreos/etcd/client/keys.go

@@ -14,28 +14,31 @@
 
 package client
 
+//go:generate codecgen -r "Node|Response" -o keys.generated.go keys.go
+
 import (
 	"encoding/json"
 	"errors"
 	"fmt"
 	"net/http"
 	"net/url"
-	"path"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/pathutil"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 const (
-	ErrorCodeKeyNotFound = 100
-	ErrorCodeTestFailed  = 101
-	ErrorCodeNotFile     = 102
-	ErrorCodeNotDir      = 104
-	ErrorCodeNodeExist   = 105
-	ErrorCodeRootROnly   = 107
-	ErrorCodeDirNotEmpty = 108
+	ErrorCodeKeyNotFound  = 100
+	ErrorCodeTestFailed   = 101
+	ErrorCodeNotFile      = 102
+	ErrorCodeNotDir       = 104
+	ErrorCodeNodeExist    = 105
+	ErrorCodeRootROnly    = 107
+	ErrorCodeDirNotEmpty  = 108
+	ErrorCodeUnauthorized = 110
 
 	ErrorCodePrevValueRequired = 201
 	ErrorCodeTTLNaN            = 202
@@ -66,7 +69,7 @@ var (
 	ErrEmptyBody   = errors.New("client: response body is empty")
 )
 
-// PrevExistType is used to define an existence condition when setting
+// PrevExistType is used to ;define an existence condition when setting
 // or deleting Nodes.
 type PrevExistType string
 
@@ -136,7 +139,7 @@ type WatcherOptions struct {
 	// index, whatever that may be.
 	AfterIndex uint64
 
-	// Recursive specifices whether or not the Watcher should emit
+	// Recursive specifies whether or not the Watcher should emit
 	// events that occur in children of the given keyspace. If set
 	// to false (default), events will be limited to those that
 	// occur for the exact key.
@@ -252,7 +255,7 @@ type Response struct {
 	Node *Node `json:"node"`
 
 	// PrevNode represents the previous state of the Node. PrevNode is non-nil
-	// only if the Node existed before the action occured and the action
+	// only if the Node existed before the action occurred and the action
 	// caused a change to the Node.
 	PrevNode *Node `json:"prevNode"`
 
@@ -445,7 +448,16 @@ func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
 // provided endpoint's path to the root of the keys API
 // (typically "/v2/keys").
 func v2KeysURL(ep url.URL, prefix, key string) *url.URL {
-	ep.Path = path.Join(ep.Path, prefix, key)
+	// We concatenate all parts together manually. We cannot use
+	// path.Join because it does not reserve trailing slash.
+	// We call CanonicalURLPath to further cleanup the path.
+	if prefix != "" && prefix[0] != '/' {
+		prefix = "/" + prefix
+	}
+	if key != "" && key[0] != '/' {
+		key = "/" + key
+	}
+	ep.Path = pathutil.CanonicalURLPath(ep.Path + prefix + key)
 	return &ep
 }
 

+ 23 - 9
Godeps/_workspace/src/github.com/coreos/etcd/client/keys_test.go

@@ -80,6 +80,20 @@ func TestV2KeysURLHelper(t *testing.T) {
 			key:      "/baz",
 			want:     url.URL{Scheme: "https", Host: "example.com", Path: "/foo/bar/baz"},
 		},
+		// Prefix is joined to path
+		{
+			endpoint: url.URL{Scheme: "https", Host: "example.com", Path: "/foo"},
+			prefix:   "/bar",
+			key:      "",
+			want:     url.URL{Scheme: "https", Host: "example.com", Path: "/foo/bar"},
+		},
+		// Keep trailing slash
+		{
+			endpoint: url.URL{Scheme: "https", Host: "example.com", Path: "/foo"},
+			prefix:   "/bar",
+			key:      "/baz/",
+			want:     url.URL{Scheme: "https", Host: "example.com", Path: "/foo/bar/baz/"},
+		},
 	}
 
 	for i, tt := range tests {
@@ -91,7 +105,7 @@ func TestV2KeysURLHelper(t *testing.T) {
 }
 
 func TestGetAction(t *testing.T) {
-	ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
+	ep := url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"}
 	baseWantURL := &url.URL{
 		Scheme: "http",
 		Host:   "example.com",
@@ -157,7 +171,7 @@ func TestGetAction(t *testing.T) {
 }
 
 func TestWaitAction(t *testing.T) {
-	ep := url.URL{Scheme: "http", Host: "example.com/v2/keys"}
+	ep := url.URL{Scheme: "http", Host: "example.com", Path: "/v2/keys"}
 	baseWantURL := &url.URL{
 		Scheme: "http",
 		Host:   "example.com",
@@ -207,7 +221,7 @@ func TestWaitAction(t *testing.T) {
 
 func TestSetAction(t *testing.T) {
 	wantHeader := http.Header(map[string][]string{
-		"Content-Type": []string{"application/x-www-form-urlencoded"},
+		"Content-Type": {"application/x-www-form-urlencoded"},
 	})
 
 	tests := []struct {
@@ -269,7 +283,7 @@ func TestSetAction(t *testing.T) {
 			act: setAction{
 				Key: "/foo/",
 			},
-			wantURL:  "http://example.com/foo",
+			wantURL:  "http://example.com/foo/",
 			wantBody: "value=",
 		},
 
@@ -398,7 +412,7 @@ func TestSetAction(t *testing.T) {
 
 func TestCreateInOrderAction(t *testing.T) {
 	wantHeader := http.Header(map[string][]string{
-		"Content-Type": []string{"application/x-www-form-urlencoded"},
+		"Content-Type": {"application/x-www-form-urlencoded"},
 	})
 
 	tests := []struct {
@@ -460,7 +474,7 @@ func TestCreateInOrderAction(t *testing.T) {
 			act: createInOrderAction{
 				Dir: "/foo/",
 			},
-			wantURL:  "http://example.com/foo",
+			wantURL:  "http://example.com/foo/",
 			wantBody: "value=",
 		},
 
@@ -499,7 +513,7 @@ func TestCreateInOrderAction(t *testing.T) {
 
 func TestDeleteAction(t *testing.T) {
 	wantHeader := http.Header(map[string][]string{
-		"Content-Type": []string{"application/x-www-form-urlencoded"},
+		"Content-Type": {"application/x-www-form-urlencoded"},
 	})
 
 	tests := []struct {
@@ -555,7 +569,7 @@ func TestDeleteAction(t *testing.T) {
 			act: deleteAction{
 				Key: "/foo/",
 			},
-			wantURL: "http://example.com/foo",
+			wantURL: "http://example.com/foo/",
 		},
 
 		// Recursive set to true
@@ -1199,7 +1213,7 @@ func TestHTTPKeysAPIGetResponse(t *testing.T) {
 		Node: &Node{
 			Key: "/pants/foo/bar",
 			Nodes: []*Node{
-				&Node{Key: "/pants/foo/bar/baz", Value: "snarf", CreatedIndex: 21, ModifiedIndex: 25},
+				{Key: "/pants/foo/bar/baz", Value: "snarf", CreatedIndex: 21, ModifiedIndex: 25},
 			},
 			CreatedIndex:  uint64(19),
 			ModifiedIndex: uint64(25),

+ 8 - 8
Godeps/_workspace/src/github.com/coreos/etcd/client/members_test.go

@@ -47,8 +47,8 @@ func TestMembersAPIActionAdd(t *testing.T) {
 	ep := url.URL{Scheme: "http", Host: "example.com"}
 	act := &membersAPIActionAdd{
 		peerURLs: types.URLs([]url.URL{
-			url.URL{Scheme: "https", Host: "127.0.0.1:8081"},
-			url.URL{Scheme: "http", Host: "127.0.0.1:8080"},
+			{Scheme: "https", Host: "127.0.0.1:8081"},
+			{Scheme: "http", Host: "127.0.0.1:8080"},
 		}),
 	}
 
@@ -74,8 +74,8 @@ func TestMembersAPIActionUpdate(t *testing.T) {
 	act := &membersAPIActionUpdate{
 		memberID: "0xabcd",
 		peerURLs: types.URLs([]url.URL{
-			url.URL{Scheme: "https", Host: "127.0.0.1:8081"},
-			url.URL{Scheme: "http", Host: "127.0.0.1:8080"},
+			{Scheme: "https", Host: "127.0.0.1:8081"},
+			{Scheme: "http", Host: "127.0.0.1:8080"},
 		}),
 	}
 
@@ -288,8 +288,8 @@ func TestMemberCollectionUnmarshal(t *testing.T) {
 func TestMemberCreateRequestMarshal(t *testing.T) {
 	req := memberCreateOrUpdateRequest{
 		PeerURLs: types.URLs([]url.URL{
-			url.URL{Scheme: "http", Host: "127.0.0.1:8081"},
-			url.URL{Scheme: "https", Host: "127.0.0.1:8080"},
+			{Scheme: "http", Host: "127.0.0.1:8081"},
+			{Scheme: "https", Host: "127.0.0.1:8080"},
 		}),
 	}
 	want := []byte(`{"peerURLs":["http://127.0.0.1:8081","https://127.0.0.1:8080"]}`)
@@ -307,7 +307,7 @@ func TestMemberCreateRequestMarshal(t *testing.T) {
 func TestHTTPMembersAPIAddSuccess(t *testing.T) {
 	wantAction := &membersAPIActionAdd{
 		peerURLs: types.URLs([]url.URL{
-			url.URL{Scheme: "http", Host: "127.0.0.1:7002"},
+			{Scheme: "http", Host: "127.0.0.1:7002"},
 		}),
 	}
 
@@ -472,7 +472,7 @@ func TestHTTPMembersAPIListSuccess(t *testing.T) {
 	}
 
 	wantResponseMembers := []Member{
-		Member{
+		{
 			ID:         "94088180e21eb87b",
 			Name:       "node2",
 			PeerURLs:   []string{"http://127.0.0.1:7002"},

+ 14 - 14
Godeps/_workspace/src/github.com/coreos/etcd/client/srv_test.go

@@ -36,40 +36,40 @@ func TestSRVDiscover(t *testing.T) {
 		},
 		{
 			[]*net.SRV{
-				&net.SRV{Target: "10.0.0.1", Port: 2480},
-				&net.SRV{Target: "10.0.0.2", Port: 2480},
-				&net.SRV{Target: "10.0.0.3", Port: 2480},
+				{Target: "10.0.0.1", Port: 2480},
+				{Target: "10.0.0.2", Port: 2480},
+				{Target: "10.0.0.3", Port: 2480},
 			},
 			[]*net.SRV{},
 			[]string{"https://10.0.0.1:2480", "https://10.0.0.2:2480", "https://10.0.0.3:2480"},
 		},
 		{
 			[]*net.SRV{
-				&net.SRV{Target: "10.0.0.1", Port: 2480},
-				&net.SRV{Target: "10.0.0.2", Port: 2480},
-				&net.SRV{Target: "10.0.0.3", Port: 2480},
+				{Target: "10.0.0.1", Port: 2480},
+				{Target: "10.0.0.2", Port: 2480},
+				{Target: "10.0.0.3", Port: 2480},
 			},
 			[]*net.SRV{
-				&net.SRV{Target: "10.0.0.1", Port: 7001},
+				{Target: "10.0.0.1", Port: 7001},
 			},
 			[]string{"https://10.0.0.1:2480", "https://10.0.0.2:2480", "https://10.0.0.3:2480", "http://10.0.0.1:7001"},
 		},
 		{
 			[]*net.SRV{
-				&net.SRV{Target: "10.0.0.1", Port: 2480},
-				&net.SRV{Target: "10.0.0.2", Port: 2480},
-				&net.SRV{Target: "10.0.0.3", Port: 2480},
+				{Target: "10.0.0.1", Port: 2480},
+				{Target: "10.0.0.2", Port: 2480},
+				{Target: "10.0.0.3", Port: 2480},
 			},
 			[]*net.SRV{
-				&net.SRV{Target: "10.0.0.1", Port: 7001},
+				{Target: "10.0.0.1", Port: 7001},
 			},
 			[]string{"https://10.0.0.1:2480", "https://10.0.0.2:2480", "https://10.0.0.3:2480", "http://10.0.0.1:7001"},
 		},
 		{
 			[]*net.SRV{
-				&net.SRV{Target: "a.example.com", Port: 2480},
-				&net.SRV{Target: "b.example.com", Port: 2480},
-				&net.SRV{Target: "c.example.com", Port: 2480},
+				{Target: "a.example.com", Port: 2480},
+				{Target: "b.example.com", Port: 2480},
+				{Target: "c.example.com", Port: 2480},
 			},
 			[]*net.SRV{},
 			[]string{"https://a.example.com:2480", "https://b.example.com:2480", "https://c.example.com:2480"},

+ 29 - 0
Godeps/_workspace/src/github.com/coreos/etcd/pkg/pathutil/path.go

@@ -0,0 +1,29 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pathutil
+
+import "path"
+
+// CanonicalURLPath returns the canonical url path for p, which follows the rules:
+// 1. the path always starts with "/"
+// 2. replace multiple slashes with a single slash
+// 3. replace each '.' '..' path name element with equivalent one
+// 4. keep the trailing slash
+// The function is borrowed from stdlib http.cleanPath in server.go.
+func CanonicalURLPath(p string) string {
+	if p == "" {
+		return "/"
+	}
+	if p[0] != '/' {
+		p = "/" + p
+	}
+	np := path.Clean(p)
+	// path.Clean removes trailing slash except for root,
+	// put the trailing slash back if necessary.
+	if p[len(p)-1] == '/' && np != "/" {
+		np += "/"
+	}
+	return np
+}

+ 38 - 0
Godeps/_workspace/src/github.com/coreos/etcd/pkg/pathutil/path_test.go

@@ -0,0 +1,38 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pathutil
+
+import "testing"
+
+func TestCanonicalURLPath(t *testing.T) {
+	tests := []struct {
+		p  string
+		wp string
+	}{
+		{"/a", "/a"},
+		{"", "/"},
+		{"a", "/a"},
+		{"//a", "/a"},
+		{"/a/.", "/a"},
+		{"/a/..", "/"},
+		{"/a/", "/a/"},
+		{"/a//", "/a/"},
+	}
+	for i, tt := range tests {
+		if g := CanonicalURLPath(tt.p); g != tt.wp {
+			t.Errorf("#%d: canonical path = %s, want %s", i, g, tt.wp)
+		}
+	}
+}