server.go 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package server
  14. import (
  15. "crypto/tls"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "net"
  20. "net/http"
  21. "net/http/pprof"
  22. "reflect"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "time"
  27. restful "github.com/emicklei/go-restful"
  28. "github.com/golang/glog"
  29. cadvisorapi "github.com/google/cadvisor/info/v1"
  30. cadvisorapiv2 "github.com/google/cadvisor/info/v2"
  31. "github.com/prometheus/client_golang/prometheus"
  32. "k8s.io/kubernetes/pkg/api"
  33. apierrs "k8s.io/kubernetes/pkg/api/errors"
  34. "k8s.io/kubernetes/pkg/api/unversioned"
  35. "k8s.io/kubernetes/pkg/api/v1"
  36. "k8s.io/kubernetes/pkg/api/validation"
  37. "k8s.io/kubernetes/pkg/auth/authenticator"
  38. "k8s.io/kubernetes/pkg/auth/authorizer"
  39. "k8s.io/kubernetes/pkg/healthz"
  40. "k8s.io/kubernetes/pkg/httplog"
  41. "k8s.io/kubernetes/pkg/kubelet/cm"
  42. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  43. "k8s.io/kubernetes/pkg/kubelet/server/portforward"
  44. "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
  45. "k8s.io/kubernetes/pkg/kubelet/server/stats"
  46. "k8s.io/kubernetes/pkg/runtime"
  47. "k8s.io/kubernetes/pkg/types"
  48. "k8s.io/kubernetes/pkg/util/configz"
  49. "k8s.io/kubernetes/pkg/util/flushwriter"
  50. "k8s.io/kubernetes/pkg/util/httpstream"
  51. "k8s.io/kubernetes/pkg/util/httpstream/spdy"
  52. "k8s.io/kubernetes/pkg/util/limitwriter"
  53. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  54. "k8s.io/kubernetes/pkg/util/term"
  55. "k8s.io/kubernetes/pkg/volume"
  56. )
  57. // Server is a http.Handler which exposes kubelet functionality over HTTP.
  58. type Server struct {
  59. auth AuthInterface
  60. host HostInterface
  61. restfulCont containerInterface
  62. resourceAnalyzer stats.ResourceAnalyzer
  63. runtime kubecontainer.Runtime
  64. }
  65. type TLSOptions struct {
  66. Config *tls.Config
  67. CertFile string
  68. KeyFile string
  69. }
  70. // containerInterface defines the restful.Container functions used on the root container
  71. type containerInterface interface {
  72. Add(service *restful.WebService) *restful.Container
  73. Handle(path string, handler http.Handler)
  74. Filter(filter restful.FilterFunction)
  75. ServeHTTP(w http.ResponseWriter, r *http.Request)
  76. RegisteredWebServices() []*restful.WebService
  77. // RegisteredHandlePaths returns the paths of handlers registered directly with the container (non-web-services)
  78. // Used to test filters are being applied on non-web-service handlers
  79. RegisteredHandlePaths() []string
  80. }
  81. // filteringContainer delegates all Handle(...) calls to Container.HandleWithFilter(...),
  82. // so we can ensure restful.FilterFunctions are used for all handlers
  83. type filteringContainer struct {
  84. *restful.Container
  85. registeredHandlePaths []string
  86. }
  87. func (a *filteringContainer) Handle(path string, handler http.Handler) {
  88. a.HandleWithFilter(path, handler)
  89. a.registeredHandlePaths = append(a.registeredHandlePaths, path)
  90. }
  91. func (a *filteringContainer) RegisteredHandlePaths() []string {
  92. return a.registeredHandlePaths
  93. }
  94. // ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
  95. func ListenAndServeKubeletServer(
  96. host HostInterface,
  97. resourceAnalyzer stats.ResourceAnalyzer,
  98. address net.IP,
  99. port uint,
  100. tlsOptions *TLSOptions,
  101. auth AuthInterface,
  102. enableDebuggingHandlers bool,
  103. runtime kubecontainer.Runtime) {
  104. glog.Infof("Starting to listen on %s:%d", address, port)
  105. handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, runtime)
  106. s := &http.Server{
  107. Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
  108. Handler: &handler,
  109. MaxHeaderBytes: 1 << 20,
  110. }
  111. if tlsOptions != nil {
  112. s.TLSConfig = tlsOptions.Config
  113. glog.Fatal(s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile))
  114. } else {
  115. glog.Fatal(s.ListenAndServe())
  116. }
  117. }
  118. // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
  119. func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) {
  120. glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
  121. s := NewServer(host, resourceAnalyzer, nil, false, runtime)
  122. server := &http.Server{
  123. Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
  124. Handler: &s,
  125. MaxHeaderBytes: 1 << 20,
  126. }
  127. glog.Fatal(server.ListenAndServe())
  128. }
  129. // AuthInterface contains all methods required by the auth filters
  130. type AuthInterface interface {
  131. authenticator.Request
  132. authorizer.RequestAttributesGetter
  133. authorizer.Authorizer
  134. }
  135. // HostInterface contains all the kubelet methods required by the server.
  136. // For testablitiy.
  137. type HostInterface interface {
  138. GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
  139. GetContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error)
  140. GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error)
  141. GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
  142. GetPods() []*api.Pod
  143. GetRunningPods() ([]*api.Pod, error)
  144. GetPodByName(namespace, name string) (*api.Pod, bool)
  145. RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
  146. ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
  147. AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
  148. GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error
  149. ServeLogs(w http.ResponseWriter, req *http.Request)
  150. PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
  151. StreamingConnectionIdleTimeout() time.Duration
  152. ResyncInterval() time.Duration
  153. GetHostname() string
  154. GetNode() (*api.Node, error)
  155. GetNodeConfig() cm.NodeConfig
  156. LatestLoopEntryTime() time.Time
  157. ImagesFsInfo() (cadvisorapiv2.FsInfo, error)
  158. RootFsInfo() (cadvisorapiv2.FsInfo, error)
  159. ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
  160. PLEGHealthCheck() (bool, error)
  161. }
  162. // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
  163. func NewServer(
  164. host HostInterface,
  165. resourceAnalyzer stats.ResourceAnalyzer,
  166. auth AuthInterface,
  167. enableDebuggingHandlers bool,
  168. runtime kubecontainer.Runtime) Server {
  169. server := Server{
  170. host: host,
  171. resourceAnalyzer: resourceAnalyzer,
  172. auth: auth,
  173. restfulCont: &filteringContainer{Container: restful.NewContainer()},
  174. runtime: runtime,
  175. }
  176. if auth != nil {
  177. server.InstallAuthFilter()
  178. }
  179. server.InstallDefaultHandlers()
  180. if enableDebuggingHandlers {
  181. server.InstallDebuggingHandlers()
  182. }
  183. return server
  184. }
  185. // InstallAuthFilter installs authentication filters with the restful Container.
  186. func (s *Server) InstallAuthFilter() {
  187. s.restfulCont.Filter(func(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
  188. // Authenticate
  189. u, ok, err := s.auth.AuthenticateRequest(req.Request)
  190. if err != nil {
  191. glog.Errorf("Unable to authenticate the request due to an error: %v", err)
  192. resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
  193. return
  194. }
  195. if !ok {
  196. resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
  197. return
  198. }
  199. // Get authorization attributes
  200. attrs := s.auth.GetRequestAttributes(u, req.Request)
  201. // Authorize
  202. authorized, reason, err := s.auth.Authorize(attrs)
  203. if err != nil {
  204. msg := fmt.Sprintf("Error (user=%s, verb=%s, namespace=%s, resource=%s)", u.GetName(), attrs.GetVerb(), attrs.GetNamespace(), attrs.GetResource())
  205. glog.Errorf(msg, err)
  206. resp.WriteErrorString(http.StatusInternalServerError, msg)
  207. return
  208. }
  209. if !authorized {
  210. msg := fmt.Sprintf("Forbidden (reason=%s, user=%s, verb=%s, namespace=%s, resource=%s)", reason, u.GetName(), attrs.GetVerb(), attrs.GetNamespace(), attrs.GetResource())
  211. glog.V(2).Info(msg)
  212. resp.WriteErrorString(http.StatusForbidden, msg)
  213. return
  214. }
  215. // Continue
  216. chain.ProcessFilter(req, resp)
  217. })
  218. }
  219. // InstallDefaultHandlers registers the default set of supported HTTP request
  220. // patterns with the restful Container.
  221. func (s *Server) InstallDefaultHandlers() {
  222. healthz.InstallHandler(s.restfulCont,
  223. healthz.PingHealthz,
  224. healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
  225. healthz.NamedCheck("pleg", s.plegHealthCheck),
  226. )
  227. var ws *restful.WebService
  228. ws = new(restful.WebService)
  229. ws.
  230. Path("/pods").
  231. Produces(restful.MIME_JSON)
  232. ws.Route(ws.GET("").
  233. To(s.getPods).
  234. Operation("getPods"))
  235. s.restfulCont.Add(ws)
  236. s.restfulCont.Add(stats.CreateHandlers(s.host, s.resourceAnalyzer))
  237. s.restfulCont.Handle("/metrics", prometheus.Handler())
  238. ws = new(restful.WebService)
  239. ws.
  240. Path("/spec/").
  241. Produces(restful.MIME_JSON)
  242. ws.Route(ws.GET("").
  243. To(s.getSpec).
  244. Operation("getSpec").
  245. Writes(cadvisorapi.MachineInfo{}))
  246. s.restfulCont.Add(ws)
  247. }
  248. const pprofBasePath = "/debug/pprof/"
  249. // InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
  250. func (s *Server) InstallDebuggingHandlers() {
  251. var ws *restful.WebService
  252. ws = new(restful.WebService)
  253. ws.
  254. Path("/run")
  255. ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  256. To(s.getRun).
  257. Operation("getRun"))
  258. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  259. To(s.getRun).
  260. Operation("getRun"))
  261. s.restfulCont.Add(ws)
  262. ws = new(restful.WebService)
  263. ws.
  264. Path("/exec")
  265. ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  266. To(s.getExec).
  267. Operation("getExec"))
  268. ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  269. To(s.getExec).
  270. Operation("getExec"))
  271. ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
  272. To(s.getExec).
  273. Operation("getExec"))
  274. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  275. To(s.getExec).
  276. Operation("getExec"))
  277. s.restfulCont.Add(ws)
  278. ws = new(restful.WebService)
  279. ws.
  280. Path("/attach")
  281. ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  282. To(s.getAttach).
  283. Operation("getAttach"))
  284. ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  285. To(s.getAttach).
  286. Operation("getAttach"))
  287. ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
  288. To(s.getAttach).
  289. Operation("getAttach"))
  290. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  291. To(s.getAttach).
  292. Operation("getAttach"))
  293. s.restfulCont.Add(ws)
  294. ws = new(restful.WebService)
  295. ws.
  296. Path("/portForward")
  297. ws.Route(ws.POST("/{podNamespace}/{podID}").
  298. To(s.getPortForward).
  299. Operation("getPortForward"))
  300. ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
  301. To(s.getPortForward).
  302. Operation("getPortForward"))
  303. s.restfulCont.Add(ws)
  304. ws = new(restful.WebService)
  305. ws.
  306. Path("/logs/")
  307. ws.Route(ws.GET("").
  308. To(s.getLogs).
  309. Operation("getLogs"))
  310. ws.Route(ws.GET("/{logpath:*}").
  311. To(s.getLogs).
  312. Operation("getLogs"))
  313. s.restfulCont.Add(ws)
  314. ws = new(restful.WebService)
  315. ws.
  316. Path("/containerLogs")
  317. ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  318. To(s.getContainerLogs).
  319. Operation("getContainerLogs"))
  320. s.restfulCont.Add(ws)
  321. configz.InstallHandler(s.restfulCont)
  322. handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {
  323. name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)
  324. switch name {
  325. case "profile":
  326. pprof.Profile(resp, req.Request)
  327. case "symbol":
  328. pprof.Symbol(resp, req.Request)
  329. case "cmdline":
  330. pprof.Cmdline(resp, req.Request)
  331. default:
  332. pprof.Index(resp, req.Request)
  333. }
  334. }
  335. // Setup pporf handlers.
  336. ws = new(restful.WebService).Path(pprofBasePath)
  337. ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
  338. handlePprofEndpoint(req, resp)
  339. })).Doc("pprof endpoint")
  340. s.restfulCont.Add(ws)
  341. // The /runningpods endpoint is used for testing only.
  342. ws = new(restful.WebService)
  343. ws.
  344. Path("/runningpods/").
  345. Produces(restful.MIME_JSON)
  346. ws.Route(ws.GET("").
  347. To(s.getRunningPods).
  348. Operation("getRunningPods"))
  349. s.restfulCont.Add(ws)
  350. }
  351. type httpHandler struct {
  352. f func(w http.ResponseWriter, r *http.Request)
  353. }
  354. func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  355. h.f(w, r)
  356. }
  357. // Checks if kubelet's sync loop that updates containers is working.
  358. func (s *Server) syncLoopHealthCheck(req *http.Request) error {
  359. duration := s.host.ResyncInterval() * 2
  360. minDuration := time.Minute * 5
  361. if duration < minDuration {
  362. duration = minDuration
  363. }
  364. enterLoopTime := s.host.LatestLoopEntryTime()
  365. if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
  366. return fmt.Errorf("Sync Loop took longer than expected.")
  367. }
  368. return nil
  369. }
  370. // Checks if pleg, which lists pods periodically, is healthy.
  371. func (s *Server) plegHealthCheck(req *http.Request) error {
  372. if ok, err := s.host.PLEGHealthCheck(); !ok {
  373. return fmt.Errorf("PLEG took longer than expected: %v", err)
  374. }
  375. return nil
  376. }
  377. // getContainerLogs handles containerLogs request against the Kubelet
  378. func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
  379. podNamespace := request.PathParameter("podNamespace")
  380. podID := request.PathParameter("podID")
  381. containerName := request.PathParameter("containerName")
  382. if len(podID) == 0 {
  383. // TODO: Why return JSON when the rest return plaintext errors?
  384. // TODO: Why return plaintext errors?
  385. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podID."}`))
  386. return
  387. }
  388. if len(containerName) == 0 {
  389. // TODO: Why return JSON when the rest return plaintext errors?
  390. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing container name."}`))
  391. return
  392. }
  393. if len(podNamespace) == 0 {
  394. // TODO: Why return JSON when the rest return plaintext errors?
  395. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podNamespace."}`))
  396. return
  397. }
  398. query := request.Request.URL.Query()
  399. // backwards compatibility for the "tail" query parameter
  400. if tail := request.QueryParameter("tail"); len(tail) > 0 {
  401. query["tailLines"] = []string{tail}
  402. // "all" is the same as omitting tail
  403. if tail == "all" {
  404. delete(query, "tailLines")
  405. }
  406. }
  407. // container logs on the kubelet are locked to the v1 API version of PodLogOptions
  408. logOptions := &api.PodLogOptions{}
  409. if err := api.ParameterCodec.DecodeParameters(query, v1.SchemeGroupVersion, logOptions); err != nil {
  410. response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
  411. return
  412. }
  413. logOptions.TypeMeta = unversioned.TypeMeta{}
  414. if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
  415. response.WriteError(apierrs.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
  416. return
  417. }
  418. pod, ok := s.host.GetPodByName(podNamespace, podID)
  419. if !ok {
  420. response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist\n", podID))
  421. return
  422. }
  423. // Check if containerName is valid.
  424. containerExists := false
  425. for _, container := range pod.Spec.Containers {
  426. if container.Name == containerName {
  427. containerExists = true
  428. }
  429. }
  430. if !containerExists {
  431. for _, container := range pod.Spec.InitContainers {
  432. if container.Name == containerName {
  433. containerExists = true
  434. }
  435. }
  436. }
  437. if !containerExists {
  438. response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q\n", containerName, podID))
  439. return
  440. }
  441. if _, ok := response.ResponseWriter.(http.Flusher); !ok {
  442. response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs\n", reflect.TypeOf(response)))
  443. return
  444. }
  445. fw := flushwriter.Wrap(response.ResponseWriter)
  446. if logOptions.LimitBytes != nil {
  447. fw = limitwriter.New(fw, *logOptions.LimitBytes)
  448. }
  449. response.Header().Set("Transfer-Encoding", "chunked")
  450. if err := s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
  451. if err != limitwriter.ErrMaximumWrite {
  452. response.WriteError(http.StatusBadRequest, err)
  453. }
  454. return
  455. }
  456. }
  457. // encodePods creates an api.PodList object from pods and returns the encoded
  458. // PodList.
  459. func encodePods(pods []*api.Pod) (data []byte, err error) {
  460. podList := new(api.PodList)
  461. for _, pod := range pods {
  462. podList.Items = append(podList.Items, *pod)
  463. }
  464. // TODO: this needs to be parameterized to the kubelet, not hardcoded. Depends on Kubelet
  465. // as API server refactor.
  466. // TODO: Locked to v1, needs to be made generic
  467. codec := api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"})
  468. return runtime.Encode(codec, podList)
  469. }
  470. // getPods returns a list of pods bound to the Kubelet and their spec.
  471. func (s *Server) getPods(request *restful.Request, response *restful.Response) {
  472. pods := s.host.GetPods()
  473. data, err := encodePods(pods)
  474. if err != nil {
  475. response.WriteError(http.StatusInternalServerError, err)
  476. return
  477. }
  478. writeJsonResponse(response, data)
  479. }
  480. // getRunningPods returns a list of pods running on Kubelet. The list is
  481. // provided by the container runtime, and is different from the list returned
  482. // by getPods, which is a set of desired pods to run.
  483. func (s *Server) getRunningPods(request *restful.Request, response *restful.Response) {
  484. pods, err := s.host.GetRunningPods()
  485. if err != nil {
  486. response.WriteError(http.StatusInternalServerError, err)
  487. return
  488. }
  489. data, err := encodePods(pods)
  490. if err != nil {
  491. response.WriteError(http.StatusInternalServerError, err)
  492. return
  493. }
  494. writeJsonResponse(response, data)
  495. }
  496. // getLogs handles logs requests against the Kubelet.
  497. func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
  498. s.host.ServeLogs(response, request.Request)
  499. }
  500. // getSpec handles spec requests against the Kubelet.
  501. func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
  502. info, err := s.host.GetCachedMachineInfo()
  503. if err != nil {
  504. response.WriteError(http.StatusInternalServerError, err)
  505. return
  506. }
  507. response.WriteEntity(info)
  508. }
  509. func getContainerCoordinates(request *restful.Request) (namespace, pod string, uid types.UID, container string) {
  510. namespace = request.PathParameter("podNamespace")
  511. pod = request.PathParameter("podID")
  512. if uidStr := request.PathParameter("uid"); uidStr != "" {
  513. uid = types.UID(uidStr)
  514. }
  515. container = request.PathParameter("containerName")
  516. return
  517. }
  518. // getAttach handles requests to attach to a container.
  519. func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
  520. podNamespace, podID, uid, container := getContainerCoordinates(request)
  521. pod, ok := s.host.GetPodByName(podNamespace, podID)
  522. if !ok {
  523. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  524. return
  525. }
  526. remotecommand.ServeAttach(response.ResponseWriter,
  527. request.Request,
  528. s.host,
  529. kubecontainer.GetPodFullName(pod),
  530. uid,
  531. container,
  532. s.host.StreamingConnectionIdleTimeout(),
  533. remotecommand.DefaultStreamCreationTimeout,
  534. remotecommand.SupportedStreamingProtocols)
  535. }
  536. // getExec handles requests to run a command inside a container.
  537. func (s *Server) getExec(request *restful.Request, response *restful.Response) {
  538. podNamespace, podID, uid, container := getContainerCoordinates(request)
  539. pod, ok := s.host.GetPodByName(podNamespace, podID)
  540. if !ok {
  541. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  542. return
  543. }
  544. remotecommand.ServeExec(response.ResponseWriter,
  545. request.Request,
  546. s.host,
  547. kubecontainer.GetPodFullName(pod),
  548. uid,
  549. container,
  550. s.host.StreamingConnectionIdleTimeout(),
  551. remotecommand.DefaultStreamCreationTimeout,
  552. remotecommand.SupportedStreamingProtocols)
  553. }
  554. // getRun handles requests to run a command inside a container.
  555. func (s *Server) getRun(request *restful.Request, response *restful.Response) {
  556. podNamespace, podID, uid, container := getContainerCoordinates(request)
  557. pod, ok := s.host.GetPodByName(podNamespace, podID)
  558. if !ok {
  559. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  560. return
  561. }
  562. command := strings.Split(request.QueryParameter("cmd"), " ")
  563. data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), uid, container, command)
  564. if err != nil {
  565. response.WriteError(http.StatusInternalServerError, err)
  566. return
  567. }
  568. writeJsonResponse(response, data)
  569. }
  570. func getPodCoordinates(request *restful.Request) (namespace, pod string, uid types.UID) {
  571. namespace = request.PathParameter("podNamespace")
  572. pod = request.PathParameter("podID")
  573. if uidStr := request.PathParameter("uid"); uidStr != "" {
  574. uid = types.UID(uidStr)
  575. }
  576. return
  577. }
  578. // Derived from go-restful writeJSON.
  579. func writeJsonResponse(response *restful.Response, data []byte) {
  580. if data == nil {
  581. response.WriteHeader(http.StatusOK)
  582. // do not write a nil representation
  583. return
  584. }
  585. response.Header().Set(restful.HEADER_ContentType, restful.MIME_JSON)
  586. response.WriteHeader(http.StatusOK)
  587. if _, err := response.Write(data); err != nil {
  588. glog.Errorf("Error writing response: %v", err)
  589. }
  590. }
  591. // PortForwarder knows how to forward content from a data stream to/from a port
  592. // in a pod.
  593. type PortForwarder interface {
  594. // PortForwarder copies data between a data stream and a port in a pod.
  595. PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
  596. }
  597. // getPortForward handles a new restful port forward request. It determines the
  598. // pod name and uid and then calls ServePortForward.
  599. func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
  600. podNamespace, podID, uid := getPodCoordinates(request)
  601. pod, ok := s.host.GetPodByName(podNamespace, podID)
  602. if !ok {
  603. response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
  604. return
  605. }
  606. podName := kubecontainer.GetPodFullName(pod)
  607. ServePortForward(response.ResponseWriter, request.Request, s.host, podName, uid, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout)
  608. }
  609. // ServePortForward handles a port forwarding request. A single request is
  610. // kept alive as long as the client is still alive and the connection has not
  611. // been timed out due to idleness. This function handles multiple forwarded
  612. // connections; i.e., multiple `curl http://localhost:8888/` requests will be
  613. // handled by a single invocation of ServePortForward.
  614. func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, idleTimeout time.Duration, streamCreationTimeout time.Duration) {
  615. supportedPortForwardProtocols := []string{portforward.PortForwardProtocolV1Name}
  616. _, err := httpstream.Handshake(req, w, supportedPortForwardProtocols)
  617. // negotiated protocol isn't currently used server side, but could be in the future
  618. if err != nil {
  619. // Handshake writes the error to the client
  620. utilruntime.HandleError(err)
  621. return
  622. }
  623. streamChan := make(chan httpstream.Stream, 1)
  624. glog.V(5).Infof("Upgrading port forward response")
  625. upgrader := spdy.NewResponseUpgrader()
  626. conn := upgrader.UpgradeResponse(w, req, portForwardStreamReceived(streamChan))
  627. if conn == nil {
  628. return
  629. }
  630. defer conn.Close()
  631. glog.V(5).Infof("(conn=%p) setting port forwarding streaming connection idle timeout to %v", conn, idleTimeout)
  632. conn.SetIdleTimeout(idleTimeout)
  633. h := &portForwardStreamHandler{
  634. conn: conn,
  635. streamChan: streamChan,
  636. streamPairs: make(map[string]*portForwardStreamPair),
  637. streamCreationTimeout: streamCreationTimeout,
  638. pod: podName,
  639. uid: uid,
  640. forwarder: portForwarder,
  641. }
  642. h.run()
  643. }
  644. // portForwardStreamReceived is the httpstream.NewStreamHandler for port
  645. // forward streams. It checks each stream's port and stream type headers,
  646. // rejecting any streams that with missing or invalid values. Each valid
  647. // stream is sent to the streams channel.
  648. func portForwardStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream, <-chan struct{}) error {
  649. return func(stream httpstream.Stream, replySent <-chan struct{}) error {
  650. // make sure it has a valid port header
  651. portString := stream.Headers().Get(api.PortHeader)
  652. if len(portString) == 0 {
  653. return fmt.Errorf("%q header is required", api.PortHeader)
  654. }
  655. port, err := strconv.ParseUint(portString, 10, 16)
  656. if err != nil {
  657. return fmt.Errorf("unable to parse %q as a port: %v", portString, err)
  658. }
  659. if port < 1 {
  660. return fmt.Errorf("port %q must be > 0", portString)
  661. }
  662. // make sure it has a valid stream type header
  663. streamType := stream.Headers().Get(api.StreamType)
  664. if len(streamType) == 0 {
  665. return fmt.Errorf("%q header is required", api.StreamType)
  666. }
  667. if streamType != api.StreamTypeError && streamType != api.StreamTypeData {
  668. return fmt.Errorf("invalid stream type %q", streamType)
  669. }
  670. streams <- stream
  671. return nil
  672. }
  673. }
  674. // portForwardStreamHandler is capable of processing multiple port forward
  675. // requests over a single httpstream.Connection.
  676. type portForwardStreamHandler struct {
  677. conn httpstream.Connection
  678. streamChan chan httpstream.Stream
  679. streamPairsLock sync.RWMutex
  680. streamPairs map[string]*portForwardStreamPair
  681. streamCreationTimeout time.Duration
  682. pod string
  683. uid types.UID
  684. forwarder PortForwarder
  685. }
  686. // getStreamPair returns a portForwardStreamPair for requestID. This creates a
  687. // new pair if one does not yet exist for the requestID. The returned bool is
  688. // true if the pair was created.
  689. func (h *portForwardStreamHandler) getStreamPair(requestID string) (*portForwardStreamPair, bool) {
  690. h.streamPairsLock.Lock()
  691. defer h.streamPairsLock.Unlock()
  692. if p, ok := h.streamPairs[requestID]; ok {
  693. glog.V(5).Infof("(conn=%p, request=%s) found existing stream pair", h.conn, requestID)
  694. return p, false
  695. }
  696. glog.V(5).Infof("(conn=%p, request=%s) creating new stream pair", h.conn, requestID)
  697. p := newPortForwardPair(requestID)
  698. h.streamPairs[requestID] = p
  699. return p, true
  700. }
  701. // monitorStreamPair waits for the pair to receive both its error and data
  702. // streams, or for the timeout to expire (whichever happens first), and then
  703. // removes the pair.
  704. func (h *portForwardStreamHandler) monitorStreamPair(p *portForwardStreamPair, timeout <-chan time.Time) {
  705. select {
  706. case <-timeout:
  707. err := fmt.Errorf("(conn=%v, request=%s) timed out waiting for streams", h.conn, p.requestID)
  708. utilruntime.HandleError(err)
  709. p.printError(err.Error())
  710. case <-p.complete:
  711. glog.V(5).Infof("(conn=%v, request=%s) successfully received error and data streams", h.conn, p.requestID)
  712. }
  713. h.removeStreamPair(p.requestID)
  714. }
  715. // hasStreamPair returns a bool indicating if a stream pair for requestID
  716. // exists.
  717. func (h *portForwardStreamHandler) hasStreamPair(requestID string) bool {
  718. h.streamPairsLock.RLock()
  719. defer h.streamPairsLock.RUnlock()
  720. _, ok := h.streamPairs[requestID]
  721. return ok
  722. }
  723. // removeStreamPair removes the stream pair identified by requestID from streamPairs.
  724. func (h *portForwardStreamHandler) removeStreamPair(requestID string) {
  725. h.streamPairsLock.Lock()
  726. defer h.streamPairsLock.Unlock()
  727. delete(h.streamPairs, requestID)
  728. }
  729. // requestID returns the request id for stream.
  730. func (h *portForwardStreamHandler) requestID(stream httpstream.Stream) string {
  731. requestID := stream.Headers().Get(api.PortForwardRequestIDHeader)
  732. if len(requestID) == 0 {
  733. glog.V(5).Infof("(conn=%p) stream received without %s header", h.conn, api.PortForwardRequestIDHeader)
  734. // If we get here, it's because the connection came from an older client
  735. // that isn't generating the request id header
  736. // (https://github.com/kubernetes/kubernetes/blob/843134885e7e0b360eb5441e85b1410a8b1a7a0c/pkg/client/unversioned/portforward/portforward.go#L258-L287)
  737. //
  738. // This is a best-effort attempt at supporting older clients.
  739. //
  740. // When there aren't concurrent new forwarded connections, each connection
  741. // will have a pair of streams (data, error), and the stream IDs will be
  742. // consecutive odd numbers, e.g. 1 and 3 for the first connection. Convert
  743. // the stream ID into a pseudo-request id by taking the stream type and
  744. // using id = stream.Identifier() when the stream type is error,
  745. // and id = stream.Identifier() - 2 when it's data.
  746. //
  747. // NOTE: this only works when there are not concurrent new streams from
  748. // multiple forwarded connections; it's a best-effort attempt at supporting
  749. // old clients that don't generate request ids. If there are concurrent
  750. // new connections, it's possible that 1 connection gets streams whose IDs
  751. // are not consecutive (e.g. 5 and 9 instead of 5 and 7).
  752. streamType := stream.Headers().Get(api.StreamType)
  753. switch streamType {
  754. case api.StreamTypeError:
  755. requestID = strconv.Itoa(int(stream.Identifier()))
  756. case api.StreamTypeData:
  757. requestID = strconv.Itoa(int(stream.Identifier()) - 2)
  758. }
  759. glog.V(5).Infof("(conn=%p) automatically assigning request ID=%q from stream type=%s, stream ID=%d", h.conn, requestID, streamType, stream.Identifier())
  760. }
  761. return requestID
  762. }
  763. // run is the main loop for the portForwardStreamHandler. It processes new
  764. // streams, invoking portForward for each complete stream pair. The loop exits
  765. // when the httpstream.Connection is closed.
  766. func (h *portForwardStreamHandler) run() {
  767. glog.V(5).Infof("(conn=%p) waiting for port forward streams", h.conn)
  768. Loop:
  769. for {
  770. select {
  771. case <-h.conn.CloseChan():
  772. glog.V(5).Infof("(conn=%p) upgraded connection closed", h.conn)
  773. break Loop
  774. case stream := <-h.streamChan:
  775. requestID := h.requestID(stream)
  776. streamType := stream.Headers().Get(api.StreamType)
  777. glog.V(5).Infof("(conn=%p, request=%s) received new stream of type %s", h.conn, requestID, streamType)
  778. p, created := h.getStreamPair(requestID)
  779. if created {
  780. go h.monitorStreamPair(p, time.After(h.streamCreationTimeout))
  781. }
  782. if complete, err := p.add(stream); err != nil {
  783. msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
  784. utilruntime.HandleError(errors.New(msg))
  785. p.printError(msg)
  786. } else if complete {
  787. go h.portForward(p)
  788. }
  789. }
  790. }
  791. }
  792. // portForward invokes the portForwardStreamHandler's forwarder.PortForward
  793. // function for the given stream pair.
  794. func (h *portForwardStreamHandler) portForward(p *portForwardStreamPair) {
  795. defer p.dataStream.Close()
  796. defer p.errorStream.Close()
  797. portString := p.dataStream.Headers().Get(api.PortHeader)
  798. port, _ := strconv.ParseUint(portString, 10, 16)
  799. glog.V(5).Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
  800. err := h.forwarder.PortForward(h.pod, h.uid, uint16(port), p.dataStream)
  801. glog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
  802. if err != nil {
  803. msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err)
  804. utilruntime.HandleError(msg)
  805. fmt.Fprint(p.errorStream, msg.Error())
  806. }
  807. }
  808. // portForwardStreamPair represents the error and data streams for a port
  809. // forwarding request.
  810. type portForwardStreamPair struct {
  811. lock sync.RWMutex
  812. requestID string
  813. dataStream httpstream.Stream
  814. errorStream httpstream.Stream
  815. complete chan struct{}
  816. }
  817. // newPortForwardPair creates a new portForwardStreamPair.
  818. func newPortForwardPair(requestID string) *portForwardStreamPair {
  819. return &portForwardStreamPair{
  820. requestID: requestID,
  821. complete: make(chan struct{}),
  822. }
  823. }
  824. // add adds the stream to the portForwardStreamPair. If the pair already
  825. // contains a stream for the new stream's type, an error is returned. add
  826. // returns true if both the data and error streams for this pair have been
  827. // received.
  828. func (p *portForwardStreamPair) add(stream httpstream.Stream) (bool, error) {
  829. p.lock.Lock()
  830. defer p.lock.Unlock()
  831. switch stream.Headers().Get(api.StreamType) {
  832. case api.StreamTypeError:
  833. if p.errorStream != nil {
  834. return false, errors.New("error stream already assigned")
  835. }
  836. p.errorStream = stream
  837. case api.StreamTypeData:
  838. if p.dataStream != nil {
  839. return false, errors.New("data stream already assigned")
  840. }
  841. p.dataStream = stream
  842. }
  843. complete := p.errorStream != nil && p.dataStream != nil
  844. if complete {
  845. close(p.complete)
  846. }
  847. return complete, nil
  848. }
  849. // printError writes s to p.errorStream if p.errorStream has been set.
  850. func (p *portForwardStreamPair) printError(s string) {
  851. p.lock.RLock()
  852. defer p.lock.RUnlock()
  853. if p.errorStream != nil {
  854. fmt.Fprint(p.errorStream, s)
  855. }
  856. }
  857. // ServeHTTP responds to HTTP requests on the Kubelet.
  858. func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  859. defer httplog.NewLogged(req, &w).StacktraceWhen(
  860. httplog.StatusIsNot(
  861. http.StatusOK,
  862. http.StatusMovedPermanently,
  863. http.StatusTemporaryRedirect,
  864. http.StatusNotFound,
  865. http.StatusSwitchingProtocols,
  866. ),
  867. ).Log()
  868. s.restfulCont.ServeHTTP(w, req)
  869. }