|
@@ -5,7 +5,7 @@ import (
|
|
|
"fmt"
|
|
|
"io"
|
|
|
"io/ioutil"
|
|
|
- "math/rand"
|
|
|
+ "net"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
"path"
|
|
@@ -39,15 +39,9 @@ func NewRawRequest(method, relativePath string, values url.Values, cancel <-chan
|
|
|
// getCancelable issues a cancelable GET request
|
|
|
func (c *Client) getCancelable(key string, options Options,
|
|
|
cancel <-chan bool) (*RawResponse, error) {
|
|
|
- logger.Debugf("get %s [%s]", key, c.cluster.Leader)
|
|
|
+ logger.Debugf("get %s [%s]", key, c.cluster.pick())
|
|
|
p := keyToPath(key)
|
|
|
|
|
|
- // If consistency level is set to STRONG, append
|
|
|
- // the `consistent` query string.
|
|
|
- if c.config.Consistency == STRONG_CONSISTENCY {
|
|
|
- options["consistent"] = true
|
|
|
- }
|
|
|
-
|
|
|
str, err := options.toParameters(VALID_GET_OPTIONS)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
@@ -73,7 +67,7 @@ func (c *Client) get(key string, options Options) (*RawResponse, error) {
|
|
|
func (c *Client) put(key string, value string, ttl uint64,
|
|
|
options Options) (*RawResponse, error) {
|
|
|
|
|
|
- logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
|
|
+ logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick())
|
|
|
p := keyToPath(key)
|
|
|
|
|
|
str, err := options.toParameters(VALID_PUT_OPTIONS)
|
|
@@ -94,7 +88,7 @@ func (c *Client) put(key string, value string, ttl uint64,
|
|
|
|
|
|
// post issues a POST request
|
|
|
func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) {
|
|
|
- logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
|
|
+ logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick())
|
|
|
p := keyToPath(key)
|
|
|
|
|
|
req := NewRawRequest("POST", p, buildValues(value, ttl), nil)
|
|
@@ -109,7 +103,7 @@ func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error
|
|
|
|
|
|
// delete issues a DELETE request
|
|
|
func (c *Client) delete(key string, options Options) (*RawResponse, error) {
|
|
|
- logger.Debugf("delete %s [%s]", key, c.cluster.Leader)
|
|
|
+ logger.Debugf("delete %s [%s]", key, c.cluster.pick())
|
|
|
p := keyToPath(key)
|
|
|
|
|
|
str, err := options.toParameters(VALID_DELETE_OPTIONS)
|
|
@@ -130,7 +124,6 @@ func (c *Client) delete(key string, options Options) (*RawResponse, error) {
|
|
|
|
|
|
// SendRequest sends a HTTP request and returns a Response as defined by etcd
|
|
|
func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
|
-
|
|
|
var req *http.Request
|
|
|
var resp *http.Response
|
|
|
var httpPath string
|
|
@@ -194,16 +187,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- logger.Debug("Connecting to etcd: attempt", attempt+1, "for", rr.RelativePath)
|
|
|
+ logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
|
|
|
|
|
|
- if rr.Method == "GET" && c.config.Consistency == WEAK_CONSISTENCY {
|
|
|
- // If it's a GET and consistency level is set to WEAK,
|
|
|
- // then use a random machine.
|
|
|
- httpPath = c.getHttpPath(true, rr.RelativePath)
|
|
|
- } else {
|
|
|
- // Else use the leader.
|
|
|
- httpPath = c.getHttpPath(false, rr.RelativePath)
|
|
|
- }
|
|
|
+ httpPath = c.getHttpPath(rr.RelativePath)
|
|
|
|
|
|
// Return a cURL command if curlChan is set
|
|
|
if c.cURLch != nil {
|
|
@@ -258,24 +244,24 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
|
|
|
|
// network error, change a machine!
|
|
|
if err != nil {
|
|
|
- logger.Debug("network error:", err.Error())
|
|
|
+ logger.Debug("network error: ", err.Error())
|
|
|
lastResp := http.Response{}
|
|
|
if checkErr := checkRetry(c.cluster, numReqs, lastResp, err); checkErr != nil {
|
|
|
return nil, checkErr
|
|
|
}
|
|
|
|
|
|
- c.cluster.switchLeader(attempt % len(c.cluster.Machines))
|
|
|
+ c.cluster.failure()
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
// if there is no error, it should receive response
|
|
|
- logger.Debug("recv.response.from", httpPath)
|
|
|
+ logger.Debug("recv.response.from ", httpPath)
|
|
|
|
|
|
if validHttpStatusCode[resp.StatusCode] {
|
|
|
// try to read byte code and break the loop
|
|
|
respBody, err = ioutil.ReadAll(resp.Body)
|
|
|
if err == nil {
|
|
|
- logger.Debug("recv.success.", httpPath)
|
|
|
+ logger.Debug("recv.success ", httpPath)
|
|
|
break
|
|
|
}
|
|
|
// ReadAll error may be caused due to cancel request
|
|
@@ -295,22 +281,6 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // if resp is TemporaryRedirect, set the new leader and retry
|
|
|
- if resp.StatusCode == http.StatusTemporaryRedirect {
|
|
|
- u, err := resp.Location()
|
|
|
-
|
|
|
- if err != nil {
|
|
|
- logger.Warning(err)
|
|
|
- } else {
|
|
|
- // Update cluster leader based on redirect location
|
|
|
- // because it should point to the leader address
|
|
|
- c.cluster.updateLeaderFromURL(u)
|
|
|
- logger.Debug("recv.response.relocate", u.String())
|
|
|
- }
|
|
|
- resp.Body.Close()
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
if checkErr := checkRetry(c.cluster, numReqs, *resp,
|
|
|
errors.New("Unexpected HTTP status code")); checkErr != nil {
|
|
|
return nil, checkErr
|
|
@@ -333,34 +303,53 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
|
func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
|
|
|
err error) error {
|
|
|
|
|
|
- if numReqs >= 2*len(cluster.Machines) {
|
|
|
- return newError(ErrCodeEtcdNotReachable,
|
|
|
- "Tried to connect to each peer twice and failed", 0)
|
|
|
+ if isEmptyResponse(lastResp) {
|
|
|
+ if !isConnectionError(err) {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ } else if !shouldRetry(lastResp) {
|
|
|
+ body := []byte("nil")
|
|
|
+ if lastResp.Body != nil {
|
|
|
+ if b, err := ioutil.ReadAll(lastResp.Body); err == nil {
|
|
|
+ body = b
|
|
|
+ }
|
|
|
+ }
|
|
|
+ errStr := fmt.Sprintf("unhandled http status [%s] with body [%s]", http.StatusText(lastResp.StatusCode), body)
|
|
|
+ return newError(ErrCodeUnhandledHTTPStatus, errStr, 0)
|
|
|
}
|
|
|
|
|
|
- code := lastResp.StatusCode
|
|
|
- if code == http.StatusInternalServerError {
|
|
|
+ if numReqs > 2*len(cluster.Machines) {
|
|
|
+ errStr := fmt.Sprintf("failed to propose on members %v twice [last error: %v]", cluster.Machines, err)
|
|
|
+ return newError(ErrCodeEtcdNotReachable, errStr, 0)
|
|
|
+ }
|
|
|
+ if shouldRetry(lastResp) {
|
|
|
+ // sleep some time and expect leader election finish
|
|
|
time.Sleep(time.Millisecond * 200)
|
|
|
-
|
|
|
}
|
|
|
|
|
|
- logger.Warning("bad response status code", code)
|
|
|
+ logger.Warning("bad response status code", lastResp.StatusCode)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (c *Client) getHttpPath(random bool, s ...string) string {
|
|
|
- var machine string
|
|
|
- if random {
|
|
|
- machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))]
|
|
|
- } else {
|
|
|
- machine = c.cluster.Leader
|
|
|
- }
|
|
|
+func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 }
|
|
|
|
|
|
- fullPath := machine + "/" + version
|
|
|
+func isConnectionError(err error) bool {
|
|
|
+ _, ok := err.(*net.OpError)
|
|
|
+ return ok
|
|
|
+}
|
|
|
+
|
|
|
+// shouldRetry returns whether the reponse deserves retry.
|
|
|
+func shouldRetry(r http.Response) bool {
|
|
|
+ // TODO: only retry when the cluster is in leader election
|
|
|
+ // We cannot do it exactly because etcd doesn't support it well.
|
|
|
+ return r.StatusCode == http.StatusInternalServerError
|
|
|
+}
|
|
|
+
|
|
|
+func (c *Client) getHttpPath(s ...string) string {
|
|
|
+ fullPath := c.cluster.pick() + "/" + version
|
|
|
for _, seg := range s {
|
|
|
fullPath = fullPath + "/" + seg
|
|
|
}
|
|
|
-
|
|
|
return fullPath
|
|
|
}
|
|
|
|
|
@@ -379,11 +368,13 @@ func buildValues(value string, ttl uint64) url.Values {
|
|
|
return v
|
|
|
}
|
|
|
|
|
|
-// convert key string to http path exclude version
|
|
|
+// convert key string to http path exclude version, including URL escaping
|
|
|
// for example: key[foo] -> path[keys/foo]
|
|
|
+// key[/%z] -> path[keys/%25z]
|
|
|
// key[/] -> path[keys/]
|
|
|
func keyToPath(key string) string {
|
|
|
- p := path.Join("keys", key)
|
|
|
+ // URL-escape our key, except for slashes
|
|
|
+ p := strings.Replace(url.QueryEscape(path.Join("keys", key)), "%2F", "/", -1)
|
|
|
|
|
|
// corner case: if key is "/" or "//" ect
|
|
|
// path join will clear the tailing "/"
|