kube_docker_client.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package dockertools
  14. import (
  15. "bytes"
  16. "encoding/base64"
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "sync"
  22. "time"
  23. "github.com/golang/glog"
  24. dockermessage "github.com/docker/docker/pkg/jsonmessage"
  25. dockerstdcopy "github.com/docker/docker/pkg/stdcopy"
  26. dockerapi "github.com/docker/engine-api/client"
  27. dockertypes "github.com/docker/engine-api/types"
  28. "golang.org/x/net/context"
  29. )
// kubeDockerClient is a wrapped layer of docker client for kubelet internal use. This layer is added to:
//  1) Redirect stream for exec and attach operations.
//  2) Wrap the context in this layer to make the DockerInterface cleaner.
//  3) Stabilize the DockerInterface. The engine-api is still under active development, the interface
//     is not stabilized yet. However, the DockerInterface is used in many files in Kubernetes, we may
//     not want to change the interface frequently. With this layer, we can port the engine api to the
//     DockerInterface to avoid changing DockerInterface as much as possible.
//     (See
//       * https://github.com/docker/engine-api/issues/89
//       * https://github.com/docker/engine-api/issues/137
//       * https://github.com/docker/engine-api/pull/140)
// TODO(random-liu): Switch to the new docker interface by refactoring the functions in the old DockerInterface
// one by one.
type kubeDockerClient struct {
	// timeout is the timeout of short running docker operations.
	timeout time.Duration
	// client is the engine-api docker client that every method of this type issues its requests through.
	client *dockerapi.Client
}
  48. // Make sure that kubeDockerClient implemented the DockerInterface.
  49. var _ DockerInterface = &kubeDockerClient{}
  50. // There are 2 kinds of docker operations categorized by running time:
  51. // * Long running operation: The long running operation could run for arbitrary long time, and the running time
  52. // usually depends on some uncontrollable factors. These operations include: PullImage, Logs, StartExec, AttachToContainer.
  53. // * Non-long running operation: Given the maximum load of the system, the non-long running operation should finish
  54. // in expected and usually short time. These include all other operations.
  55. // kubeDockerClient only applies timeout on non-long running operations.
  56. const (
  57. // defaultTimeout is the default timeout of short running docker operations.
  58. defaultTimeout = 2 * time.Minute
  59. // defaultShmSize is the default ShmSize to use (in bytes) if not specified.
  60. defaultShmSize = int64(1024 * 1024 * 64)
  61. // defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting.
  62. defaultImagePullingProgressReportInterval = 10 * time.Second
  63. // defaultImagePullingStuckTimeout is the default timeout for image pulling stuck. If no progress
  64. // is made for defaultImagePullingStuckTimeout, the image pulling will be cancelled.
  65. // Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval
  66. // between progress updates.
  67. // TODO(random-liu): Make this configurable
  68. defaultImagePullingStuckTimeout = 1 * time.Minute
  69. )
  70. // newKubeDockerClient creates an kubeDockerClient from an existing docker client. If requestTimeout is 0,
  71. // defaultTimeout will be applied.
  72. func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout time.Duration) DockerInterface {
  73. if requestTimeout == 0 {
  74. requestTimeout = defaultTimeout
  75. }
  76. return &kubeDockerClient{
  77. client: dockerClient,
  78. timeout: requestTimeout,
  79. }
  80. }
  81. func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
  82. ctx, cancel := d.getTimeoutContext()
  83. defer cancel()
  84. containers, err := d.client.ContainerList(ctx, options)
  85. if ctxErr := contextError(ctx); ctxErr != nil {
  86. return nil, ctxErr
  87. }
  88. if err != nil {
  89. return nil, err
  90. }
  91. return containers, nil
  92. }
  93. func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
  94. ctx, cancel := d.getTimeoutContext()
  95. defer cancel()
  96. containerJSON, err := d.client.ContainerInspect(ctx, id)
  97. if ctxErr := contextError(ctx); ctxErr != nil {
  98. return nil, ctxErr
  99. }
  100. if err != nil {
  101. if dockerapi.IsErrContainerNotFound(err) {
  102. return nil, containerNotFoundError{ID: id}
  103. }
  104. return nil, err
  105. }
  106. return &containerJSON, nil
  107. }
  108. func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) {
  109. ctx, cancel := d.getTimeoutContext()
  110. defer cancel()
  111. // we provide an explicit default shm size as to not depend on docker daemon.
  112. // TODO: evaluate exposing this as a knob in the API
  113. if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
  114. opts.HostConfig.ShmSize = defaultShmSize
  115. }
  116. createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
  117. if ctxErr := contextError(ctx); ctxErr != nil {
  118. return nil, ctxErr
  119. }
  120. if err != nil {
  121. return nil, err
  122. }
  123. return &createResp, nil
  124. }
  125. func (d *kubeDockerClient) StartContainer(id string) error {
  126. ctx, cancel := d.getTimeoutContext()
  127. defer cancel()
  128. err := d.client.ContainerStart(ctx, id)
  129. if ctxErr := contextError(ctx); ctxErr != nil {
  130. return ctxErr
  131. }
  132. return err
  133. }
  134. // Stopping an already stopped container will not cause an error in engine-api.
  135. func (d *kubeDockerClient) StopContainer(id string, timeout int) error {
  136. ctx, cancel := d.getCustomTimeoutContext(time.Duration(timeout) * time.Second)
  137. defer cancel()
  138. err := d.client.ContainerStop(ctx, id, timeout)
  139. if ctxErr := contextError(ctx); ctxErr != nil {
  140. return ctxErr
  141. }
  142. return err
  143. }
  144. func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error {
  145. ctx, cancel := d.getTimeoutContext()
  146. defer cancel()
  147. err := d.client.ContainerRemove(ctx, id, opts)
  148. if ctxErr := contextError(ctx); ctxErr != nil {
  149. return ctxErr
  150. }
  151. return err
  152. }
  153. func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect, error) {
  154. ctx, cancel := d.getTimeoutContext()
  155. defer cancel()
  156. resp, _, err := d.client.ImageInspectWithRaw(ctx, image, true)
  157. if ctxErr := contextError(ctx); ctxErr != nil {
  158. return nil, ctxErr
  159. }
  160. if err != nil {
  161. if dockerapi.IsErrImageNotFound(err) {
  162. err = imageNotFoundError{ID: image}
  163. }
  164. return nil, err
  165. }
  166. if !matchImageTagOrSHA(resp, image) {
  167. return nil, imageNotFoundError{ID: image}
  168. }
  169. return &resp, nil
  170. }
  171. func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
  172. ctx, cancel := d.getTimeoutContext()
  173. defer cancel()
  174. resp, err := d.client.ImageHistory(ctx, id)
  175. if ctxErr := contextError(ctx); ctxErr != nil {
  176. return nil, ctxErr
  177. }
  178. return resp, err
  179. }
  180. func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) {
  181. ctx, cancel := d.getTimeoutContext()
  182. defer cancel()
  183. images, err := d.client.ImageList(ctx, opts)
  184. if ctxErr := contextError(ctx); ctxErr != nil {
  185. return nil, ctxErr
  186. }
  187. if err != nil {
  188. return nil, err
  189. }
  190. return images, nil
  191. }
  192. func base64EncodeAuth(auth dockertypes.AuthConfig) (string, error) {
  193. var buf bytes.Buffer
  194. if err := json.NewEncoder(&buf).Encode(auth); err != nil {
  195. return "", err
  196. }
  197. return base64.URLEncoding.EncodeToString(buf.Bytes()), nil
  198. }
