request.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package restclient
  14. import (
  15. "bytes"
  16. "encoding/hex"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "mime"
  21. "net/http"
  22. "net/url"
  23. "path"
  24. "reflect"
  25. "strconv"
  26. "strings"
  27. "time"
  28. "github.com/golang/glog"
  29. "k8s.io/kubernetes/pkg/api/errors"
  30. "k8s.io/kubernetes/pkg/api/unversioned"
  31. "k8s.io/kubernetes/pkg/api/v1"
  32. "k8s.io/kubernetes/pkg/api/validation"
  33. "k8s.io/kubernetes/pkg/client/metrics"
  34. "k8s.io/kubernetes/pkg/fields"
  35. "k8s.io/kubernetes/pkg/labels"
  36. "k8s.io/kubernetes/pkg/runtime"
  37. "k8s.io/kubernetes/pkg/runtime/serializer/streaming"
  38. "k8s.io/kubernetes/pkg/util/flowcontrol"
  39. "k8s.io/kubernetes/pkg/util/net"
  40. "k8s.io/kubernetes/pkg/util/sets"
  41. "k8s.io/kubernetes/pkg/watch"
  42. "k8s.io/kubernetes/pkg/watch/versioned"
  43. )
var (
	// specialParams lists parameters that are handled specially and which users of Request
	// are therefore not allowed to set manually.
	specialParams = sets.NewString("timeout")
	// longThrottleLatency defines the threshold for logging throttled requests. Any request
	// that is throttled for longer than longThrottleLatency will be logged.
	longThrottleLatency = 50 * time.Millisecond
)
// HTTPClient is an interface for testing a request object.
// Request sends every HTTP request through this interface and falls back to
// http.DefaultClient when no client was supplied.
type HTTPClient interface {
	// Do sends req and returns the server's response, or an error.
	Do(req *http.Request) (*http.Response, error)
}
// ResponseWrapper is an interface for getting a response.
// The response may be accessed either as raw data (the whole output is put into memory) or as a stream.
type ResponseWrapper interface {
	// DoRaw returns the response as a byte slice, or an error.
	DoRaw() ([]byte, error)
	// Stream returns the response as an io.ReadCloser, or an error.
	Stream() (io.ReadCloser, error)
}
  62. // RequestConstructionError is returned when there's an error assembling a request.
  63. type RequestConstructionError struct {
  64. Err error
  65. }
  66. // Error returns a textual description of 'r'.
  67. func (r *RequestConstructionError) Error() string {
  68. return fmt.Sprintf("request construction error: '%v'", r.Err)
  69. }
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
	// required

	// client executes the HTTP request; http.DefaultClient is used when it is nil.
	client HTTPClient
	// verb is the HTTP method handed to http.NewRequest.
	verb string
	// baseURL is the server URL that the final URL is derived from; it may be nil.
	baseURL *url.URL
	// content carries the content type, accepted content types and group version.
	content ContentConfig
	// serializers holds the encoder, decoder and framer used for bodies and watches.
	serializers Serializers

	// generic components accessible via method setters

	// pathPrefix is the leading part of the URL path (base path plus versioned API path).
	pathPrefix string
	// subpath is appended after the resource name and subresource.
	subpath string
	// params are the query parameters; "timeout" is kept separately in the timeout field.
	params url.Values
	// headers are assigned to the outgoing http.Request.
	headers http.Header

	// structural elements of the request that are part of the Kubernetes API conventions

	namespace string
	// namespaceSet records that Namespace was called, even with an empty namespace.
	namespaceSet bool
	resource     string
	resourceName string
	subresource  string
	selector     labels.Selector
	// timeout, when non-zero, is emitted as the "timeout" query parameter.
	timeout time.Duration

	// output

	// err is the first error hit while building the request; once set, the
	// error-checking builder methods become no-ops.
	err error
	// body is the request payload, if any.
	body io.Reader

	// The constructed request and the response
	req  *http.Request
	resp *http.Response

	// backoffMgr decides how long to sleep before contacting the server.
	backoffMgr BackoffManager
	// throttle, when non-nil, rate limits requests on the client side.
	throttle flowcontrol.RateLimiter
}
  102. // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
  103. func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
  104. if backoff == nil {
  105. glog.V(2).Infof("Not implementing request backoff strategy.")
  106. backoff = &NoBackoff{}
  107. }
  108. pathPrefix := "/"
  109. if baseURL != nil {
  110. pathPrefix = path.Join(pathPrefix, baseURL.Path)
  111. }
  112. r := &Request{
  113. client: client,
  114. verb: verb,
  115. baseURL: baseURL,
  116. pathPrefix: path.Join(pathPrefix, versionedAPIPath),
  117. content: content,
  118. serializers: serializers,
  119. backoffMgr: backoff,
  120. throttle: throttle,
  121. }
  122. switch {
  123. case len(content.AcceptContentTypes) > 0:
  124. r.SetHeader("Accept", content.AcceptContentTypes)
  125. case len(content.ContentType) > 0:
  126. r.SetHeader("Accept", content.ContentType+", */*")
  127. }
  128. return r
  129. }
  130. // Prefix adds segments to the relative beginning to the request path. These
  131. // items will be placed before the optional Namespace, Resource, or Name sections.
  132. // Setting AbsPath will clear any previously set Prefix segments
  133. func (r *Request) Prefix(segments ...string) *Request {
  134. if r.err != nil {
  135. return r
  136. }
  137. r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
  138. return r
  139. }
  140. // Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
  141. // Namespace, Resource, or Name sections.
  142. func (r *Request) Suffix(segments ...string) *Request {
  143. if r.err != nil {
  144. return r
  145. }
  146. r.subpath = path.Join(r.subpath, path.Join(segments...))
  147. return r
  148. }
  149. // Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
  150. func (r *Request) Resource(resource string) *Request {
  151. if r.err != nil {
  152. return r
  153. }
  154. if len(r.resource) != 0 {
  155. r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
  156. return r
  157. }
  158. if msgs := validation.IsValidPathSegmentName(resource); len(msgs) != 0 {
  159. r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
  160. return r
  161. }
  162. r.resource = resource
  163. return r
  164. }
  165. // SubResource sets a sub-resource path which can be multiple segments segment after the resource
  166. // name but before the suffix.
  167. func (r *Request) SubResource(subresources ...string) *Request {
  168. if r.err != nil {
  169. return r
  170. }
  171. subresource := path.Join(subresources...)
  172. if len(r.subresource) != 0 {
  173. r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.resource, subresource)
  174. return r
  175. }
  176. for _, s := range subresources {
  177. if msgs := validation.IsValidPathSegmentName(s); len(msgs) != 0 {
  178. r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
  179. return r
  180. }
  181. }
  182. r.subresource = subresource
  183. return r
  184. }
  185. // Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
  186. func (r *Request) Name(resourceName string) *Request {
  187. if r.err != nil {
  188. return r
  189. }
  190. if len(resourceName) == 0 {
  191. r.err = fmt.Errorf("resource name may not be empty")
  192. return r
  193. }
  194. if len(r.resourceName) != 0 {
  195. r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
  196. return r
  197. }
  198. if msgs := validation.IsValidPathSegmentName(resourceName); len(msgs) != 0 {
  199. r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
  200. return r
  201. }
  202. r.resourceName = resourceName
  203. return r
  204. }
  205. // Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
  206. func (r *Request) Namespace(namespace string) *Request {
  207. if r.err != nil {
  208. return r
  209. }
  210. if r.namespaceSet {
  211. r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
  212. return r
  213. }
  214. if msgs := validation.IsValidPathSegmentName(namespace); len(msgs) != 0 {
  215. r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
  216. return r
  217. }
  218. r.namespaceSet = true
  219. r.namespace = namespace
  220. return r
  221. }
  222. // NamespaceIfScoped is a convenience function to set a namespace if scoped is true
  223. func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
  224. if scoped {
  225. return r.Namespace(namespace)
  226. }
  227. return r
  228. }
  229. // AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
  230. // when a single segment is passed.
  231. func (r *Request) AbsPath(segments ...string) *Request {
  232. if r.err != nil {
  233. return r
  234. }
  235. r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
  236. if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
  237. // preserve any trailing slashes for legacy behavior
  238. r.pathPrefix += "/"
  239. }
  240. return r
  241. }
  242. // RequestURI overwrites existing path and parameters with the value of the provided server relative
  243. // URI. Some parameters (those in specialParameters) cannot be overwritten.
  244. func (r *Request) RequestURI(uri string) *Request {
  245. if r.err != nil {
  246. return r
  247. }
  248. locator, err := url.Parse(uri)
  249. if err != nil {
  250. r.err = err
  251. return r
  252. }
  253. r.pathPrefix = locator.Path
  254. if len(locator.Query()) > 0 {
  255. if r.params == nil {
  256. r.params = make(url.Values)
  257. }
  258. for k, v := range locator.Query() {
  259. r.params[k] = v
  260. }
  261. }
  262. return r
  263. }
