docker.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package dockertools
  14. import (
  15. "fmt"
  16. "math/rand"
  17. "net/http"
  18. "path"
  19. "strconv"
  20. "strings"
  21. "time"
  22. dockerref "github.com/docker/distribution/reference"
  23. "github.com/docker/docker/pkg/jsonmessage"
  24. dockerapi "github.com/docker/engine-api/client"
  25. dockertypes "github.com/docker/engine-api/types"
  26. "github.com/golang/glog"
  27. "k8s.io/kubernetes/pkg/api"
  28. "k8s.io/kubernetes/pkg/credentialprovider"
  29. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  30. "k8s.io/kubernetes/pkg/kubelet/images"
  31. "k8s.io/kubernetes/pkg/kubelet/leaky"
  32. "k8s.io/kubernetes/pkg/types"
  33. utilerrors "k8s.io/kubernetes/pkg/util/errors"
  34. "k8s.io/kubernetes/pkg/util/flowcontrol"
  35. "k8s.io/kubernetes/pkg/util/parsers"
  36. )
  37. const (
  38. PodInfraContainerName = leaky.PodInfraContainerName
  39. DockerPrefix = "docker://"
  40. LogSuffix = "log"
  41. ext4MaxFileNameLen = 255
  42. )
  // CPU conversion constants.
  // Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
  const (
  	// milliCPUToCPU is the number of milliCPU in one whole CPU.
  	milliCPUToCPU = 1000
  	// sharesPerCPU is the cpu.shares weight assigned per whole CPU.
  	sharesPerCPU = 1024
  	// minShares is the lower bound applied to computed cpu.shares values.
  	minShares = 2
  	// quotaPeriod is the CFS period in microseconds, i.e. 100ms.
  	quotaPeriod = 100 * 1000
  	// minQuotaPeriod is the smallest CFS quota in microseconds, i.e. 1ms.
  	minQuotaPeriod = 1000
  )
  52. // DockerInterface is an abstract interface for testability. It abstracts the interface of docker client.
  53. type DockerInterface interface {
  54. ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error)
  55. InspectContainer(id string) (*dockertypes.ContainerJSON, error)
  56. CreateContainer(dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error)
  57. StartContainer(id string) error
  58. StopContainer(id string, timeout int) error
  59. RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error
  60. InspectImage(image string) (*dockertypes.ImageInspect, error)
  61. ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error)
  62. PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error
  63. RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error)
  64. ImageHistory(id string) ([]dockertypes.ImageHistory, error)
  65. Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error
  66. Version() (*dockertypes.Version, error)
  67. Info() (*dockertypes.Info, error)
  68. CreateExec(string, dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error)
  69. StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
  70. InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
  71. AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error
  72. ResizeContainerTTY(id string, height, width int) error
  73. ResizeExecTTY(id string, height, width int) error
  74. }
  75. // KubeletContainerName encapsulates a pod name and a Kubernetes container name.
  76. type KubeletContainerName struct {
  77. PodFullName string
  78. PodUID types.UID
  79. ContainerName string
  80. }
  81. // containerNamePrefix is used to identify the containers on the node managed by this
  82. // process.
  83. var containerNamePrefix = "k8s"
  84. // SetContainerNamePrefix allows the container prefix name for this process to be changed.
  85. // This is intended to support testing and bootstrapping experimentation. It cannot be
  86. // changed once the Kubelet starts.
  87. func SetContainerNamePrefix(prefix string) {
  88. containerNamePrefix = prefix
  89. }
  90. // DockerPuller is an abstract interface for testability. It abstracts image pull operations.
  91. type DockerPuller interface {
  92. Pull(image string, secrets []api.Secret) error
  93. IsImagePresent(image string) (bool, error)
  94. }
  95. // dockerPuller is the default implementation of DockerPuller.
  96. type dockerPuller struct {
  97. client DockerInterface
  98. keyring credentialprovider.DockerKeyring
  99. }
  100. type throttledDockerPuller struct {
  101. puller dockerPuller
  102. limiter flowcontrol.RateLimiter
  103. }
  104. // newDockerPuller creates a new instance of the default implementation of DockerPuller.
  105. func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
  106. dp := dockerPuller{
  107. client: client,
  108. keyring: credentialprovider.NewDockerKeyring(),
  109. }
  110. if qps == 0.0 {
  111. return dp
  112. }
  113. return &throttledDockerPuller{
  114. puller: dp,
  115. limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst),
  116. }
  117. }
  118. func filterHTTPError(err error, image string) error {
  119. // docker/docker/pull/11314 prints detailed error info for docker pull.
  120. // When it hits 502, it returns a verbose html output including an inline svg,
  121. // which makes the output of kubectl get pods much harder to parse.
  122. // Here converts such verbose output to a concise one.
  123. jerr, ok := err.(*jsonmessage.JSONError)
  124. if ok && (jerr.Code == http.StatusBadGateway ||
  125. jerr.Code == http.StatusServiceUnavailable ||
  126. jerr.Code == http.StatusGatewayTimeout) {
  127. glog.V(2).Infof("Pulling image %q failed: %v", image, err)
  128. return images.RegistryUnavailable
  129. } else {
  130. return err
  131. }
  132. }
  133. // Check if the inspected image matches what we are looking for
  134. func matchImageTagOrSHA(inspected dockertypes.ImageInspect, image string) bool {
  135. // The image string follows the grammar specified here
  136. // https://github.com/docker/distribution/blob/master/reference/reference.go#L4
  137. named, err := dockerref.ParseNamed(image)
  138. if err != nil {
  139. glog.V(4).Infof("couldn't parse image reference %q: %v", image, err)
  140. return false
  141. }
  142. _, isTagged := named.(dockerref.Tagged)
  143. digest, isDigested := named.(dockerref.Digested)
  144. if !isTagged && !isDigested {
  145. // No Tag or SHA specified, so just return what we have
  146. return true
  147. }
  148. if isTagged {
  149. // Check the RepoTags for a match.
  150. for _, tag := range inspected.RepoTags {
  151. // An image name (without the tag/digest) can be [hostname '/'] component ['/' component]*
  152. // Because either the RepoTag or the name *may* contain the
  153. // hostname or not, we only check for the suffix match.
  154. if strings.HasSuffix(image, tag) || strings.HasSuffix(tag, image) {
  155. return true
  156. }
  157. }
  158. }
  159. if isDigested {
  160. algo := digest.Digest().Algorithm().String()
  161. sha := digest.Digest().Hex()
  162. // Look specifically for short and long sha(s)
  163. if strings.Contains(inspected.ID, algo+":"+sha) {
  164. // We found the short or long SHA requested
  165. return true
  166. }
  167. }
  168. glog.V(4).Infof("Inspected image (%q) does not match %s", inspected.ID, image)
  169. return false
  170. }
  171. // applyDefaultImageTag parses a docker image string, if it doesn't contain any tag or digest,
  172. // a default tag will be applied.
  173. func applyDefaultImageTag(image string) (string, error) {
  174. named, err := dockerref.ParseNamed(image)
  175. if err != nil {
  176. return "", fmt.Errorf("couldn't parse image reference %q: %v", image, err)
  177. }
  178. _, isTagged := named.(dockerref.Tagged)
  179. _, isDigested := named.(dockerref.Digested)
  180. if !isTagged && !isDigested {
  181. named, err := dockerref.WithTag(named, parsers.DefaultImageTag)
  182. if err != nil {
  183. return "", fmt.Errorf("failed to apply default image tag %q: %v", image, err)
  184. }
  185. image = named.String()
  186. }
  187. return image, nil
  188. }
  189. func (p dockerPuller) Pull(image string, secrets []api.Secret) error {
  190. // If the image contains no tag or digest, a default tag should be applied.
  191. image, err := applyDefaultImageTag(image)
  192. if err != nil {
  193. return err
  194. }
  195. keyring, err := credentialprovider.MakeDockerKeyring(secrets, p.keyring)
  196. if err != nil {
  197. return err
  198. }
  199. // The only used image pull option RegistryAuth will be set in kube_docker_client
  200. opts := dockertypes.ImagePullOptions{}
  201. creds, haveCredentials := keyring.Lookup(image)
  202. if !haveCredentials {
  203. glog.V(1).Infof("Pulling image %s without credentials", image)
  204. err := p.client.PullImage(image, dockertypes.AuthConfig{}, opts)
  205. if err == nil {
  206. // Sometimes PullImage failed with no error returned.
  207. exist, ierr := p.IsImagePresent(image)
  208. if ierr != nil {
  209. glog.Warningf("Failed to inspect image %s: %v", image, ierr)
  210. }
  211. if !exist {
  212. return fmt.Errorf("image pull failed for unknown error")
  213. }
  214. return nil
  215. }
  216. // Image spec: [<registry>/]<repository>/<image>[:<version] so we count '/'
  217. explicitRegistry := (strings.Count(image, "/") == 2)
  218. // Hack, look for a private registry, and decorate the error with the lack of
  219. // credentials. This is heuristic, and really probably could be done better
  220. // by talking to the registry API directly from the kubelet here.
  221. if explicitRegistry {
  222. return fmt.Errorf("image pull failed for %s, this may be because there are no credentials on this request. details: (%v)", image, err)
  223. }
  224. return filterHTTPError(err, image)
  225. }
  226. var pullErrs []error
  227. for _, currentCreds := range creds {
  228. err = p.client.PullImage(image, credentialprovider.LazyProvide(currentCreds), opts)
  229. // If there was no error, return success
  230. if err == nil {
  231. return nil
  232. }
  233. pullErrs = append(pullErrs, filterHTTPError(err, image))
  234. }
  235. return utilerrors.NewAggregate(pullErrs)
  236. }
  237. func (p throttledDockerPuller) Pull(image string, secrets []api.Secret) error {
  238. if p.limiter.TryAccept() {
  239. return p.puller.Pull(image, secrets)
  240. }
  241. return fmt.Errorf("pull QPS exceeded.")
  242. }
  243. func (p dockerPuller) IsImagePresent(image string) (bool, error) {
  244. _, err := p.client.InspectImage(image)
  245. if err == nil {
  246. return true, nil
  247. }
  248. if _, ok := err.(imageNotFoundError); ok {
  249. return false, nil
  250. }
  251. return false, err
  252. }
  253. func (p throttledDockerPuller) IsImagePresent(name string) (bool, error) {
  254. return p.puller.IsImagePresent(name)
  255. }
  256. // Creates a name which can be reversed to identify both full pod name and container name.
  257. // This function returns stable name, unique name and a unique id.
  258. // Although rand.Uint32() is not really unique, but it's enough for us because error will
  259. // only occur when instances of the same container in the same pod have the same UID. The
  260. // chance is really slim.
  261. func BuildDockerName(dockerName KubeletContainerName, container *api.Container) (string, string, string) {
  262. containerName := dockerName.ContainerName + "." + strconv.FormatUint(kubecontainer.HashContainer(container), 16)
  263. stableName := fmt.Sprintf("%s_%s_%s_%s",
  264. containerNamePrefix,
  265. containerName,
  266. dockerName.PodFullName,
  267. dockerName.PodUID)
  268. UID := fmt.Sprintf("%08x", rand.Uint32())
  269. return stableName, fmt.Sprintf("%s_%s", stableName, UID), UID
  270. }
  271. // Unpacks a container name, returning the pod full name and container name we would have used to
  272. // construct the docker name. If we are unable to parse the name, an error is returned.
  273. func ParseDockerName(name string) (dockerName *KubeletContainerName, hash uint64, err error) {
  274. // For some reason docker appears to be appending '/' to names.
  275. // If it's there, strip it.
  276. name = strings.TrimPrefix(name, "/")
  277. parts := strings.Split(name, "_")
  278. if len(parts) == 0 || parts[0] != containerNamePrefix {
  279. err = fmt.Errorf("failed to parse Docker container name %q into parts", name)
  280. return nil, 0, err
  281. }
  282. if len(parts) < 6 {
  283. // We have at least 5 fields. We may have more in the future.
  284. // Anything with less fields than this is not something we can
  285. // manage.
  286. glog.Warningf("found a container with the %q prefix, but too few fields (%d): %q", containerNamePrefix, len(parts), name)
  287. err = fmt.Errorf("Docker container name %q has less parts than expected %v", name, parts)
  288. return nil, 0, err
  289. }
  290. nameParts := strings.Split(parts[1], ".")
  291. containerName := nameParts[0]
  292. if len(nameParts) > 1 {
  293. hash, err = strconv.ParseUint(nameParts[1], 16, 32)
  294. if err != nil {
  295. glog.Warningf("invalid container hash %q in container %q", nameParts[1], name)
  296. }
  297. }
  298. podFullName := parts[2] + "_" + parts[3]
  299. podUID := types.UID(parts[4])
  300. return &KubeletContainerName{podFullName, podUID, containerName}, hash, nil
  301. }
  302. func LogSymlink(containerLogsDir, podFullName, containerName, dockerId string) string {
  303. suffix := fmt.Sprintf(".%s", LogSuffix)
  304. logPath := fmt.Sprintf("%s_%s-%s", podFullName, containerName, dockerId)
  305. // Length of a filename cannot exceed 255 characters in ext4 on Linux.
  306. if len(logPath) > ext4MaxFileNameLen-len(suffix) {
  307. logPath = logPath[:ext4MaxFileNameLen-len(suffix)]
  308. }
  309. return path.Join(containerLogsDir, logPath+suffix)
  310. }
  311. // Get a *dockerapi.Client, either using the endpoint passed in, or using
  312. // DOCKER_HOST, DOCKER_TLS_VERIFY, and DOCKER_CERT path per their spec
  313. func getDockerClient(dockerEndpoint string) (*dockerapi.Client, error) {
  314. if len(dockerEndpoint) > 0 {
  315. glog.Infof("Connecting to docker on %s", dockerEndpoint)
  316. return dockerapi.NewClient(dockerEndpoint, "", nil, nil)
  317. }
  318. return dockerapi.NewEnvClient()
  319. }
  320. // ConnectToDockerOrDie creates docker client connecting to docker daemon.
  321. // If the endpoint passed in is "fake://", a fake docker client
  322. // will be returned. The program exits if error occurs. The requestTimeout
  323. // is the timeout for docker requests. If timeout is exceeded, the request
  324. // will be cancelled and throw out an error. If requestTimeout is 0, a default
  325. // value will be applied.
  326. func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
  327. if dockerEndpoint == "fake://" {
  328. return NewFakeDockerClient()
  329. }
  330. client, err := getDockerClient(dockerEndpoint)
  331. if err != nil {
  332. glog.Fatalf("Couldn't connect to docker: %v", err)
  333. }
  334. glog.Infof("Start docker client with request timeout=%v", requestTimeout)
  335. return newKubeDockerClient(client, requestTimeout)
  336. }
  337. // milliCPUToQuota converts milliCPU to CFS quota and period values
  338. func milliCPUToQuota(milliCPU int64) (quota int64, period int64) {
  339. // CFS quota is measured in two values:
  340. // - cfs_period_us=100ms (the amount of time to measure usage across)
  341. // - cfs_quota=20ms (the amount of cpu time allowed to be used across a period)
  342. // so in the above example, you are limited to 20% of a single CPU
  343. // for multi-cpu environments, you just scale equivalent amounts
  344. if milliCPU == 0 {
  345. // take the default behavior from docker
  346. return
  347. }
  348. // we set the period to 100ms by default
  349. period = quotaPeriod
  350. // we then convert your milliCPU to a value normalized over a period
  351. quota = (milliCPU * quotaPeriod) / milliCPUToCPU
  352. // quota needs to be a minimum of 1ms.
  353. if quota < minQuotaPeriod {
  354. quota = minQuotaPeriod
  355. }
  356. return
  357. }
  358. func milliCPUToShares(milliCPU int64) int64 {
  359. if milliCPU == 0 {
  360. // Docker converts zero milliCPU to unset, which maps to kernel default
  361. // for unset: 1024. Return 2 here to really match kernel default for
  362. // zero milliCPU.
  363. return minShares
  364. }
  365. // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
  366. shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
  367. if shares < minShares {
  368. return minShares
  369. }
  370. return shares
  371. }
  372. // GetKubeletDockerContainers lists all container or just the running ones.
  373. // Returns a list of docker containers that we manage
  374. func GetKubeletDockerContainers(client DockerInterface, allContainers bool) ([]*dockertypes.Container, error) {
  375. result := []*dockertypes.Container{}
  376. containers, err := client.ListContainers(dockertypes.ContainerListOptions{All: allContainers})
  377. if err != nil {
  378. return nil, err
  379. }
  380. for i := range containers {
  381. container := &containers[i]
  382. if len(container.Names) == 0 {
  383. continue
  384. }
  385. // Skip containers that we didn't create to allow users to manually
  386. // spin up their own containers if they want.
  387. if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") {
  388. glog.V(5).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0])
  389. continue
  390. }
  391. result = append(result, container)
  392. }
  393. return result, nil
  394. }