// progress is a wrapper of dockermessage.JSONMessage with a lock protecting it.
// It records the most recent message and when it was recorded.
type progress struct {
	// The embedded lock guards message and timestamp: set takes the write lock, get the read lock.
	sync.RWMutex
	// message stores the latest docker json message. It is nil until set is first called.
	message *dockermessage.JSONMessage
	// timestamp of the latest update. newProgress initializes it to the creation time.
	timestamp time.Time
}
  207. func newProgress() *progress {
  208. return &progress{timestamp: time.Now()}
  209. }
  210. func (p *progress) set(msg *dockermessage.JSONMessage) {
  211. p.Lock()
  212. defer p.Unlock()
  213. p.message = msg
  214. p.timestamp = time.Now()
  215. }
  216. func (p *progress) get() (string, time.Time) {
  217. p.RLock()
  218. defer p.RUnlock()
  219. if p.message == nil {
  220. return "No progress", p.timestamp
  221. }
  222. // The following code is based on JSONMessage.Display
  223. var prefix string
  224. if p.message.ID != "" {
  225. prefix = fmt.Sprintf("%s: ", p.message.ID)
  226. }
  227. if p.message.Progress == nil {
  228. return fmt.Sprintf("%s%s", prefix, p.message.Status), p.timestamp
  229. }
  230. return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()), p.timestamp
  231. }