// Field names usable in field selectors. Each constant appears as both the
// client-side key and the API-version value in fieldMappings.
const (
	// nodeUnschedulable is the node field selected by "spec.unschedulable".
	nodeUnschedulable = "spec.unschedulable"
	// A constant that clients can use to refer in a field selector to the object name field.
	// Will be automatically emitted as the correct name for the API version.
	objectNameField = "metadata.name"
	// Pod fields.
	podHost   = "spec.nodeName"
	podStatus = "status.phase"
	// Secret fields. Note that secretType and eventType share the value "type".
	secretType = "type"

	// Event fields.
	eventReason                  = "reason"
	eventSource                  = "source"
	eventType                    = "type"
	eventInvolvedKind            = "involvedObject.kind"
	eventInvolvedNamespace       = "involvedObject.namespace"
	eventInvolvedName            = "involvedObject.name"
	eventInvolvedUID             = "involvedObject.uid"
	eventInvolvedAPIVersion      = "involvedObject.apiVersion"
	eventInvolvedResourceVersion = "involvedObject.resourceVersion"
	eventInvolvedFieldPath       = "involvedObject.fieldPath"
)
  283. type clientFieldNameToAPIVersionFieldName map[string]string
  284. func (c clientFieldNameToAPIVersionFieldName) filterField(field, value string) (newField, newValue string, err error) {
  285. newFieldName, ok := c[field]
  286. if !ok {
  287. return "", "", fmt.Errorf("%v - %v - no field mapping defined", field, value)
  288. }
  289. return newFieldName, value, nil
  290. }
  291. type resourceTypeToFieldMapping map[string]clientFieldNameToAPIVersionFieldName
  292. func (r resourceTypeToFieldMapping) filterField(resourceType, field, value string) (newField, newValue string, err error) {
  293. fMapping, ok := r[resourceType]
  294. if !ok {
  295. return "", "", fmt.Errorf("%v - %v - %v - no field mapping defined", resourceType, field, value)
  296. }
  297. return fMapping.filterField(field, value)
  298. }
  299. type versionToResourceToFieldMapping map[unversioned.GroupVersion]resourceTypeToFieldMapping
  300. // filterField transforms the given field/value selector for the given groupVersion and resource
  301. func (v versionToResourceToFieldMapping) filterField(groupVersion *unversioned.GroupVersion, resourceType, field, value string) (newField, newValue string, err error) {
  302. rMapping, ok := v[*groupVersion]
  303. if !ok {
  304. // no groupVersion overrides registered, default to identity mapping
  305. return field, value, nil
  306. }
  307. newField, newValue, err = rMapping.filterField(resourceType, field, value)
  308. if err != nil {
  309. // no groupVersionResource overrides registered, default to identity mapping
  310. return field, value, nil
  311. }
  312. return newField, newValue, nil
  313. }
