visitor.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package resource
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "net/url"
  20. "os"
  21. "path/filepath"
  22. "time"
  23. "k8s.io/kubernetes/pkg/api/meta"
  24. "k8s.io/kubernetes/pkg/api/unversioned"
  25. "k8s.io/kubernetes/pkg/api/validation"
  26. "k8s.io/kubernetes/pkg/runtime"
  27. utilerrors "k8s.io/kubernetes/pkg/util/errors"
  28. "k8s.io/kubernetes/pkg/util/yaml"
  29. "k8s.io/kubernetes/pkg/watch"
  30. )
  // constSTDINstr is the sentinel Path/Source value that stands for standard
  // input; FileVisitor reads from os.Stdin when its Path equals this value.
  const constSTDINstr string = "STDIN"

  // stopValidateMessage is appended to schema validation errors to tell the
  // user how validation can be switched off.
  const stopValidateMessage = "if you choose to ignore these errors, turn validation off with --validate=false"
  35. // Visitor lets clients walk a list of resources.
  36. type Visitor interface {
  37. Visit(VisitorFunc) error
  38. }
  39. // VisitorFunc implements the Visitor interface for a matching function.
  40. // If there was a problem walking a list of resources, the incoming error
  41. // will describe the problem and the function can decide how to handle that error.
  42. // A nil returned indicates to accept an error to continue loops even when errors happen.
  43. // This is useful for ignoring certain kinds of errors or aggregating errors in some way.
  44. type VisitorFunc func(*Info, error) error
  45. // Watchable describes a resource that can be watched for changes that occur on the server,
  46. // beginning after the provided resource version.
  47. type Watchable interface {
  48. Watch(resourceVersion string) (watch.Interface, error)
  49. }
  50. // ResourceMapping allows an object to return the resource mapping associated with
  51. // the resource or resources it represents.
  52. type ResourceMapping interface {
  53. ResourceMapping() *meta.RESTMapping
  54. }
  55. // Info contains temporary info to execute a REST call, or show the results
  56. // of an already completed REST call.
  57. type Info struct {
  58. Client RESTClient
  59. Mapping *meta.RESTMapping
  60. Namespace string
  61. Name string
  62. // Optional, Source is the filename or URL to template file (.json or .yaml),
  63. // or stdin to use to handle the resource
  64. Source string
  65. // Optional, this is the provided object in a versioned type before defaulting
  66. // and conversions into its corresponding internal type. This is useful for
  67. // reflecting on user intent which may be lost after defaulting and conversions.
  68. VersionedObject interface{}
  69. // Optional, this is the most recent value returned by the server if available
  70. runtime.Object
  71. // Optional, this is the most recent resource version the server knows about for
  72. // this type of resource. It may not match the resource version of the object,
  73. // but if set it should be equal to or newer than the resource version of the
  74. // object (however the server defines resource version).
  75. ResourceVersion string
  76. // Optional, should this resource be exported, stripped of cluster-specific and instance specific fields
  77. Export bool
  78. }
  79. // NewInfo returns a new info object
  80. func NewInfo(client RESTClient, mapping *meta.RESTMapping, namespace, name string, export bool) *Info {
  81. return &Info{
  82. Client: client,
  83. Mapping: mapping,
  84. Namespace: namespace,
  85. Name: name,
  86. Export: export,
  87. }
  88. }
  89. // Visit implements Visitor
  90. func (i *Info) Visit(fn VisitorFunc) error {
  91. return fn(i, nil)
  92. }
  93. // Get retrieves the object from the Namespace and Name fields
  94. func (i *Info) Get() (err error) {
  95. obj, err := NewHelper(i.Client, i.Mapping).Get(i.Namespace, i.Name, i.Export)
  96. if err != nil {
  97. return err
  98. }
  99. i.Object = obj
  100. i.ResourceVersion, _ = i.Mapping.MetadataAccessor.ResourceVersion(obj)
  101. return nil
  102. }
  103. // Refresh updates the object with another object. If ignoreError is set
  104. // the Object will be updated even if name, namespace, or resourceVersion
  105. // attributes cannot be loaded from the object.
  106. func (i *Info) Refresh(obj runtime.Object, ignoreError bool) error {
  107. name, err := i.Mapping.MetadataAccessor.Name(obj)
  108. if err != nil {
  109. if !ignoreError {
  110. return err
  111. }
  112. } else {
  113. i.Name = name
  114. }
  115. namespace, err := i.Mapping.MetadataAccessor.Namespace(obj)
  116. if err != nil {
  117. if !ignoreError {
  118. return err
  119. }
  120. } else {
  121. i.Namespace = namespace
  122. }
  123. version, err := i.Mapping.MetadataAccessor.ResourceVersion(obj)
  124. if err != nil {
  125. if !ignoreError {
  126. return err
  127. }
  128. } else {
  129. i.ResourceVersion = version
  130. }
  131. i.Object = obj
  132. return nil
  133. }
  134. // Namespaced returns true if the object belongs to a namespace
  135. func (i *Info) Namespaced() bool {
  136. return i.Mapping != nil && i.Mapping.Scope.Name() == meta.RESTScopeNameNamespace
  137. }
  138. // Watch returns server changes to this object after it was retrieved.
  139. func (i *Info) Watch(resourceVersion string) (watch.Interface, error) {
  140. return NewHelper(i.Client, i.Mapping).WatchSingle(i.Namespace, i.Name, resourceVersion)
  141. }
  142. // ResourceMapping returns the mapping for this resource and implements ResourceMapping
  143. func (i *Info) ResourceMapping() *meta.RESTMapping {
  144. return i.Mapping
  145. }
  146. // VisitorList implements Visit for the sub visitors it contains. The first error
  147. // returned from a child Visitor will terminate iteration.
  148. type VisitorList []Visitor
  149. // Visit implements Visitor
  150. func (l VisitorList) Visit(fn VisitorFunc) error {
  151. for i := range l {
  152. if err := l[i].Visit(fn); err != nil {
  153. return err
  154. }
  155. }
  156. return nil
  157. }
  158. // EagerVisitorList implements Visit for the sub visitors it contains. All errors
  159. // will be captured and returned at the end of iteration.
  160. type EagerVisitorList []Visitor
  161. // Visit implements Visitor, and gathers errors that occur during processing until
  162. // all sub visitors have been visited.
  163. func (l EagerVisitorList) Visit(fn VisitorFunc) error {
  164. errs := []error(nil)
  165. for i := range l {
  166. if err := l[i].Visit(func(info *Info, err error) error {
  167. if err != nil {
  168. errs = append(errs, err)
  169. return nil
  170. }
  171. if err := fn(info, nil); err != nil {
  172. errs = append(errs, err)
  173. }
  174. return nil
  175. }); err != nil {
  176. errs = append(errs, err)
  177. }
  178. }
  179. return utilerrors.NewAggregate(errs)
  180. }
  181. func ValidateSchema(data []byte, schema validation.Schema) error {
  182. if schema == nil {
  183. return nil
  184. }
  185. if err := schema.ValidateBytes(data); err != nil {
  186. return fmt.Errorf("error validating data: %v; %s", err, stopValidateMessage)
  187. }
  188. return nil
  189. }
  190. // URLVisitor downloads the contents of a URL, and if successful, returns
  191. // an info object representing the downloaded object.
  192. type URLVisitor struct {
  193. URL *url.URL
  194. *StreamVisitor
  195. HttpAttemptCount int
  196. }
  197. func (v *URLVisitor) Visit(fn VisitorFunc) error {
  198. body, err := readHttpWithRetries(httpgetImpl, time.Second, v.URL.String(), v.HttpAttemptCount)
  199. if err != nil {
  200. return err
  201. }
  202. defer body.Close()
  203. v.StreamVisitor.Reader = body
  204. return v.StreamVisitor.Visit(fn)
  205. }
  // readHttpWithRetries fetches u through get, making at most attempts tries and
  // sleeping for duration between consecutive tries.
  //
  // A transport error or a 5xx status is retried; any other non-200 status
  // fails immediately. On success the caller owns the returned body and must
  // close it. On failure the returned body is always nil: the body of every
  // response that is not handed back is closed here, so neither retried nor
  // rejected responses leak their connections. An error is returned when
  // attempts is not positive or when no attempt produced a 200 response.
  func readHttpWithRetries(get httpget, duration time.Duration, u string, attempts int) (io.ReadCloser, error) {
  	if attempts <= 0 {
  		return nil, fmt.Errorf("http attempts must be greater than 0, was %d", attempts)
  	}

  	var err error
  	for i := 0; i < attempts; i++ {
  		var (
  			statusCode int
  			status     string
  			body       io.ReadCloser
  		)
  		if i > 0 {
  			time.Sleep(duration)
  		}

  		// Try to get the URL.
  		statusCode, status, body, err = get(u)

  		// Retry errors reported by the getter itself.
  		if err != nil {
  			continue
  		}

  		if statusCode == http.StatusOK {
  			return body, nil
  		}

  		// This response will not be returned to the caller, so release it
  		// before retrying or giving up.
  		if body != nil {
  			body.Close()
  		}
  		err = fmt.Errorf("unable to read URL %q, server reported %s, status code=%d", u, status, statusCode)

  		// Only 5xx responses are worth retrying.
  		if statusCode < 500 || statusCode >= 600 {
  			break
  		}
  	}
  	return nil, err
  }

  // httpget is the signature of a function that retrieves a URL and returns the
  // status code, the status text, the response body and an error. It exists so
  // unit tests can stub out the real HTTP call.
  type httpget func(url string) (int, string, io.ReadCloser, error)
  // httpgetImpl is the production httpget: it issues http.Get for url and
  // returns the response's status code, status text and body. When the request
  // itself fails it returns zero values together with the error.
  func httpgetImpl(url string) (int, string, io.ReadCloser, error) {
  	response, getErr := http.Get(url)
  	if getErr == nil {
  		return response.StatusCode, response.Status, response.Body, nil
  	}
  	return 0, "", nil, getErr
  }
  249. // DecoratedVisitor will invoke the decorators in order prior to invoking the visitor function
  250. // passed to Visit. An error will terminate the visit.
  251. type DecoratedVisitor struct {
  252. visitor Visitor
  253. decorators []VisitorFunc
  254. }
  255. // NewDecoratedVisitor will create a visitor that invokes the provided visitor functions before
  256. // the user supplied visitor function is invoked, giving them the opportunity to mutate the Info
  257. // object or terminate early with an error.
  258. func NewDecoratedVisitor(v Visitor, fn ...VisitorFunc) Visitor {
  259. if len(fn) == 0 {
  260. return v
  261. }
  262. return DecoratedVisitor{v, fn}
  263. }
  264. // Visit implements Visitor
  265. func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
  266. return v.visitor.Visit(func(info *Info, err error) error {
  267. if err != nil {
  268. return err
  269. }
  270. for i := range v.decorators {
  271. if err := v.decorators[i](info, nil); err != nil {
  272. return err
  273. }
  274. }
  275. return fn(info, nil)
  276. })
  277. }
  278. // ContinueOnErrorVisitor visits each item and, if an error occurs on
  279. // any individual item, returns an aggregate error after all items
  280. // are visited.
  281. type ContinueOnErrorVisitor struct {
  282. Visitor
  283. }
  284. // Visit returns nil if no error occurs during traversal, a regular
  285. // error if one occurs, or if multiple errors occur, an aggregate
  286. // error. If the provided visitor fails on any individual item it
  287. // will not prevent the remaining items from being visited. An error
  288. // returned by the visitor directly may still result in some items
  289. // not being visited.
  290. func (v ContinueOnErrorVisitor) Visit(fn VisitorFunc) error {
  291. errs := []error{}
  292. err := v.Visitor.Visit(func(info *Info, err error) error {
  293. if err != nil {
  294. errs = append(errs, err)
  295. return nil
  296. }
  297. if err := fn(info, nil); err != nil {
  298. errs = append(errs, err)
  299. }
  300. return nil
  301. })
  302. if err != nil {
  303. errs = append(errs, err)
  304. }
  305. if len(errs) == 1 {
  306. return errs[0]
  307. }
  308. return utilerrors.NewAggregate(errs)
  309. }
  310. // FlattenListVisitor flattens any objects that runtime.ExtractList recognizes as a list
  311. // - has an "Items" public field that is a slice of runtime.Objects or objects satisfying
  312. // that interface - into multiple Infos. An error on any sub item (for instance, if a List
  313. // contains an object that does not have a registered client or resource) will terminate
  314. // the visit.
  315. // TODO: allow errors to be aggregated?
  316. type FlattenListVisitor struct {
  317. Visitor
  318. *Mapper
  319. }
  320. // NewFlattenListVisitor creates a visitor that will expand list style runtime.Objects
  321. // into individual items and then visit them individually.
  322. func NewFlattenListVisitor(v Visitor, mapper *Mapper) Visitor {
  323. return FlattenListVisitor{v, mapper}
  324. }
  325. func (v FlattenListVisitor) Visit(fn VisitorFunc) error {
  326. return v.Visitor.Visit(func(info *Info, err error) error {
  327. if err != nil {
  328. return err
  329. }
  330. if info.Object == nil {
  331. return fn(info, nil)
  332. }
  333. items, err := meta.ExtractList(info.Object)
  334. if err != nil {
  335. return fn(info, nil)
  336. }
  337. if errs := runtime.DecodeList(items, struct {
  338. runtime.ObjectTyper
  339. runtime.Decoder
  340. }{v.Mapper, v.Mapper.Decoder}); len(errs) > 0 {
  341. return utilerrors.NewAggregate(errs)
  342. }
  343. // If we have a GroupVersionKind on the list, prioritize that when asking for info on the objects contained in the list
  344. var preferredGVKs []unversioned.GroupVersionKind
  345. if info.Mapping != nil && !info.Mapping.GroupVersionKind.Empty() {
  346. preferredGVKs = append(preferredGVKs, info.Mapping.GroupVersionKind)
  347. }
  348. for i := range items {
  349. item, err := v.InfoForObject(items[i], preferredGVKs)
  350. if err != nil {
  351. return err
  352. }
  353. if len(info.ResourceVersion) != 0 {
  354. item.ResourceVersion = info.ResourceVersion
  355. }
  356. if err := fn(item, nil); err != nil {
  357. return err
  358. }
  359. }
  360. return nil
  361. })
  362. }
  // ignoreFile reports whether path should be skipped because its extension (as
  // returned by filepath.Ext, including the leading dot) is not one of
  // extensions. An empty extensions list means nothing is ignored.
  func ignoreFile(path string, extensions []string) bool {
  	fileExt := filepath.Ext(path)
  	for i := 0; i < len(extensions); i++ {
  		if extensions[i] == fileExt {
  			return false
  		}
  	}
  	// No match: ignore the file only if there was a list to match against.
  	return len(extensions) != 0
  }
  375. // FileVisitorForSTDIN return a special FileVisitor just for STDIN
  376. func FileVisitorForSTDIN(mapper *Mapper, schema validation.Schema) Visitor {
  377. return &FileVisitor{
  378. Path: constSTDINstr,
  379. StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, schema),
  380. }
  381. }
  382. // ExpandPathsToFileVisitors will return a slice of FileVisitors that will handle files from the provided path.
  383. // After FileVisitors open the files, they will pass an io.Reader to a StreamVisitor to do the reading. (stdin
  384. // is also taken care of). Paths argument also accepts a single file, and will return a single visitor
  385. func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, extensions []string, schema validation.Schema) ([]Visitor, error) {
  386. var visitors []Visitor
  387. err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error {
  388. if err != nil {
  389. return err
  390. }
  391. if fi.IsDir() {
  392. if path != paths && !recursive {
  393. return filepath.SkipDir
  394. }
  395. return nil
  396. }
  397. // Don't check extension if the filepath was passed explicitly
  398. if path != paths && ignoreFile(path, extensions) {
  399. return nil
  400. }
  401. visitor := &FileVisitor{
  402. Path: path,
  403. StreamVisitor: NewStreamVisitor(nil, mapper, path, schema),
  404. }
  405. visitors = append(visitors, visitor)
  406. return nil
  407. })
  408. if err != nil {
  409. return nil, err
  410. }
  411. return visitors, nil
  412. }
  413. // FileVisitor is wrapping around a StreamVisitor, to handle open/close files
  414. type FileVisitor struct {
  415. Path string
  416. *StreamVisitor
  417. }
  418. // Visit in a FileVisitor is just taking care of opening/closing files
  419. func (v *FileVisitor) Visit(fn VisitorFunc) error {
  420. var f *os.File
  421. if v.Path == constSTDINstr {
  422. f = os.Stdin
  423. } else {
  424. var err error
  425. if f, err = os.Open(v.Path); err != nil {
  426. return err
  427. }
  428. }
  429. defer f.Close()
  430. v.StreamVisitor.Reader = f
  431. return v.StreamVisitor.Visit(fn)
  432. }
  433. // StreamVisitor reads objects from an io.Reader and walks them. A stream visitor can only be
  434. // visited once.
  435. // TODO: depends on objects being in JSON format before being passed to decode - need to implement
  436. // a stream decoder method on runtime.Codec to properly handle this.
  437. type StreamVisitor struct {
  438. io.Reader
  439. *Mapper
  440. Source string
  441. Schema validation.Schema
  442. }
  443. // NewStreamVisitor is a helper function that is useful when we want to change the fields of the struct but keep calls the same.
  444. func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, schema validation.Schema) *StreamVisitor {
  445. return &StreamVisitor{
  446. Reader: r,
  447. Mapper: mapper,
  448. Source: source,
  449. Schema: schema,
  450. }
  451. }
  452. // Visit implements Visitor over a stream. StreamVisitor is able to distinct multiple resources in one stream.
  453. func (v *StreamVisitor) Visit(fn VisitorFunc) error {
  454. d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
  455. for {
  456. ext := runtime.RawExtension{}
  457. if err := d.Decode(&ext); err != nil {
  458. if err == io.EOF {
  459. return nil
  460. }
  461. return err
  462. }
  463. // TODO: This needs to be able to handle object in other encodings and schemas.
  464. ext.Raw = bytes.TrimSpace(ext.Raw)
  465. if len(ext.Raw) == 0 || bytes.Equal(ext.Raw, []byte("null")) {
  466. continue
  467. }
  468. if err := ValidateSchema(ext.Raw, v.Schema); err != nil {
  469. return fmt.Errorf("error validating %q: %v", v.Source, err)
  470. }
  471. info, err := v.InfoForData(ext.Raw, v.Source)
  472. if err != nil {
  473. if fnErr := fn(info, err); fnErr != nil {
  474. return fnErr
  475. }
  476. continue
  477. }
  478. if err := fn(info, nil); err != nil {
  479. return err
  480. }
  481. }
  482. }
  483. func UpdateObjectNamespace(info *Info, err error) error {
  484. if err != nil {
  485. return err
  486. }
  487. if info.Object != nil {
  488. return info.Mapping.MetadataAccessor.SetNamespace(info.Object, info.Namespace)
  489. }
  490. return nil
  491. }
  492. // FilterNamespace omits the namespace if the object is not namespace scoped
  493. func FilterNamespace(info *Info, err error) error {
  494. if err != nil {
  495. return err
  496. }
  497. if !info.Namespaced() {
  498. info.Namespace = ""
  499. UpdateObjectNamespace(info, nil)
  500. }
  501. return nil
  502. }
  503. // SetNamespace ensures that every Info object visited will have a namespace
  504. // set. If info.Object is set, it will be mutated as well.
  505. func SetNamespace(namespace string) VisitorFunc {
  506. return func(info *Info, err error) error {
  507. if err != nil {
  508. return err
  509. }
  510. if !info.Namespaced() {
  511. return nil
  512. }
  513. if len(info.Namespace) == 0 {
  514. info.Namespace = namespace
  515. UpdateObjectNamespace(info, nil)
  516. }
  517. return nil
  518. }
  519. }
  520. // RequireNamespace will either set a namespace if none is provided on the
  521. // Info object, or if the namespace is set and does not match the provided
  522. // value, returns an error. This is intended to guard against administrators
  523. // accidentally operating on resources outside their namespace.
  524. func RequireNamespace(namespace string) VisitorFunc {
  525. return func(info *Info, err error) error {
  526. if err != nil {
  527. return err
  528. }
  529. if !info.Namespaced() {
  530. return nil
  531. }
  532. if len(info.Namespace) == 0 {
  533. info.Namespace = namespace
  534. UpdateObjectNamespace(info, nil)
  535. return nil
  536. }
  537. if info.Namespace != namespace {
  538. return fmt.Errorf("the namespace from the provided object %q does not match the namespace %q. You must pass '--namespace=%s' to perform this operation.", info.Namespace, namespace, info.Namespace)
  539. }
  540. return nil
  541. }
  542. }
  543. // RetrieveLatest updates the Object on each Info by invoking a standard client
  544. // Get.
  545. func RetrieveLatest(info *Info, err error) error {
  546. if err != nil {
  547. return err
  548. }
  549. if meta.IsListType(info.Object) {
  550. return fmt.Errorf("watch is only supported on individual resources and resource collections, but a list of resources is found")
  551. }
  552. if len(info.Name) == 0 {
  553. return nil
  554. }
  555. if info.Namespaced() && len(info.Namespace) == 0 {
  556. return fmt.Errorf("no namespace set on resource %s %q", info.Mapping.Resource, info.Name)
  557. }
  558. return info.Get()
  559. }
  560. // RetrieveLazy updates the object if it has not been loaded yet.
  561. func RetrieveLazy(info *Info, err error) error {
  562. if err != nil {
  563. return err
  564. }
  565. if info.Object == nil {
  566. return info.Get()
  567. }
  568. return nil
  569. }