// progressReporter keeps the newest image pulling progress and periodically reports the newest progress.
type progressReporter struct {
	// progress holds the latest pull message and the time of the latest update.
	*progress
	// image is the image name used in the reporter's log messages.
	image string
	// cancel is invoked by the reporting goroutine when no progress has been made for
	// defaultImagePullingStuckTimeout.
	cancel context.CancelFunc
	// stopCh is closed by stop to make the reporting goroutine exit.
	stopCh chan struct{}
}
  239. // newProgressReporter creates a new progressReporter for specific image with specified reporting interval
  240. func newProgressReporter(image string, cancel context.CancelFunc) *progressReporter {
  241. return &progressReporter{
  242. progress: newProgress(),
  243. image: image,
  244. cancel: cancel,
  245. stopCh: make(chan struct{}),
  246. }
  247. }
  248. // start starts the progressReporter
  249. func (p *progressReporter) start() {
  250. go func() {
  251. ticker := time.NewTicker(defaultImagePullingProgressReportInterval)
  252. defer ticker.Stop()
  253. for {
  254. // TODO(random-liu): Report as events.
  255. select {
  256. case <-ticker.C:
  257. progress, timestamp := p.progress.get()
  258. // If there is no progress for defaultImagePullingStuckTimeout, cancel the operation.
  259. if time.Now().Sub(timestamp) > defaultImagePullingStuckTimeout {
  260. glog.Errorf("Cancel pulling image %q because of no progress for %v, latest progress: %q", p.image, defaultImagePullingStuckTimeout, progress)
  261. p.cancel()
  262. return
  263. }
  264. glog.V(2).Infof("Pulling image %q: %q", p.image, progress)
  265. case <-p.stopCh:
  266. progress, _ := p.progress.get()
  267. glog.V(2).Infof("Stop pulling image %q: %q", p.image, progress)
  268. return
  269. }
  270. }
  271. }()
  272. }