// fieldMappings holds the field selector name mappings, keyed by group version
// and then by resource. Only the v1 group version is registered here, and every
// entry maps a field name to itself. Lookups are keyed by the request's
// resource string exactly as it was set on the Request.
var fieldMappings = versionToResourceToFieldMapping{
	v1.SchemeGroupVersion: resourceTypeToFieldMapping{
		"nodes": clientFieldNameToAPIVersionFieldName{
			objectNameField:   objectNameField,
			nodeUnschedulable: nodeUnschedulable,
		},
		"pods": clientFieldNameToAPIVersionFieldName{
			podHost:   podHost,
			podStatus: podStatus,
		},
		"secrets": clientFieldNameToAPIVersionFieldName{
			secretType: secretType,
		},
		"serviceAccounts": clientFieldNameToAPIVersionFieldName{
			objectNameField: objectNameField,
		},
		"endpoints": clientFieldNameToAPIVersionFieldName{
			objectNameField: objectNameField,
		},
		"events": clientFieldNameToAPIVersionFieldName{
			objectNameField:              objectNameField,
			eventReason:                  eventReason,
			eventSource:                  eventSource,
			eventType:                    eventType,
			eventInvolvedKind:            eventInvolvedKind,
			eventInvolvedNamespace:       eventInvolvedNamespace,
			eventInvolvedName:            eventInvolvedName,
			eventInvolvedUID:             eventInvolvedUID,
			eventInvolvedAPIVersion:      eventInvolvedAPIVersion,
			eventInvolvedResourceVersion: eventInvolvedResourceVersion,
			eventInvolvedFieldPath:       eventInvolvedFieldPath,
		},
	},
}
  348. // FieldsSelectorParam adds the given selector as a query parameter with the name paramName.
  349. func (r *Request) FieldsSelectorParam(s fields.Selector) *Request {
  350. if r.err != nil {
  351. return r
  352. }
  353. if s == nil {
  354. return r
  355. }
  356. if s.Empty() {
  357. return r
  358. }
  359. s2, err := s.Transform(func(field, value string) (newField, newValue string, err error) {
  360. return fieldMappings.filterField(r.content.GroupVersion, r.resource, field, value)
  361. })
  362. if err != nil {
  363. r.err = err
  364. return r
  365. }
  366. return r.setParam(unversioned.FieldSelectorQueryParam(r.content.GroupVersion.String()), s2.String())
  367. }
  368. // LabelsSelectorParam adds the given selector as a query parameter
  369. func (r *Request) LabelsSelectorParam(s labels.Selector) *Request {
  370. if r.err != nil {
  371. return r
  372. }
  373. if s == nil {
  374. return r
  375. }
  376. if s.Empty() {
  377. return r
  378. }
  379. return r.setParam(unversioned.LabelSelectorQueryParam(r.content.GroupVersion.String()), s.String())
  380. }
  381. // UintParam creates a query parameter with the given value.
  382. func (r *Request) UintParam(paramName string, u uint64) *Request {
  383. if r.err != nil {
  384. return r
  385. }
  386. return r.setParam(paramName, strconv.FormatUint(u, 10))
  387. }
  388. // Param creates a query parameter with the given string value.
  389. func (r *Request) Param(paramName, s string) *Request {
  390. if r.err != nil {
  391. return r
  392. }
  393. return r.setParam(paramName, s)
  394. }
  395. // VersionedParams will take the provided object, serialize it to a map[string][]string using the
  396. // implicit RESTClient API version and the default parameter codec, and then add those as parameters
  397. // to the request. Use this to provide versioned query parameters from client libraries.
  398. func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
  399. if r.err != nil {
  400. return r
  401. }
  402. params, err := codec.EncodeParameters(obj, *r.content.GroupVersion)
  403. if err != nil {
  404. r.err = err
  405. return r
  406. }
  407. for k, v := range params {
  408. for _, value := range v {
  409. // TODO: Move it to setParam method, once we get rid of
  410. // FieldSelectorParam & LabelSelectorParam methods.
  411. if k == unversioned.LabelSelectorQueryParam(r.content.GroupVersion.String()) && value == "" {
  412. // Don't set an empty selector for backward compatibility.
  413. // Since there is no way to get the difference between empty
  414. // and unspecified string, we don't set it to avoid having
  415. // labelSelector= param in every request.
  416. continue
  417. }
  418. if k == unversioned.FieldSelectorQueryParam(r.content.GroupVersion.String()) {
  419. if len(value) == 0 {
  420. // Don't set an empty selector for backward compatibility.
  421. // Since there is no way to get the difference between empty
  422. // and unspecified string, we don't set it to avoid having
  423. // fieldSelector= param in every request.
  424. continue
  425. }
  426. // TODO: Filtering should be handled somewhere else.
  427. selector, err := fields.ParseSelector(value)
  428. if err != nil {
  429. r.err = fmt.Errorf("unparsable field selector: %v", err)
  430. return r
  431. }
  432. filteredSelector, err := selector.Transform(
  433. func(field, value string) (newField, newValue string, err error) {
  434. return fieldMappings.filterField(r.content.GroupVersion, r.resource, field, value)
  435. })
  436. if err != nil {
  437. r.err = fmt.Errorf("untransformable field selector: %v", err)
  438. return r
  439. }
  440. value = filteredSelector.String()
  441. }
  442. r.setParam(k, value)
  443. }
  444. }
  445. return r
  446. }
  447. func (r *Request) setParam(paramName, value string) *Request {
  448. if specialParams.Has(paramName) {
  449. r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName)
  450. return r
  451. }
  452. if r.params == nil {
  453. r.params = make(url.Values)
  454. }
  455. r.params[paramName] = append(r.params[paramName], value)
  456. return r
  457. }
  458. func (r *Request) SetHeader(key, value string) *Request {
  459. if r.headers == nil {
  460. r.headers = http.Header{}
  461. }
  462. r.headers.Set(key, value)
  463. return r
  464. }
  465. // Timeout makes the request use the given duration as a timeout. Sets the "timeout"
  466. // parameter.
  467. func (r *Request) Timeout(d time.Duration) *Request {
  468. if r.err != nil {
  469. return r
  470. }
  471. r.timeout = d
  472. return r
  473. }
  474. // Body makes the request use obj as the body. Optional.
  475. // If obj is a string, try to read a file of that name.
  476. // If obj is a []byte, send it directly.
  477. // If obj is an io.Reader, use it directly.
  478. // If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
  479. // If obj is a runtime.Object and nil, do nothing.
  480. // Otherwise, set an error.
  481. func (r *Request) Body(obj interface{}) *Request {
  482. if r.err != nil {
  483. return r
  484. }
  485. switch t := obj.(type) {
  486. case string:
  487. data, err := ioutil.ReadFile(t)
  488. if err != nil {
  489. r.err = err
  490. return r
  491. }
  492. glog.V(8).Infof("Request Body: %#v", string(data))
  493. r.body = bytes.NewReader(data)
  494. case []byte:
  495. glog.V(8).Infof("Request Body: %#v", string(t))
  496. r.body = bytes.NewReader(t)
  497. case io.Reader:
  498. r.body = t
  499. case runtime.Object:
  500. // callers may pass typed interface pointers, therefore we must check nil with reflection
  501. if reflect.ValueOf(t).IsNil() {
  502. return r
  503. }
  504. data, err := runtime.Encode(r.serializers.Encoder, t)
  505. if err != nil {
  506. r.err = err
  507. return r
  508. }
  509. glog.V(8).Infof("Request Body: %#v", string(data))
  510. r.body = bytes.NewReader(data)
  511. r.SetHeader("Content-Type", r.content.ContentType)
  512. default:
  513. r.err = fmt.Errorf("unknown type used for body: %+v", obj)
  514. }
  515. return r
  516. }
  517. // URL returns the current working URL.
  518. func (r *Request) URL() *url.URL {
  519. p := r.pathPrefix
  520. if r.namespaceSet && len(r.namespace) > 0 {
  521. p = path.Join(p, "namespaces", r.namespace)
  522. }
  523. if len(r.resource) != 0 {
  524. p = path.Join(p, strings.ToLower(r.resource))
  525. }
  526. // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
  527. if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
  528. p = path.Join(p, r.resourceName, r.subresource, r.subpath)
  529. }
  530. finalURL := &url.URL{}
  531. if r.baseURL != nil {
  532. *finalURL = *r.baseURL
  533. }
  534. finalURL.Path = p
  535. query := url.Values{}
  536. for key, values := range r.params {
  537. for _, value := range values {
  538. query.Add(key, value)
  539. }
  540. }
  541. // timeout is handled specially here.
  542. if r.timeout != 0 {
  543. query.Set("timeout", r.timeout.String())
  544. }
  545. finalURL.RawQuery = query.Encode()
  546. return finalURL
  547. }
  548. // finalURLTemplate is similar to URL(), but will make all specific parameter values equal
  549. // - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
  550. // parameters will be reset. This creates a copy of the request so as not to change the
  551. // underyling object. This means some useful request info (like the types of field
  552. // selectors in use) will be lost.
  553. // TODO: preserve field selector keys
  554. func (r Request) finalURLTemplate() url.URL {
  555. if len(r.resourceName) != 0 {
  556. r.resourceName = "{name}"
  557. }
  558. if r.namespaceSet && len(r.namespace) != 0 {
  559. r.namespace = "{namespace}"
  560. }
  561. newParams := url.Values{}
  562. v := []string{"{value}"}
  563. for k := range r.params {
  564. newParams[k] = v
  565. }
  566. r.params = newParams
  567. url := r.URL()
  568. return *url
  569. }
  570. func (r *Request) tryThrottle() {
  571. now := time.Now()
  572. if r.throttle != nil {
  573. r.throttle.Accept()
  574. }
  575. if latency := time.Since(now); latency > longThrottleLatency {
  576. glog.V(4).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
  577. }
  578. }
  579. // Watch attempts to begin watching the requested location.
  580. // Returns a watch.Interface, or an error.
  581. func (r *Request) Watch() (watch.Interface, error) {
  582. // We specifically don't want to rate limit watches, so we
  583. // don't use r.throttle here.
  584. if r.err != nil {
  585. return nil, r.err
  586. }
  587. if r.serializers.Framer == nil {
  588. return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
  589. }
  590. url := r.URL().String()
  591. req, err := http.NewRequest(r.verb, url, r.body)
  592. if err != nil {
  593. return nil, err
  594. }
  595. req.Header = r.headers
  596. client := r.client
  597. if client == nil {
  598. client = http.DefaultClient
  599. }
  600. r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
  601. resp, err := client.Do(req)
  602. updateURLMetrics(r, resp, err)
  603. if r.baseURL != nil {
  604. if err != nil {
  605. r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
  606. } else {
  607. r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
  608. }
  609. }
  610. if err != nil {
  611. // The watch stream mechanism handles many common partial data errors, so closed
  612. // connections can be retried in many cases.
  613. if net.IsProbableEOF(err) {
  614. return watch.NewEmptyWatch(), nil
  615. }
  616. return nil, err
  617. }
  618. if resp.StatusCode != http.StatusOK {
  619. defer resp.Body.Close()
  620. if result := r.transformResponse(resp, req); result.err != nil {
  621. return nil, result.err
  622. }
  623. return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
  624. }
  625. framer := r.serializers.Framer.NewFrameReader(resp.Body)
  626. decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
  627. return watch.NewStreamWatcher(versioned.NewDecoder(decoder, r.serializers.Decoder)), nil
  628. }
  629. // updateURLMetrics is a convenience function for pushing metrics.
  630. // It also handles corner cases for incomplete/invalid request data.
  631. func updateURLMetrics(req *Request, resp *http.Response, err error) {
  632. url := "none"
  633. if req.baseURL != nil {
  634. url = req.baseURL.Host
  635. }
  636. // If we have an error (i.e. apiserver down) we report that as a metric label.
  637. if err != nil {
  638. metrics.RequestResult.Increment(err.Error(), req.verb, url)
  639. } else {
  640. //Metrics for failure codes
  641. metrics.RequestResult.Increment(strconv.Itoa(resp.StatusCode), req.verb, url)
  642. }
  643. }
  644. // Stream formats and executes the request, and offers streaming of the response.
  645. // Returns io.ReadCloser which could be used for streaming of the response, or an error
  646. // Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
  647. // If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
  648. func (r *Request) Stream() (io.ReadCloser, error) {
  649. if r.err != nil {
  650. return nil, r.err
  651. }
  652. r.tryThrottle()
  653. url := r.URL().String()
  654. req, err := http.NewRequest(r.verb, url, nil)
  655. if err != nil {
  656. return nil, err
  657. }
  658. req.Header = r.headers
  659. client := r.client
  660. if client == nil {
  661. client = http.DefaultClient
  662. }
  663. r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
  664. resp, err := client.Do(req)
  665. updateURLMetrics(r, resp, err)
  666. if r.baseURL != nil {
  667. if err != nil {
  668. r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
  669. } else {
  670. r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
  671. }
  672. }
  673. if err != nil {
  674. return nil, err
  675. }
  676. switch {
  677. case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
  678. return resp.Body, nil
  679. default:
  680. // ensure we close the body before returning the error
  681. defer resp.Body.Close()
  682. // we have a decent shot at taking the object returned, parsing it as a status object and returning a more normal error
  683. bodyBytes, err := ioutil.ReadAll(resp.Body)
  684. if err != nil {
  685. return nil, fmt.Errorf("%v while accessing %v", resp.Status, url)
  686. }
  687. // TODO: Check ContentType.
  688. if runtimeObject, err := runtime.Decode(r.serializers.Decoder, bodyBytes); err == nil {
  689. statusError := errors.FromObject(runtimeObject)
  690. if _, ok := statusError.(errors.APIStatus); ok {
  691. return nil, statusError
  692. }
  693. }
  694. bodyText := string(bodyBytes)
  695. return nil, fmt.Errorf("%s while accessing %v: %s", resp.Status, url, bodyText)
  696. }
  697. }
  698. // request connects to the server and invokes the provided function when a server response is
  699. // received. It handles retry behavior and up front validation of requests. It will invoke
  700. // fn at most once. It will return an error if a problem occurred prior to connecting to the
  701. // server - the provided function is responsible for handling server errors.
  702. func (r *Request) request(fn func(*http.Request, *http.Response)) error {
  703. //Metrics for total request latency
  704. start := time.Now()
  705. defer func() {
  706. metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
  707. }()
  708. if r.err != nil {
  709. glog.V(4).Infof("Error in request: %v", r.err)
  710. return r.err
  711. }
  712. // TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
  713. if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
  714. return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
  715. }
  716. if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
  717. return fmt.Errorf("an empty namespace may not be set during creation")
  718. }
  719. client := r.client
  720. if client == nil {
  721. client = http.DefaultClient
  722. }
  723. // Right now we make about ten retry attempts if we get a Retry-After response.
  724. // TODO: Change to a timeout based approach.
  725. maxRetries := 10
  726. retries := 0
  727. for {
  728. url := r.URL().String()
  729. req, err := http.NewRequest(r.verb, url, r.body)
  730. if err != nil {
  731. return err
  732. }
  733. req.Header = r.headers
  734. r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
  735. resp, err := client.Do(req)
  736. updateURLMetrics(r, resp, err)
  737. if err != nil {
  738. r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
  739. } else {
  740. r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
  741. }
  742. if err != nil {
  743. return err
  744. }
  745. done := func() bool {
  746. // Ensure the response body is fully read and closed
  747. // before we reconnect, so that we reuse the same TCP
  748. // connection.
  749. defer func() {
  750. const maxBodySlurpSize = 2 << 10
  751. if resp.ContentLength <= maxBodySlurpSize {
  752. io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
  753. }
  754. resp.Body.Close()
  755. }()
  756. retries++
  757. if seconds, wait := checkWait(resp); wait && retries < maxRetries {
  758. if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
  759. _, err := seeker.Seek(0, 0)
  760. if err != nil {
  761. glog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
  762. fn(req, resp)
  763. return true
  764. }
  765. }
  766. glog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", seconds, retries, url)
  767. r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
  768. return false
  769. }
  770. fn(req, resp)
  771. return true
  772. }()
  773. if done {
  774. return nil
  775. }
  776. }
  777. }
  778. // Do formats and executes the request. Returns a Result object for easy response
  779. // processing.
  780. //
  781. // Error type:
  782. // * If the request can't be constructed, or an error happened earlier while building its
  783. // arguments: *RequestConstructionError
  784. // * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
  785. // * http.Client.Do errors are returned directly.
  786. func (r *Request) Do() Result {
  787. r.tryThrottle()
  788. var result Result
  789. err := r.request(func(req *http.Request, resp *http.Response) {
  790. result = r.transformResponse(resp, req)
  791. })
  792. if err != nil {
  793. return Result{err: err}
  794. }
  795. return result
  796. }
  797. // DoRaw executes the request but does not process the response body.
  798. func (r *Request) DoRaw() ([]byte, error) {
  799. r.tryThrottle()
  800. var result Result
  801. err := r.request(func(req *http.Request, resp *http.Response) {
  802. result.body, result.err = ioutil.ReadAll(resp.Body)
  803. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
  804. result.err = r.transformUnstructuredResponseError(resp, req, result.body)
  805. }
  806. })
  807. if err != nil {
  808. return nil, err
  809. }
  810. return result.body, result.err
  811. }
  812. // transformResponse converts an API response into a structured API object
  813. func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
  814. var body []byte
  815. if resp.Body != nil {
  816. if data, err := ioutil.ReadAll(resp.Body); err == nil {
  817. body = data
  818. }
  819. }
  820. if glog.V(8) {
  821. switch {
  822. case bytes.IndexFunc(body, func(r rune) bool { return r < 0x0a }) != -1:
  823. glog.Infof("Response Body:\n%s", hex.Dump(body))
  824. default:
  825. glog.Infof("Response Body: %s", string(body))
  826. }
  827. }
  828. // verify the content type is accurate
  829. contentType := resp.Header.Get("Content-Type")
  830. decoder := r.serializers.Decoder
  831. if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
  832. mediaType, params, err := mime.ParseMediaType(contentType)
  833. if err != nil {
  834. return Result{err: errors.NewInternalError(err)}
  835. }
  836. decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
  837. if err != nil {
  838. // if we fail to negotiate a decoder, treat this as an unstructured error
  839. switch {
  840. case resp.StatusCode == http.StatusSwitchingProtocols:
  841. // no-op, we've been upgraded
  842. case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
  843. return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
  844. }
  845. return Result{
  846. body: body,
  847. contentType: contentType,
  848. statusCode: resp.StatusCode,
  849. }
  850. }
  851. }
  852. // Did the server give us a status response?
  853. isStatusResponse := false
  854. status := &unversioned.Status{}
  855. // Because release-1.1 server returns Status with empty APIVersion at paths
  856. // to the Extensions resources, we need to use DecodeInto here to provide
  857. // default groupVersion, otherwise a status response won't be correctly
  858. // decoded.
  859. err := runtime.DecodeInto(decoder, body, status)
  860. if err == nil && len(status.Status) > 0 {
  861. isStatusResponse = true
  862. }
  863. switch {
  864. case resp.StatusCode == http.StatusSwitchingProtocols:
  865. // no-op, we've been upgraded
  866. case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
  867. if !isStatusResponse {
  868. return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
  869. }
  870. return Result{err: errors.FromObject(status)}
  871. }
  872. // If the server gave us a status back, look at what it was.
  873. success := resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusPartialContent
  874. if isStatusResponse && (status.Status != unversioned.StatusSuccess && !success) {
  875. // "Failed" requests are clearly just an error and it makes sense to return them as such.
  876. return Result{err: errors.FromObject(status)}
  877. }
  878. return Result{
  879. body: body,
  880. contentType: contentType,
  881. statusCode: resp.StatusCode,
  882. decoder: decoder,
  883. }
  884. }
  885. // transformUnstructuredResponseError handles an error from the server that is not in a structured form.
  886. // It is expected to transform any response that is not recognizable as a clear server sent error from the
  887. // K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
  888. // introduce a level of uncertainty to the responses returned by servers that in common use result in
  889. // unexpected responses. The rough structure is:
  890. //
  891. // 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
  892. // - this is the happy path
  893. // - when you get this output, trust what the server sends
  894. // 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
  895. // generate a reasonable facsimile of the original failure.
  896. // - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
  897. // 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
  898. // 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
  899. // initial contact, the presence of mismatched body contents from posted content types
  900. // - Give these a separate distinct error type and capture as much as possible of the original message
  901. //
  902. // TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
  903. func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
  904. if body == nil && resp.Body != nil {
  905. if data, err := ioutil.ReadAll(resp.Body); err == nil {
  906. body = data
  907. }
  908. }
  909. glog.V(8).Infof("Response Body: %#v", string(body))
  910. message := "unknown"
  911. if isTextResponse(resp) {
  912. message = strings.TrimSpace(string(body))
  913. }
  914. retryAfter, _ := retryAfterSeconds(resp)
  915. return errors.NewGenericServerResponse(
  916. resp.StatusCode,
  917. req.Method,
  918. unversioned.GroupResource{
  919. Group: r.content.GroupVersion.Group,
  920. Resource: r.resource,
  921. },
  922. r.resourceName,
  923. message,
  924. retryAfter,
  925. true,
  926. )
  927. }
  928. // isTextResponse returns true if the response appears to be a textual media type.
  929. func isTextResponse(resp *http.Response) bool {
  930. contentType := resp.Header.Get("Content-Type")
  931. if len(contentType) == 0 {
  932. return true
  933. }
  934. media, _, err := mime.ParseMediaType(contentType)
  935. if err != nil {
  936. return false
  937. }
  938. return strings.HasPrefix(media, "text/")
  939. }
  940. // checkWait returns true along with a number of seconds if the server instructed us to wait
  941. // before retrying.
  942. func checkWait(resp *http.Response) (int, bool) {
  943. switch r := resp.StatusCode; {
  944. // any 500 error code and 429 can trigger a wait
  945. case r == errors.StatusTooManyRequests, r >= 500:
  946. default:
  947. return 0, false
  948. }
  949. i, ok := retryAfterSeconds(resp)
  950. return i, ok
  951. }
  952. // retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
  953. // the header was missing or not a valid number.
  954. func retryAfterSeconds(resp *http.Response) (int, bool) {
  955. if h := resp.Header.Get("Retry-After"); len(h) > 0 {
  956. if i, err := strconv.Atoi(h); err == nil {
  957. return i, true
  958. }
  959. }
  960. return 0, false
  961. }
  962. // Result contains the result of calling Request.Do().
  963. type Result struct {
  964. body []byte
  965. contentType string
  966. err error
  967. statusCode int
  968. decoder runtime.Decoder
  969. }
  970. // Raw returns the raw result.
  971. func (r Result) Raw() ([]byte, error) {
  972. return r.body, r.err
  973. }
  974. // Get returns the result as an object.
  975. func (r Result) Get() (runtime.Object, error) {
  976. if r.err != nil {
  977. return nil, r.err
  978. }
  979. if r.decoder == nil {
  980. return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
  981. }
  982. return runtime.Decode(r.decoder, r.body)
  983. }
  984. // StatusCode returns the HTTP status code of the request. (Only valid if no
  985. // error was returned.)
  986. func (r Result) StatusCode(statusCode *int) Result {
  987. *statusCode = r.statusCode
  988. return r
  989. }
  990. // Into stores the result into obj, if possible. If obj is nil it is ignored.
  991. func (r Result) Into(obj runtime.Object) error {
  992. if r.err != nil {
  993. return r.err
  994. }
  995. if r.decoder == nil {
  996. return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
  997. }
  998. return runtime.DecodeInto(r.decoder, r.body, obj)
  999. }
  1000. // WasCreated updates the provided bool pointer to whether the server returned
  1001. // 201 created or a different response.
  1002. func (r Result) WasCreated(wasCreated *bool) Result {
  1003. *wasCreated = r.statusCode == http.StatusCreated
  1004. return r
  1005. }
  1006. // Error returns the error executing the request, nil if no error occurred.
  1007. // See the Request.Do() comment for what errors you might get.
  1008. func (r Result) Error() error {
  1009. return r.err
  1010. }