resthandler.go 38 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package apiserver
  14. import (
  15. "encoding/hex"
  16. "encoding/json"
  17. "fmt"
  18. "math/rand"
  19. "net/http"
  20. "net/url"
  21. "strings"
  22. "time"
  23. "k8s.io/kubernetes/pkg/admission"
  24. "k8s.io/kubernetes/pkg/api"
  25. "k8s.io/kubernetes/pkg/api/errors"
  26. "k8s.io/kubernetes/pkg/api/meta"
  27. "k8s.io/kubernetes/pkg/api/rest"
  28. "k8s.io/kubernetes/pkg/api/unversioned"
  29. "k8s.io/kubernetes/pkg/fields"
  30. "k8s.io/kubernetes/pkg/runtime"
  31. "k8s.io/kubernetes/pkg/util"
  32. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  33. "k8s.io/kubernetes/pkg/util/strategicpatch"
  34. "github.com/emicklei/go-restful"
  35. "github.com/evanphx/json-patch"
  36. "github.com/golang/glog"
  37. )
  38. // ContextFunc returns a Context given a request - a context must be returned
  39. type ContextFunc func(req *restful.Request) api.Context
  40. // ScopeNamer handles accessing names from requests and objects
  41. type ScopeNamer interface {
  42. // Namespace returns the appropriate namespace value from the request (may be empty) or an
  43. // error.
  44. Namespace(req *restful.Request) (namespace string, err error)
  45. // Name returns the name from the request, and an optional namespace value if this is a namespace
  46. // scoped call. An error is returned if the name is not available.
  47. Name(req *restful.Request) (namespace, name string, err error)
  48. // ObjectName returns the namespace and name from an object if they exist, or an error if the object
  49. // does not support names.
  50. ObjectName(obj runtime.Object) (namespace, name string, err error)
  51. // SetSelfLink sets the provided URL onto the object. The method should return nil if the object
  52. // does not support selfLinks.
  53. SetSelfLink(obj runtime.Object, url string) error
  54. // GenerateLink creates a path and query for a given runtime object that represents the canonical path.
  55. GenerateLink(req *restful.Request, obj runtime.Object) (path, query string, err error)
  56. // GenerateLink creates a path and query for a list that represents the canonical path.
  57. GenerateListLink(req *restful.Request) (path, query string, err error)
  58. }
  59. // RequestScope encapsulates common fields across all RESTful handler methods.
  60. type RequestScope struct {
  61. Namer ScopeNamer
  62. ContextFunc
  63. Serializer runtime.NegotiatedSerializer
  64. runtime.ParameterCodec
  65. Creater runtime.ObjectCreater
  66. Convertor runtime.ObjectConvertor
  67. Copier runtime.ObjectCopier
  68. Resource unversioned.GroupVersionResource
  69. Kind unversioned.GroupVersionKind
  70. Subresource string
  71. }
  72. func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Request) {
  73. errorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
  74. }
  75. // getterFunc performs a get request with the given context and object name. The request
  76. // may be used to deserialize an options object to pass to the getter.
  77. type getterFunc func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error)
  78. // MaxPatchConflicts is the maximum number of conflicts retry for during a patch operation before returning failure
  79. const MaxPatchConflicts = 5
  80. // getResourceHandler is an HTTP handler function for get requests. It delegates to the
  81. // passed-in getterFunc to perform the actual get.
  82. func getResourceHandler(scope RequestScope, getter getterFunc) restful.RouteFunction {
  83. return func(req *restful.Request, res *restful.Response) {
  84. w := res.ResponseWriter
  85. namespace, name, err := scope.Namer.Name(req)
  86. if err != nil {
  87. scope.err(err, res.ResponseWriter, req.Request)
  88. return
  89. }
  90. ctx := scope.ContextFunc(req)
  91. ctx = api.WithNamespace(ctx, namespace)
  92. result, err := getter(ctx, name, req)
  93. if err != nil {
  94. scope.err(err, res.ResponseWriter, req.Request)
  95. return
  96. }
  97. if err := setSelfLink(result, req, scope.Namer); err != nil {
  98. scope.err(err, res.ResponseWriter, req.Request)
  99. return
  100. }
  101. write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
  102. }
  103. }
  104. // GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
  105. func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) restful.RouteFunction {
  106. return getResourceHandler(scope,
  107. func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) {
  108. // For performance tracking purposes.
  109. trace := util.NewTrace("Get " + req.Request.URL.Path)
  110. defer trace.LogIfLong(250 * time.Millisecond)
  111. // check for export
  112. if values := req.Request.URL.Query(); len(values) > 0 {
  113. // TODO: this is internal version, not unversioned
  114. exports := unversioned.ExportOptions{}
  115. if err := scope.ParameterCodec.DecodeParameters(values, unversioned.GroupVersion{Version: "v1"}, &exports); err != nil {
  116. return nil, err
  117. }
  118. if exports.Export {
  119. if e == nil {
  120. return nil, errors.NewBadRequest(fmt.Sprintf("export of %q is not supported", scope.Resource.Resource))
  121. }
  122. return e.Export(ctx, name, exports)
  123. }
  124. }
  125. return r.Get(ctx, name)
  126. })
  127. }
  128. // GetResourceWithOptions returns a function that handles retrieving a single resource from a rest.Storage object.
  129. func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope) restful.RouteFunction {
  130. return getResourceHandler(scope,
  131. func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) {
  132. opts, subpath, subpathKey := r.NewGetOptions()
  133. if err := getRequestOptions(req, scope, opts, subpath, subpathKey); err != nil {
  134. return nil, err
  135. }
  136. return r.Get(ctx, name, opts)
  137. })
  138. }
  139. func getRequestOptions(req *restful.Request, scope RequestScope, into runtime.Object, subpath bool, subpathKey string) error {
  140. if into == nil {
  141. return nil
  142. }
  143. query := req.Request.URL.Query()
  144. if subpath {
  145. newQuery := make(url.Values)
  146. for k, v := range query {
  147. newQuery[k] = v
  148. }
  149. newQuery[subpathKey] = []string{req.PathParameter("path")}
  150. query = newQuery
  151. }
  152. return scope.ParameterCodec.DecodeParameters(query, scope.Kind.GroupVersion(), into)
  153. }
  154. // ConnectResource returns a function that handles a connect request on a rest.Storage object.
  155. func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admission.Interface, restPath string) restful.RouteFunction {
  156. return func(req *restful.Request, res *restful.Response) {
  157. w := res.ResponseWriter
  158. namespace, name, err := scope.Namer.Name(req)
  159. if err != nil {
  160. scope.err(err, res.ResponseWriter, req.Request)
  161. return
  162. }
  163. ctx := scope.ContextFunc(req)
  164. ctx = api.WithNamespace(ctx, namespace)
  165. opts, subpath, subpathKey := connecter.NewConnectOptions()
  166. if err := getRequestOptions(req, scope, opts, subpath, subpathKey); err != nil {
  167. scope.err(err, res.ResponseWriter, req.Request)
  168. return
  169. }
  170. if admit.Handles(admission.Connect) {
  171. connectRequest := &rest.ConnectRequest{
  172. Name: name,
  173. Options: opts,
  174. ResourcePath: restPath,
  175. }
  176. userInfo, _ := api.UserFrom(ctx)
  177. err = admit.Admit(admission.NewAttributesRecord(connectRequest, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, userInfo))
  178. if err != nil {
  179. scope.err(err, res.ResponseWriter, req.Request)
  180. return
  181. }
  182. }
  183. handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, res: res})
  184. if err != nil {
  185. scope.err(err, res.ResponseWriter, req.Request)
  186. return
  187. }
  188. handler.ServeHTTP(w, req.Request)
  189. }
  190. }
  191. // responder implements rest.Responder for assisting a connector in writing objects or errors.
  192. type responder struct {
  193. scope RequestScope
  194. req *restful.Request
  195. res *restful.Response
  196. }
  197. func (r *responder) Object(statusCode int, obj runtime.Object) {
  198. write(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.res.ResponseWriter, r.req.Request)
  199. }
  200. func (r *responder) Error(err error) {
  201. r.scope.err(err, r.res.ResponseWriter, r.req.Request)
  202. }
  203. // ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
  204. func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool, minRequestTimeout time.Duration) restful.RouteFunction {
  205. return func(req *restful.Request, res *restful.Response) {
  206. // For performance tracking purposes.
  207. trace := util.NewTrace("List " + req.Request.URL.Path)
  208. w := res.ResponseWriter
  209. namespace, err := scope.Namer.Namespace(req)
  210. if err != nil {
  211. scope.err(err, res.ResponseWriter, req.Request)
  212. return
  213. }
  214. // Watches for single objects are routed to this function.
  215. // Treat a /name parameter the same as a field selector entry.
  216. hasName := true
  217. _, name, err := scope.Namer.Name(req)
  218. if err != nil {
  219. hasName = false
  220. }
  221. ctx := scope.ContextFunc(req)
  222. ctx = api.WithNamespace(ctx, namespace)
  223. opts := api.ListOptions{}
  224. if err := scope.ParameterCodec.DecodeParameters(req.Request.URL.Query(), scope.Kind.GroupVersion(), &opts); err != nil {
  225. scope.err(err, res.ResponseWriter, req.Request)
  226. return
  227. }
  228. // transform fields
  229. // TODO: DecodeParametersInto should do this.
  230. if opts.FieldSelector != nil {
  231. fn := func(label, value string) (newLabel, newValue string, err error) {
  232. return scope.Convertor.ConvertFieldLabel(scope.Kind.GroupVersion().String(), scope.Kind.Kind, label, value)
  233. }
  234. if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
  235. // TODO: allow bad request to set field causes based on query parameters
  236. err = errors.NewBadRequest(err.Error())
  237. scope.err(err, res.ResponseWriter, req.Request)
  238. return
  239. }
  240. }
  241. if hasName {
  242. // metadata.name is the canonical internal name.
  243. // generic.SelectionPredicate will notice that this is
  244. // a request for a single object and optimize the
  245. // storage query accordingly.
  246. nameSelector := fields.OneTermEqualSelector("metadata.name", name)
  247. if opts.FieldSelector != nil && !opts.FieldSelector.Empty() {
  248. // It doesn't make sense to ask for both a name
  249. // and a field selector, since just the name is
  250. // sufficient to narrow down the request to a
  251. // single object.
  252. scope.err(errors.NewBadRequest("both a name and a field selector provided; please provide one or the other."), res.ResponseWriter, req.Request)
  253. return
  254. }
  255. opts.FieldSelector = nameSelector
  256. }
  257. if (opts.Watch || forceWatch) && rw != nil {
  258. watcher, err := rw.Watch(ctx, &opts)
  259. if err != nil {
  260. scope.err(err, res.ResponseWriter, req.Request)
  261. return
  262. }
  263. // TODO: Currently we explicitly ignore ?timeout= and use only ?timeoutSeconds=.
  264. timeout := time.Duration(0)
  265. if opts.TimeoutSeconds != nil {
  266. timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
  267. }
  268. if timeout == 0 && minRequestTimeout > 0 {
  269. timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
  270. }
  271. serveWatch(watcher, scope, req, res, timeout)
  272. return
  273. }
  274. // Log only long List requests (ignore Watch).
  275. defer trace.LogIfLong(500 * time.Millisecond)
  276. trace.Step("About to List from storage")
  277. result, err := r.List(ctx, &opts)
  278. if err != nil {
  279. scope.err(err, res.ResponseWriter, req.Request)
  280. return
  281. }
  282. trace.Step("Listing from storage done")
  283. numberOfItems, err := setListSelfLink(result, req, scope.Namer)
  284. if err != nil {
  285. scope.err(err, res.ResponseWriter, req.Request)
  286. return
  287. }
  288. trace.Step("Self-linking done")
  289. write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
  290. trace.Step(fmt.Sprintf("Writing http response done (%d items)", numberOfItems))
  291. }
  292. }
  293. func createHandler(r rest.NamedCreater, scope RequestScope, typer runtime.ObjectTyper, admit admission.Interface, includeName bool) restful.RouteFunction {
  294. return func(req *restful.Request, res *restful.Response) {
  295. // For performance tracking purposes.
  296. trace := util.NewTrace("Create " + req.Request.URL.Path)
  297. defer trace.LogIfLong(250 * time.Millisecond)
  298. w := res.ResponseWriter
  299. // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
  300. timeout := parseTimeout(req.Request.URL.Query().Get("timeout"))
  301. var (
  302. namespace, name string
  303. err error
  304. )
  305. if includeName {
  306. namespace, name, err = scope.Namer.Name(req)
  307. } else {
  308. namespace, err = scope.Namer.Namespace(req)
  309. }
  310. if err != nil {
  311. scope.err(err, res.ResponseWriter, req.Request)
  312. return
  313. }
  314. ctx := scope.ContextFunc(req)
  315. ctx = api.WithNamespace(ctx, namespace)
  316. gv := scope.Kind.GroupVersion()
  317. s, err := negotiateInputSerializer(req.Request, scope.Serializer)
  318. if err != nil {
  319. scope.err(err, res.ResponseWriter, req.Request)
  320. return
  321. }
  322. decoder := scope.Serializer.DecoderToVersion(s, unversioned.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal})
  323. body, err := readBody(req.Request)
  324. if err != nil {
  325. scope.err(err, res.ResponseWriter, req.Request)
  326. return
  327. }
  328. defaultGVK := scope.Kind
  329. original := r.New()
  330. trace.Step("About to convert to expected version")
  331. obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
  332. if err != nil {
  333. err = transformDecodeError(typer, err, original, gvk, body)
  334. scope.err(err, res.ResponseWriter, req.Request)
  335. return
  336. }
  337. if gvk.GroupVersion() != gv {
  338. err = errors.NewBadRequest(fmt.Sprintf("the API version in the data (%s) does not match the expected API version (%v)", gvk.GroupVersion().String(), gv.String()))
  339. scope.err(err, res.ResponseWriter, req.Request)
  340. return
  341. }
  342. trace.Step("Conversion done")
  343. if admit != nil && admit.Handles(admission.Create) {
  344. userInfo, _ := api.UserFrom(ctx)
  345. err = admit.Admit(admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, userInfo))
  346. if err != nil {
  347. scope.err(err, res.ResponseWriter, req.Request)
  348. return
  349. }
  350. }
  351. trace.Step("About to store object in database")
  352. result, err := finishRequest(timeout, func() (runtime.Object, error) {
  353. out, err := r.Create(ctx, name, obj)
  354. if status, ok := out.(*unversioned.Status); ok && err == nil && status.Code == 0 {
  355. status.Code = http.StatusCreated
  356. }
  357. return out, err
  358. })
  359. if err != nil {
  360. scope.err(err, res.ResponseWriter, req.Request)
  361. return
  362. }
  363. trace.Step("Object stored in database")
  364. if err := setSelfLink(result, req, scope.Namer); err != nil {
  365. scope.err(err, res.ResponseWriter, req.Request)
  366. return
  367. }
  368. trace.Step("Self-link added")
  369. write(http.StatusCreated, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
  370. }
  371. }
  372. // CreateNamedResource returns a function that will handle a resource creation with name.
  373. func CreateNamedResource(r rest.NamedCreater, scope RequestScope, typer runtime.ObjectTyper, admit admission.Interface) restful.RouteFunction {
  374. return createHandler(r, scope, typer, admit, true)
  375. }
  376. // CreateResource returns a function that will handle a resource creation.
  377. func CreateResource(r rest.Creater, scope RequestScope, typer runtime.ObjectTyper, admit admission.Interface) restful.RouteFunction {
  378. return createHandler(&namedCreaterAdapter{r}, scope, typer, admit, false)
  379. }
  380. type namedCreaterAdapter struct {
  381. rest.Creater
  382. }
  383. func (c *namedCreaterAdapter) Create(ctx api.Context, name string, obj runtime.Object) (runtime.Object, error) {
  384. return c.Creater.Create(ctx, obj)
  385. }
  386. // PatchResource returns a function that will handle a resource patch
  387. // TODO: Eventually PatchResource should just use GuaranteedUpdate and this routine should be a bit cleaner
  388. func PatchResource(r rest.Patcher, scope RequestScope, typer runtime.ObjectTyper, admit admission.Interface, converter runtime.ObjectConvertor) restful.RouteFunction {
  389. return func(req *restful.Request, res *restful.Response) {
  390. w := res.ResponseWriter
  391. // TODO: we either want to remove timeout or document it (if we
  392. // document, move timeout out of this function and declare it in
  393. // api_installer)
  394. timeout := parseTimeout(req.Request.URL.Query().Get("timeout"))
  395. namespace, name, err := scope.Namer.Name(req)
  396. if err != nil {
  397. scope.err(err, res.ResponseWriter, req.Request)
  398. return
  399. }
  400. ctx := scope.ContextFunc(req)
  401. ctx = api.WithNamespace(ctx, namespace)
  402. versionedObj, err := converter.ConvertToVersion(r.New(), scope.Kind.GroupVersion())
  403. if err != nil {
  404. scope.err(err, res.ResponseWriter, req.Request)
  405. return
  406. }
  407. // TODO: handle this in negotiation
  408. contentType := req.HeaderParameter("Content-Type")
  409. // Remove "; charset=" if included in header.
  410. if idx := strings.Index(contentType, ";"); idx > 0 {
  411. contentType = contentType[:idx]
  412. }
  413. patchType := api.PatchType(contentType)
  414. patchJS, err := readBody(req.Request)
  415. if err != nil {
  416. scope.err(err, res.ResponseWriter, req.Request)
  417. return
  418. }
  419. s, ok := scope.Serializer.SerializerForMediaType("application/json", nil)
  420. if !ok {
  421. scope.err(fmt.Errorf("no serializer defined for JSON"), res.ResponseWriter, req.Request)
  422. return
  423. }
  424. gv := scope.Kind.GroupVersion()
  425. codec := runtime.NewCodec(
  426. scope.Serializer.EncoderForVersion(s, gv),
  427. scope.Serializer.DecoderToVersion(s, unversioned.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal}),
  428. )
  429. updateAdmit := func(updatedObject runtime.Object, currentObject runtime.Object) error {
  430. if admit != nil && admit.Handles(admission.Update) {
  431. userInfo, _ := api.UserFrom(ctx)
  432. return admit.Admit(admission.NewAttributesRecord(updatedObject, currentObject, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo))
  433. }
  434. return nil
  435. }
  436. result, err := patchResource(ctx, updateAdmit, timeout, versionedObj, r, name, patchType, patchJS, scope.Namer, scope.Copier, scope.Resource, codec)
  437. if err != nil {
  438. scope.err(err, res.ResponseWriter, req.Request)
  439. return
  440. }
  441. if err := setSelfLink(result, req, scope.Namer); err != nil {
  442. scope.err(err, res.ResponseWriter, req.Request)
  443. return
  444. }
  445. write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
  446. }
  447. }
  448. type updateAdmissionFunc func(updatedObject runtime.Object, currentObject runtime.Object) error
  449. // patchResource divides PatchResource for easier unit testing
  450. func patchResource(
  451. ctx api.Context,
  452. admit updateAdmissionFunc,
  453. timeout time.Duration,
  454. versionedObj runtime.Object,
  455. patcher rest.Patcher,
  456. name string,
  457. patchType api.PatchType,
  458. patchJS []byte,
  459. namer ScopeNamer,
  460. copier runtime.ObjectCopier,
  461. resource unversioned.GroupVersionResource,
  462. codec runtime.Codec,
  463. ) (runtime.Object, error) {
  464. namespace := api.NamespaceValue(ctx)
  465. var (
  466. originalObjJS []byte
  467. originalPatchedObjJS []byte
  468. lastConflictErr error
  469. )
  470. // applyPatch is called every time GuaranteedUpdate asks for the updated object,
  471. // and is given the currently persisted object as input.
  472. applyPatch := func(_ api.Context, _, currentObject runtime.Object) (runtime.Object, error) {
  473. // Make sure we actually have a persisted currentObject
  474. if hasUID, err := hasUID(currentObject); err != nil {
  475. return nil, err
  476. } else if !hasUID {
  477. return nil, errors.NewNotFound(resource.GroupResource(), name)
  478. }
  479. switch {
  480. case len(originalObjJS) == 0 || len(originalPatchedObjJS) == 0:
  481. // first time through,
  482. // 1. apply the patch
  483. // 2. save the originalJS and patchedJS to detect whether there were conflicting changes on retries
  484. if js, err := runtime.Encode(codec, currentObject); err != nil {
  485. return nil, err
  486. } else {
  487. originalObjJS = js
  488. }
  489. if js, err := getPatchedJS(patchType, originalObjJS, patchJS, versionedObj); err != nil {
  490. return nil, err
  491. } else {
  492. originalPatchedObjJS = js
  493. }
  494. objToUpdate := patcher.New()
  495. if err := runtime.DecodeInto(codec, originalPatchedObjJS, objToUpdate); err != nil {
  496. return nil, err
  497. }
  498. if err := checkName(objToUpdate, name, namespace, namer); err != nil {
  499. return nil, err
  500. }
  501. return objToUpdate, nil
  502. default:
  503. // on a conflict,
  504. // 1. build a strategic merge patch from originalJS and the patchedJS. Different patch types can
  505. // be specified, but a strategic merge patch should be expressive enough handle them. Build the
  506. // patch with this type to handle those cases.
  507. // 2. build a strategic merge patch from originalJS and the currentJS
  508. // 3. ensure no conflicts between the two patches
  509. // 4. apply the #1 patch to the currentJS object
  510. currentObjectJS, err := runtime.Encode(codec, currentObject)
  511. if err != nil {
  512. return nil, err
  513. }
  514. currentPatch, err := strategicpatch.CreateStrategicMergePatch(originalObjJS, currentObjectJS, versionedObj)
  515. if err != nil {
  516. return nil, err
  517. }
  518. originalPatch, err := strategicpatch.CreateStrategicMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
  519. if err != nil {
  520. return nil, err
  521. }
  522. diff1 := make(map[string]interface{})
  523. if err := json.Unmarshal(originalPatch, &diff1); err != nil {
  524. return nil, err
  525. }
  526. diff2 := make(map[string]interface{})
  527. if err := json.Unmarshal(currentPatch, &diff2); err != nil {
  528. return nil, err
  529. }
  530. hasConflicts, err := strategicpatch.HasConflicts(diff1, diff2)
  531. if err != nil {
  532. return nil, err
  533. }
  534. if hasConflicts {
  535. glog.V(4).Infof("patchResource failed for resource %s, because there is a meaningful conflict.\n diff1=%v\n, diff2=%v\n", name, diff1, diff2)
  536. // Return the last conflict error we got if we have one
  537. if lastConflictErr != nil {
  538. return nil, lastConflictErr
  539. }
  540. // Otherwise manufacture one of our own
  541. return nil, errors.NewConflict(resource.GroupResource(), name, nil)
  542. }
  543. newlyPatchedObjJS, err := getPatchedJS(api.StrategicMergePatchType, currentObjectJS, originalPatch, versionedObj)
  544. if err != nil {
  545. return nil, err
  546. }
  547. objToUpdate := patcher.New()
  548. if err := runtime.DecodeInto(codec, newlyPatchedObjJS, objToUpdate); err != nil {
  549. return nil, err
  550. }
  551. return objToUpdate, nil
  552. }
  553. }
  554. // applyAdmission is called every time GuaranteedUpdate asks for the updated object,
  555. // and is given the currently persisted object and the patched object as input.
  556. applyAdmission := func(ctx api.Context, patchedObject runtime.Object, currentObject runtime.Object) (runtime.Object, error) {
  557. return patchedObject, admit(patchedObject, currentObject)
  558. }
  559. updatedObjectInfo := rest.DefaultUpdatedObjectInfo(nil, copier, applyPatch, applyAdmission)
  560. return finishRequest(timeout, func() (runtime.Object, error) {
  561. updateObject, _, updateErr := patcher.Update(ctx, name, updatedObjectInfo)
  562. for i := 0; i < MaxPatchConflicts && (errors.IsConflict(updateErr)); i++ {
  563. lastConflictErr = updateErr
  564. updateObject, _, updateErr = patcher.Update(ctx, name, updatedObjectInfo)
  565. }
  566. return updateObject, updateErr
  567. })
  568. }
  569. // UpdateResource returns a function that will handle a resource update
  570. func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectTyper, admit admission.Interface) restful.RouteFunction {
  571. return func(req *restful.Request, res *restful.Response) {
  572. // For performance tracking purposes.
  573. trace := util.NewTrace("Update " + req.Request.URL.Path)
  574. defer trace.LogIfLong(250 * time.Millisecond)
  575. w := res.ResponseWriter
  576. // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
  577. timeout := parseTimeout(req.Request.URL.Query().Get("timeout"))
  578. namespace, name, err := scope.Namer.Name(req)
  579. if err != nil {
  580. scope.err(err, res.ResponseWriter, req.Request)
  581. return
  582. }
  583. ctx := scope.ContextFunc(req)
  584. ctx = api.WithNamespace(ctx, namespace)
  585. body, err := readBody(req.Request)
  586. if err != nil {
  587. scope.err(err, res.ResponseWriter, req.Request)
  588. return
  589. }
  590. s, err := negotiateInputSerializer(req.Request, scope.Serializer)
  591. if err != nil {
  592. scope.err(err, res.ResponseWriter, req.Request)
  593. return
  594. }
  595. defaultGVK := scope.Kind
  596. original := r.New()
  597. trace.Step("About to convert to expected version")
  598. obj, gvk, err := scope.Serializer.DecoderToVersion(s, defaultGVK.GroupVersion()).Decode(body, &defaultGVK, original)
  599. if err != nil {
  600. err = transformDecodeError(typer, err, original, gvk, body)
  601. scope.err(err, res.ResponseWriter, req.Request)
  602. return
  603. }
  604. if gvk.GroupVersion() != defaultGVK.GroupVersion() {
  605. err = errors.NewBadRequest(fmt.Sprintf("the API version in the data (%s) does not match the expected API version (%s)", gvk.GroupVersion(), defaultGVK.GroupVersion()))
  606. scope.err(err, res.ResponseWriter, req.Request)
  607. return
  608. }
  609. trace.Step("Conversion done")
  610. if err := checkName(obj, name, namespace, scope.Namer); err != nil {
  611. scope.err(err, res.ResponseWriter, req.Request)
  612. return
  613. }
  614. var transformers []rest.TransformFunc
  615. if admit != nil && admit.Handles(admission.Update) {
  616. transformers = append(transformers, func(ctx api.Context, newObj, oldObj runtime.Object) (runtime.Object, error) {
  617. userInfo, _ := api.UserFrom(ctx)
  618. return newObj, admit.Admit(admission.NewAttributesRecord(newObj, oldObj, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo))
  619. })
  620. }
  621. trace.Step("About to store object in database")
  622. wasCreated := false
  623. result, err := finishRequest(timeout, func() (runtime.Object, error) {
  624. obj, created, err := r.Update(ctx, name, rest.DefaultUpdatedObjectInfo(obj, scope.Copier, transformers...))
  625. wasCreated = created
  626. return obj, err
  627. })
  628. if err != nil {
  629. scope.err(err, res.ResponseWriter, req.Request)
  630. return
  631. }
  632. trace.Step("Object stored in database")
  633. if err := setSelfLink(result, req, scope.Namer); err != nil {
  634. scope.err(err, res.ResponseWriter, req.Request)
  635. return
  636. }
  637. trace.Step("Self-link added")
  638. status := http.StatusOK
  639. if wasCreated {
  640. status = http.StatusCreated
  641. }
  642. write(status, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
  643. }
  644. }
  645. // DeleteResource returns a function that will handle a resource deletion
  646. func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope, admit admission.Interface) restful.RouteFunction {
  647. return func(req *restful.Request, res *restful.Response) {
  648. // For performance tracking purposes.
  649. trace := util.NewTrace("Delete " + req.Request.URL.Path)
  650. defer trace.LogIfLong(250 * time.Millisecond)
  651. w := res.ResponseWriter
  652. // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
  653. timeout := parseTimeout(req.Request.URL.Query().Get("timeout"))
  654. namespace, name, err := scope.Namer.Name(req)
  655. if err != nil {
  656. scope.err(err, res.ResponseWriter, req.Request)
  657. return
  658. }
  659. ctx := scope.ContextFunc(req)
  660. ctx = api.WithNamespace(ctx, namespace)
  661. options := &api.DeleteOptions{}
  662. if checkBody {
  663. body, err := readBody(req.Request)
  664. if err != nil {
  665. scope.err(err, res.ResponseWriter, req.Request)
  666. return
  667. }
  668. if len(body) > 0 {
  669. s, err := negotiateInputSerializer(req.Request, scope.Serializer)
  670. if err != nil {
  671. scope.err(err, res.ResponseWriter, req.Request)
  672. return
  673. }
  674. defaultGVK := scope.Kind.GroupVersion().WithKind("DeleteOptions")
  675. obj, _, err := scope.Serializer.DecoderToVersion(s, defaultGVK.GroupVersion()).Decode(body, &defaultGVK, options)
  676. if err != nil {
  677. scope.err(err, res.ResponseWriter, req.Request)
  678. return
  679. }
  680. if obj != options {
  681. scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), res.ResponseWriter, req.Request)
  682. return
  683. }
  684. }
  685. }
  686. if admit != nil && admit.Handles(admission.Delete) {
  687. userInfo, _ := api.UserFrom(ctx)
  688. err = admit.Admit(admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, userInfo))
  689. if err != nil {
  690. scope.err(err, res.ResponseWriter, req.Request)
  691. return
  692. }
  693. }
  694. trace.Step("About do delete object from database")
  695. result, err := finishRequest(timeout, func() (runtime.Object, error) {
  696. return r.Delete(ctx, name, options)
  697. })
  698. if err != nil {
  699. scope.err(err, res.ResponseWriter, req.Request)
  700. return
  701. }
  702. trace.Step("Object deleted from database")
  703. // if the rest.Deleter returns a nil object, fill out a status. Callers may return a valid
  704. // object with the response.
  705. if result == nil {
  706. result = &unversioned.Status{
  707. Status: unversioned.StatusSuccess,
  708. Code: http.StatusOK,
  709. Details: &unversioned.StatusDetails{
  710. Name: name,
  711. Kind: scope.Kind.Kind,
  712. },
  713. }
  714. } else {
  715. // when a non-status response is returned, set the self link
  716. if _, ok := result.(*unversioned.Status); !ok {
  717. if err := setSelfLink(result, req, scope.Namer); err != nil {
  718. scope.err(err, res.ResponseWriter, req.Request)
  719. return
  720. }
  721. }
  722. }
  723. write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
  724. }
  725. }
  726. // DeleteCollection returns a function that will handle a collection deletion
  727. func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope RequestScope, admit admission.Interface) restful.RouteFunction {
  728. return func(req *restful.Request, res *restful.Response) {
  729. w := res.ResponseWriter
  730. // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
  731. timeout := parseTimeout(req.Request.URL.Query().Get("timeout"))
  732. namespace, err := scope.Namer.Namespace(req)
  733. if err != nil {
  734. scope.err(err, res.ResponseWriter, req.Request)
  735. return
  736. }
  737. ctx := scope.ContextFunc(req)
  738. ctx = api.WithNamespace(ctx, namespace)
  739. if admit != nil && admit.Handles(admission.Delete) {
  740. userInfo, _ := api.UserFrom(ctx)
  741. err = admit.Admit(admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, userInfo))
  742. if err != nil {
  743. scope.err(err, res.ResponseWriter, req.Request)
  744. return
  745. }
  746. }
  747. listOptions := api.ListOptions{}
  748. if err := scope.ParameterCodec.DecodeParameters(req.Request.URL.Query(), scope.Kind.GroupVersion(), &listOptions); err != nil {
  749. scope.err(err, res.ResponseWriter, req.Request)
  750. return
  751. }
  752. // transform fields
  753. // TODO: DecodeParametersInto should do this.
  754. if listOptions.FieldSelector != nil {
  755. fn := func(label, value string) (newLabel, newValue string, err error) {
  756. return scope.Convertor.ConvertFieldLabel(scope.Kind.GroupVersion().String(), scope.Kind.Kind, label, value)
  757. }
  758. if listOptions.FieldSelector, err = listOptions.FieldSelector.Transform(fn); err != nil {
  759. // TODO: allow bad request to set field causes based on query parameters
  760. err = errors.NewBadRequest(err.Error())
  761. scope.err(err, res.ResponseWriter, req.Request)
  762. return
  763. }
  764. }
  765. options := &api.DeleteOptions{}
  766. if checkBody {
  767. body, err := readBody(req.Request)
  768. if err != nil {
  769. scope.err(err, res.ResponseWriter, req.Request)
  770. return
  771. }
  772. if len(body) > 0 {
  773. s, err := negotiateInputSerializer(req.Request, scope.Serializer)
  774. if err != nil {
  775. scope.err(err, res.ResponseWriter, req.Request)
  776. return
  777. }
  778. defaultGVK := scope.Kind.GroupVersion().WithKind("DeleteOptions")
  779. obj, _, err := scope.Serializer.DecoderToVersion(s, defaultGVK.GroupVersion()).Decode(body, &defaultGVK, options)
  780. if err != nil {
  781. scope.err(err, res.ResponseWriter, req.Request)
  782. return
  783. }
  784. if obj != options {
  785. scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), res.ResponseWriter, req.Request)
  786. return
  787. }
  788. }
  789. }
  790. result, err := finishRequest(timeout, func() (runtime.Object, error) {
  791. return r.DeleteCollection(ctx, options, &listOptions)
  792. })
  793. if err != nil {
  794. scope.err(err, res.ResponseWriter, req.Request)
  795. return
  796. }
  797. // if the rest.Deleter returns a nil object, fill out a status. Callers may return a valid
  798. // object with the response.
  799. if result == nil {
  800. result = &unversioned.Status{
  801. Status: unversioned.StatusSuccess,
  802. Code: http.StatusOK,
  803. Details: &unversioned.StatusDetails{
  804. Kind: scope.Kind.Kind,
  805. },
  806. }
  807. } else {
  808. // when a non-status response is returned, set the self link
  809. if _, ok := result.(*unversioned.Status); !ok {
  810. if _, err := setListSelfLink(result, req, scope.Namer); err != nil {
  811. scope.err(err, res.ResponseWriter, req.Request)
  812. return
  813. }
  814. }
  815. }
  816. writeNegotiated(scope.Serializer, scope.Kind.GroupVersion(), w, req.Request, http.StatusOK, result)
  817. }
  818. }
  819. // resultFunc is a function that returns a rest result and can be run in a goroutine
  820. type resultFunc func() (runtime.Object, error)
  821. // finishRequest makes a given resultFunc asynchronous and handles errors returned by the response.
  822. // Any api.Status object returned is considered an "error", which interrupts the normal response flow.
  823. func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, err error) {
  824. // these channels need to be buffered to prevent the goroutine below from hanging indefinitely
  825. // when the select statement reads something other than the one the goroutine sends on.
  826. ch := make(chan runtime.Object, 1)
  827. errCh := make(chan error, 1)
  828. panicCh := make(chan interface{}, 1)
  829. go func() {
  830. // panics don't cross goroutine boundaries, so we have to handle ourselves
  831. defer utilruntime.HandleCrash(func(panicReason interface{}) {
  832. // Propagate to parent goroutine
  833. panicCh <- panicReason
  834. })
  835. if result, err := fn(); err != nil {
  836. errCh <- err
  837. } else {
  838. ch <- result
  839. }
  840. }()
  841. select {
  842. case result = <-ch:
  843. if status, ok := result.(*unversioned.Status); ok {
  844. return nil, errors.FromObject(status)
  845. }
  846. return result, nil
  847. case err = <-errCh:
  848. return nil, err
  849. case p := <-panicCh:
  850. panic(p)
  851. case <-time.After(timeout):
  852. return nil, errors.NewTimeoutError("request did not complete within allowed duration", 0)
  853. }
  854. }
  855. // transformDecodeError adds additional information when a decode fails.
  856. func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *unversioned.GroupVersionKind, body []byte) error {
  857. objGVKs, _, err := typer.ObjectKinds(into)
  858. if err != nil {
  859. return err
  860. }
  861. objGVK := objGVKs[0]
  862. if gvk != nil && len(gvk.Kind) > 0 {
  863. return errors.NewBadRequest(fmt.Sprintf("%s in version %q cannot be handled as a %s: %v", gvk.Kind, gvk.Version, objGVK.Kind, baseErr))
  864. }
  865. summary := summarizeData(body, 30)
  866. return errors.NewBadRequest(fmt.Sprintf("the object provided is unrecognized (must be of type %s): %v (%s)", objGVK.Kind, baseErr, summary))
  867. }
  868. // setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request
  869. // plus the path and query generated by the provided linkFunc
  870. func setSelfLink(obj runtime.Object, req *restful.Request, namer ScopeNamer) error {
  871. // TODO: SelfLink generation should return a full URL?
  872. path, query, err := namer.GenerateLink(req, obj)
  873. if err != nil {
  874. return nil
  875. }
  876. newURL := *req.Request.URL
  877. // use only canonical paths
  878. newURL.Path = path
  879. newURL.RawQuery = query
  880. newURL.Fragment = ""
  881. return namer.SetSelfLink(obj, newURL.String())
  882. }
  883. func hasUID(obj runtime.Object) (bool, error) {
  884. if obj == nil {
  885. return false, nil
  886. }
  887. accessor, err := meta.Accessor(obj)
  888. if err != nil {
  889. return false, errors.NewInternalError(err)
  890. }
  891. if len(accessor.GetUID()) == 0 {
  892. return false, nil
  893. }
  894. return true, nil
  895. }
  896. // checkName checks the provided name against the request
  897. func checkName(obj runtime.Object, name, namespace string, namer ScopeNamer) error {
  898. if objNamespace, objName, err := namer.ObjectName(obj); err == nil {
  899. if err != nil {
  900. return err
  901. }
  902. if objName != name {
  903. return errors.NewBadRequest(fmt.Sprintf(
  904. "the name of the object (%s) does not match the name on the URL (%s)", objName, name))
  905. }
  906. if len(namespace) > 0 {
  907. if len(objNamespace) > 0 && objNamespace != namespace {
  908. return errors.NewBadRequest(fmt.Sprintf(
  909. "the namespace of the object (%s) does not match the namespace on the request (%s)", objNamespace, namespace))
  910. }
  911. }
  912. }
  913. return nil
  914. }
  915. // setListSelfLink sets the self link of a list to the base URL, then sets the self links
  916. // on all child objects returned. Returns the number of items in the list.
  917. func setListSelfLink(obj runtime.Object, req *restful.Request, namer ScopeNamer) (int, error) {
  918. if !meta.IsListType(obj) {
  919. return 0, nil
  920. }
  921. // TODO: List SelfLink generation should return a full URL?
  922. path, query, err := namer.GenerateListLink(req)
  923. if err != nil {
  924. return 0, err
  925. }
  926. newURL := *req.Request.URL
  927. newURL.Path = path
  928. newURL.RawQuery = query
  929. // use the path that got us here
  930. newURL.Fragment = ""
  931. if err := namer.SetSelfLink(obj, newURL.String()); err != nil {
  932. glog.V(4).Infof("Unable to set self link on object: %v", err)
  933. }
  934. // Set self-link of objects in the list.
  935. items, err := meta.ExtractList(obj)
  936. if err != nil {
  937. return 0, err
  938. }
  939. for i := range items {
  940. if err := setSelfLink(items[i], req, namer); err != nil {
  941. return len(items), err
  942. }
  943. }
  944. return len(items), meta.SetList(obj, items)
  945. }
  946. func getPatchedJS(patchType api.PatchType, originalJS, patchJS []byte, obj runtime.Object) ([]byte, error) {
  947. switch patchType {
  948. case api.JSONPatchType:
  949. patchObj, err := jsonpatch.DecodePatch(patchJS)
  950. if err != nil {
  951. return nil, err
  952. }
  953. return patchObj.Apply(originalJS)
  954. case api.MergePatchType:
  955. return jsonpatch.MergePatch(originalJS, patchJS)
  956. case api.StrategicMergePatchType:
  957. return strategicpatch.StrategicMergePatchData(originalJS, patchJS, obj)
  958. default:
  959. // only here as a safety net - go-restful filters content-type
  960. return nil, fmt.Errorf("unknown Content-Type header for patch: %v", patchType)
  961. }
  962. }
  963. func summarizeData(data []byte, maxLength int) string {
  964. switch {
  965. case len(data) == 0:
  966. return "<empty>"
  967. case data[0] == '{':
  968. if len(data) > maxLength {
  969. return string(data[:maxLength]) + " ..."
  970. }
  971. return string(data)
  972. default:
  973. if len(data) > maxLength {
  974. return hex.EncodeToString(data[:maxLength]) + " ..."
  975. }
  976. return hex.EncodeToString(data)
  977. }
  978. }