// stop stops the progressReporter by closing stopCh, which makes the goroutine launched by start
// return. It does not wait for that goroutine to exit. Because it closes the channel, it must be
// called at most once per reporter: closing an already closed channel panics.
func (p *progressReporter) stop() {
	close(p.stopCh)
}
  277. func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error {
  278. // RegistryAuth is the base64 encoded credentials for the registry
  279. base64Auth, err := base64EncodeAuth(auth)
  280. if err != nil {
  281. return err
  282. }
  283. opts.RegistryAuth = base64Auth
  284. ctx, cancel := d.getCancelableContext()
  285. defer cancel()
  286. resp, err := d.client.ImagePull(ctx, image, opts)
  287. if err != nil {
  288. return err
  289. }
  290. defer resp.Close()
  291. reporter := newProgressReporter(image, cancel)
  292. reporter.start()
  293. defer reporter.stop()
  294. decoder := json.NewDecoder(resp)
  295. for {
  296. var msg dockermessage.JSONMessage
  297. err := decoder.Decode(&msg)
  298. if err == io.EOF {
  299. break
  300. }
  301. if err != nil {
  302. return err
  303. }
  304. if msg.Error != nil {
  305. return msg.Error
  306. }
  307. reporter.set(&msg)
  308. }
  309. return nil
  310. }
  311. func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) {
  312. ctx, cancel := d.getTimeoutContext()
  313. defer cancel()
  314. resp, err := d.client.ImageRemove(ctx, image, opts)
  315. if ctxErr := contextError(ctx); ctxErr != nil {
  316. return nil, ctxErr
  317. }
  318. return resp, err
  319. }
  320. func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {
  321. ctx, cancel := d.getCancelableContext()
  322. defer cancel()
  323. resp, err := d.client.ContainerLogs(ctx, id, opts)
  324. if ctxErr := contextError(ctx); ctxErr != nil {
  325. return ctxErr
  326. }
  327. if err != nil {
  328. return err
  329. }
  330. defer resp.Close()
  331. return d.redirectResponseToOutputStream(sopts.RawTerminal, sopts.OutputStream, sopts.ErrorStream, resp)
  332. }
  333. func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
  334. ctx, cancel := d.getTimeoutContext()
  335. defer cancel()
  336. resp, err := d.client.ServerVersion(ctx)
  337. if ctxErr := contextError(ctx); ctxErr != nil {
  338. return nil, ctxErr
  339. }
  340. if err != nil {
  341. return nil, err
  342. }
  343. return &resp, nil
  344. }
  345. func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
  346. ctx, cancel := d.getTimeoutContext()
  347. defer cancel()
  348. resp, err := d.client.Info(ctx)
  349. if ctxErr := contextError(ctx); ctxErr != nil {
  350. return nil, ctxErr
  351. }
  352. if err != nil {
  353. return nil, err
  354. }
  355. return &resp, nil
  356. }
  357. // TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did.
  358. func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error) {
  359. ctx, cancel := d.getTimeoutContext()
  360. defer cancel()
  361. resp, err := d.client.ContainerExecCreate(ctx, id, opts)
  362. if ctxErr := contextError(ctx); ctxErr != nil {
  363. return nil, ctxErr
  364. }
  365. if err != nil {
  366. return nil, err
  367. }
  368. return &resp, nil
  369. }
  370. func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
  371. ctx, cancel := d.getCancelableContext()
  372. defer cancel()
  373. if opts.Detach {
  374. err := d.client.ContainerExecStart(ctx, startExec, opts)
  375. if ctxErr := contextError(ctx); ctxErr != nil {
  376. return ctxErr
  377. }
  378. return err
  379. }
  380. resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecConfig{
  381. Detach: opts.Detach,
  382. Tty: opts.Tty,
  383. })
  384. if ctxErr := contextError(ctx); ctxErr != nil {
  385. return ctxErr
  386. }
  387. if err != nil {
  388. return err
  389. }
  390. defer resp.Close()
  391. return d.holdHijackedConnection(sopts.RawTerminal || opts.Tty, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
  392. }
  393. func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) {
  394. ctx, cancel := d.getTimeoutContext()
  395. defer cancel()
  396. resp, err := d.client.ContainerExecInspect(ctx, id)
  397. if ctxErr := contextError(ctx); ctxErr != nil {
  398. return nil, ctxErr
  399. }
  400. if err != nil {
  401. return nil, err
  402. }
  403. return &resp, nil
  404. }
  405. func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error {
  406. ctx, cancel := d.getCancelableContext()
  407. defer cancel()
  408. resp, err := d.client.ContainerAttach(ctx, id, opts)
  409. if ctxErr := contextError(ctx); ctxErr != nil {
  410. return ctxErr
  411. }
  412. if err != nil {
  413. return err
  414. }
  415. defer resp.Close()
  416. return d.holdHijackedConnection(sopts.RawTerminal, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
  417. }
  418. func (d *kubeDockerClient) ResizeExecTTY(id string, height, width int) error {
  419. ctx, cancel := d.getCancelableContext()
  420. defer cancel()
  421. return d.client.ContainerExecResize(ctx, id, dockertypes.ResizeOptions{
  422. Height: height,
  423. Width: width,
  424. })
  425. }
  426. func (d *kubeDockerClient) ResizeContainerTTY(id string, height, width int) error {
  427. ctx, cancel := d.getCancelableContext()
  428. defer cancel()
  429. return d.client.ContainerResize(ctx, id, dockertypes.ResizeOptions{
  430. Height: height,
  431. Width: width,
  432. })
  433. }
  434. // redirectResponseToOutputStream redirect the response stream to stdout and stderr. When tty is true, all stream will
  435. // only be redirected to stdout.
  436. func (d *kubeDockerClient) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error {
  437. if outputStream == nil {
  438. outputStream = ioutil.Discard
  439. }
  440. if errorStream == nil {
  441. errorStream = ioutil.Discard
  442. }
  443. var err error
  444. if tty {
  445. _, err = io.Copy(outputStream, resp)
  446. } else {
  447. _, err = dockerstdcopy.StdCopy(outputStream, errorStream, resp)
  448. }
  449. return err
  450. }
// holdHijackedConnection holds the HijackedResponse, redirects the inputStream to the connection, and redirects the
// response stream to stdout and stderr. NOTE: If needed, we could also add context in this function.
//
// It returns once the output copy has finished, or, when neither outputStream nor errorStream is
// set, once the input side is done. The returned error is the result of the output copy (nil when
// no output copy was started); errors of the input copy and of CloseWrite are not reported.
func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp dockertypes.HijackedResponse) error {
	// receiveStdout carries the result of the output copy. Nothing is ever sent on it when both
	// output streams are nil, because the copying goroutine is not started in that case.
	receiveStdout := make(chan error)
	if outputStream != nil || errorStream != nil {
		go func() {
			receiveStdout <- d.redirectResponseToOutputStream(tty, outputStream, errorStream, resp.Reader)
		}()
	}
	// stdinDone is closed after the input (if any) has been copied to the connection and the
	// write side of the connection has been closed.
	stdinDone := make(chan struct{})
	go func() {
		if inputStream != nil {
			io.Copy(resp.Conn, inputStream)
		}
		resp.CloseWrite()
		close(stdinDone)
	}()
	select {
	case err := <-receiveStdout:
		// The output copy ended first. The input goroutine is not waited for and may still be running.
		return err
	case <-stdinDone:
		// Input is done; if an output copy was started, wait for it so no output is lost.
		if outputStream != nil || errorStream != nil {
			return <-receiveStdout
		}
	}
	return nil
}
  478. // getCancelableContext returns a new cancelable context. For long running requests without timeout, we use cancelable
  479. // context to avoid potential resource leak, although the current implementation shouldn't leak resource.
  480. func (d *kubeDockerClient) getCancelableContext() (context.Context, context.CancelFunc) {
  481. return context.WithCancel(context.Background())
  482. }
  483. // getTimeoutContext returns a new context with default request timeout
  484. func (d *kubeDockerClient) getTimeoutContext() (context.Context, context.CancelFunc) {
  485. return context.WithTimeout(context.Background(), d.timeout)
  486. }
  487. // getCustomTimeoutContext returns a new context with a specific request timeout
  488. func (d *kubeDockerClient) getCustomTimeoutContext(timeout time.Duration) (context.Context, context.CancelFunc) {
  489. // Pick the larger of the two
  490. if d.timeout > timeout {
  491. timeout = d.timeout
  492. }
  493. return context.WithTimeout(context.Background(), timeout)
  494. }
  495. // ParseDockerTimestamp parses the timestamp returned by DockerInterface from string to time.Time
  496. func ParseDockerTimestamp(s string) (time.Time, error) {
  497. // Timestamp returned by Docker is in time.RFC3339Nano format.
  498. return time.Parse(time.RFC3339Nano, s)
  499. }
  500. // contextError checks the context, and returns error if the context is timeout.
  501. func contextError(ctx context.Context) error {
  502. if ctx.Err() == context.DeadlineExceeded {
  503. return operationTimeout{err: ctx.Err()}
  504. }
  505. return ctx.Err()
  506. }
