|
@@ -5,8 +5,6 @@ import (
|
|
"fmt"
|
|
"fmt"
|
|
"io"
|
|
"io"
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
- "math/rand"
|
|
|
|
- "net"
|
|
|
|
"net/http"
|
|
"net/http"
|
|
"net/url"
|
|
"net/url"
|
|
"path"
|
|
"path"
|
|
@@ -40,15 +38,9 @@ func NewRawRequest(method, relativePath string, values url.Values, cancel <-chan
|
|
// getCancelable issues a cancelable GET request
|
|
// getCancelable issues a cancelable GET request
|
|
func (c *Client) getCancelable(key string, options Options,
|
|
func (c *Client) getCancelable(key string, options Options,
|
|
cancel <-chan bool) (*RawResponse, error) {
|
|
cancel <-chan bool) (*RawResponse, error) {
|
|
- logger.Debugf("get %s [%s]", key, c.cluster.Leader)
|
|
|
|
|
|
+ logger.Debugf("get %s [%s]", key, c.cluster.pick())
|
|
p := keyToPath(key)
|
|
p := keyToPath(key)
|
|
|
|
|
|
- // If consistency level is set to STRONG, append
|
|
|
|
- // the `consistent` query string.
|
|
|
|
- if c.config.Consistency == STRONG_CONSISTENCY {
|
|
|
|
- options["consistent"] = true
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
str, err := options.toParameters(VALID_GET_OPTIONS)
|
|
str, err := options.toParameters(VALID_GET_OPTIONS)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
@@ -74,7 +66,7 @@ func (c *Client) get(key string, options Options) (*RawResponse, error) {
|
|
func (c *Client) put(key string, value string, ttl uint64,
|
|
func (c *Client) put(key string, value string, ttl uint64,
|
|
options Options) (*RawResponse, error) {
|
|
options Options) (*RawResponse, error) {
|
|
|
|
|
|
- logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
|
|
|
|
|
+ logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick())
|
|
p := keyToPath(key)
|
|
p := keyToPath(key)
|
|
|
|
|
|
str, err := options.toParameters(VALID_PUT_OPTIONS)
|
|
str, err := options.toParameters(VALID_PUT_OPTIONS)
|
|
@@ -95,7 +87,7 @@ func (c *Client) put(key string, value string, ttl uint64,
|
|
|
|
|
|
// post issues a POST request
|
|
// post issues a POST request
|
|
func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) {
|
|
func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) {
|
|
- logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
|
|
|
|
|
+ logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick())
|
|
p := keyToPath(key)
|
|
p := keyToPath(key)
|
|
|
|
|
|
req := NewRawRequest("POST", p, buildValues(value, ttl), nil)
|
|
req := NewRawRequest("POST", p, buildValues(value, ttl), nil)
|
|
@@ -110,7 +102,7 @@ func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error
|
|
|
|
|
|
// delete issues a DELETE request
|
|
// delete issues a DELETE request
|
|
func (c *Client) delete(key string, options Options) (*RawResponse, error) {
|
|
func (c *Client) delete(key string, options Options) (*RawResponse, error) {
|
|
- logger.Debugf("delete %s [%s]", key, c.cluster.Leader)
|
|
|
|
|
|
+ logger.Debugf("delete %s [%s]", key, c.cluster.pick())
|
|
p := keyToPath(key)
|
|
p := keyToPath(key)
|
|
|
|
|
|
str, err := options.toParameters(VALID_DELETE_OPTIONS)
|
|
str, err := options.toParameters(VALID_DELETE_OPTIONS)
|
|
@@ -131,7 +123,6 @@ func (c *Client) delete(key string, options Options) (*RawResponse, error) {
|
|
|
|
|
|
// SendRequest sends a HTTP request and returns a Response as defined by etcd
|
|
// SendRequest sends a HTTP request and returns a Response as defined by etcd
|
|
func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
-
|
|
|
|
var req *http.Request
|
|
var req *http.Request
|
|
var resp *http.Response
|
|
var resp *http.Response
|
|
var httpPath string
|
|
var httpPath string
|
|
@@ -197,13 +188,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
|
|
|
|
logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
|
|
logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
|
|
|
|
|
|
- if rr.Method == "GET" && c.config.Consistency == WEAK_CONSISTENCY {
|
|
|
|
- // If it's a GET and consistency level is set to WEAK,
|
|
|
|
- // then use a random machine.
|
|
|
|
- httpPath = c.getHttpPath(true, rr.RelativePath)
|
|
|
|
- } else {
|
|
|
|
- // Else use the leader.
|
|
|
|
- httpPath = c.getHttpPath(false, rr.RelativePath)
|
|
|
|
|
|
+ // get httpPath if not set
|
|
|
|
+ if httpPath == "" {
|
|
|
|
+ httpPath = c.getHttpPath(rr.RelativePath)
|
|
}
|
|
}
|
|
|
|
|
|
// Return a cURL command if curlChan is set
|
|
// Return a cURL command if curlChan is set
|
|
@@ -212,6 +199,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
for key, value := range rr.Values {
|
|
for key, value := range rr.Values {
|
|
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
|
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
|
}
|
|
}
|
|
|
|
+ if c.credentials != nil {
|
|
|
|
+ command += fmt.Sprintf(" -u %s", c.credentials.username)
|
|
|
|
+ }
|
|
c.sendCURL(command)
|
|
c.sendCURL(command)
|
|
}
|
|
}
|
|
|
|
|
|
@@ -241,7 +231,13 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if c.credentials != nil {
|
|
|
|
+ req.SetBasicAuth(c.credentials.username, c.credentials.password)
|
|
|
|
+ }
|
|
|
|
+
|
|
resp, err = c.httpClient.Do(req)
|
|
resp, err = c.httpClient.Do(req)
|
|
|
|
+ // clear previous httpPath
|
|
|
|
+ httpPath = ""
|
|
defer func() {
|
|
defer func() {
|
|
if resp != nil {
|
|
if resp != nil {
|
|
resp.Body.Close()
|
|
resp.Body.Close()
|
|
@@ -265,7 +261,7 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
return nil, checkErr
|
|
return nil, checkErr
|
|
}
|
|
}
|
|
|
|
|
|
- c.cluster.switchLeader(attempt % len(c.cluster.Machines))
|
|
|
|
|
|
+ c.cluster.failure()
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
|
|
@@ -296,17 +292,14 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- // if resp is TemporaryRedirect, set the new leader and retry
|
|
|
|
if resp.StatusCode == http.StatusTemporaryRedirect {
|
|
if resp.StatusCode == http.StatusTemporaryRedirect {
|
|
u, err := resp.Location()
|
|
u, err := resp.Location()
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
logger.Warning(err)
|
|
logger.Warning(err)
|
|
} else {
|
|
} else {
|
|
- // Update cluster leader based on redirect location
|
|
|
|
- // because it should point to the leader address
|
|
|
|
- c.cluster.updateLeaderFromURL(u)
|
|
|
|
- logger.Debug("recv.response.relocate ", u.String())
|
|
|
|
|
|
+ // set httpPath for following redirection
|
|
|
|
+ httpPath = u.String()
|
|
}
|
|
}
|
|
resp.Body.Close()
|
|
resp.Body.Close()
|
|
continue
|
|
continue
|
|
@@ -335,9 +328,8 @@ func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
|
|
err error) error {
|
|
err error) error {
|
|
|
|
|
|
if isEmptyResponse(lastResp) {
|
|
if isEmptyResponse(lastResp) {
|
|
- if !isConnectionError(err) {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
|
|
+ // always retry if it failed to get response from one machine
|
|
|
|
+ return nil
|
|
} else if !shouldRetry(lastResp) {
|
|
} else if !shouldRetry(lastResp) {
|
|
body := []byte("nil")
|
|
body := []byte("nil")
|
|
if lastResp.Body != nil {
|
|
if lastResp.Body != nil {
|
|
@@ -364,11 +356,6 @@ func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
|
|
|
|
|
|
func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 }
|
|
func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 }
|
|
|
|
|
|
-func isConnectionError(err error) bool {
|
|
|
|
- _, ok := err.(*net.OpError)
|
|
|
|
- return ok
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
// shouldRetry returns whether the reponse deserves retry.
|
|
// shouldRetry returns whether the reponse deserves retry.
|
|
func shouldRetry(r http.Response) bool {
|
|
func shouldRetry(r http.Response) bool {
|
|
// TODO: only retry when the cluster is in leader election
|
|
// TODO: only retry when the cluster is in leader election
|
|
@@ -376,19 +363,11 @@ func shouldRetry(r http.Response) bool {
|
|
return r.StatusCode == http.StatusInternalServerError
|
|
return r.StatusCode == http.StatusInternalServerError
|
|
}
|
|
}
|
|
|
|
|
|
-func (c *Client) getHttpPath(random bool, s ...string) string {
|
|
|
|
- var machine string
|
|
|
|
- if random {
|
|
|
|
- machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))]
|
|
|
|
- } else {
|
|
|
|
- machine = c.cluster.Leader
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- fullPath := machine + "/" + version
|
|
|
|
|
|
+func (c *Client) getHttpPath(s ...string) string {
|
|
|
|
+ fullPath := c.cluster.pick() + "/" + version
|
|
for _, seg := range s {
|
|
for _, seg := range s {
|
|
fullPath = fullPath + "/" + seg
|
|
fullPath = fullPath + "/" + seg
|
|
}
|
|
}
|
|
-
|
|
|
|
return fullPath
|
|
return fullPath
|
|
}
|
|
}
|
|
|
|
|