// StreamOptions are the options used to configure the stream redirection
// for Logs, StartExec and AttachToContainer.
type StreamOptions struct {
	// RawTerminal selects TTY mode for the redirection: when true the response is copied
	// verbatim to OutputStream only, instead of being split between OutputStream and ErrorStream.
	RawTerminal bool
	// InputStream, when non-nil, is copied to the hijacked connection by StartExec and
	// AttachToContainer. Logs does not use it.
	InputStream io.Reader
	// OutputStream receives the stdout part of the response. May be nil.
	OutputStream io.Writer
	// ErrorStream receives the stderr part of the response when RawTerminal is false. May be nil.
	ErrorStream io.Writer
}
  514. // operationTimeout is the error returned when the docker operations are timeout.
  515. type operationTimeout struct {
  516. err error
  517. }
  518. func (e operationTimeout) Error() string {
  519. return fmt.Sprintf("operation timeout: %v", e.err)
  520. }
  521. // containerNotFoundError is the error returned by InspectContainer when container not found. We
  522. // add this error type for testability. We don't use the original error returned by engine-api
  523. // because dockertypes.containerNotFoundError is private, we can't create and inject it in our test.
  524. type containerNotFoundError struct {
  525. ID string
  526. }
  527. func (e containerNotFoundError) Error() string {
  528. return fmt.Sprintf("no such container: %q", e.ID)
  529. }
  530. // imageNotFoundError is the error returned by InspectImage when image not found.
  531. type imageNotFoundError struct {
  532. ID string
  533. }
  534. func (e imageNotFoundError) Error() string {
  535. return fmt.Sprintf("no such image: %q", e.ID)
  536. }