123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package dockertools
- import (
- "fmt"
- "math/rand"
- "net/http"
- "path"
- "strconv"
- "strings"
- "time"
- dockerref "github.com/docker/distribution/reference"
- "github.com/docker/docker/pkg/jsonmessage"
- dockerapi "github.com/docker/engine-api/client"
- dockertypes "github.com/docker/engine-api/types"
- "github.com/golang/glog"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/credentialprovider"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/images"
- "k8s.io/kubernetes/pkg/kubelet/leaky"
- "k8s.io/kubernetes/pkg/types"
- utilerrors "k8s.io/kubernetes/pkg/util/errors"
- "k8s.io/kubernetes/pkg/util/flowcontrol"
- "k8s.io/kubernetes/pkg/util/parsers"
- )
const (
	// PodInfraContainerName is the name of the pod infrastructure container;
	// the value is re-exported from the leaky package.
	PodInfraContainerName = leaky.PodInfraContainerName
	// DockerPrefix is the "docker://" scheme prefix.
	// NOTE(review): presumably prepended to docker container IDs by callers — confirm.
	DockerPrefix = "docker://"
	// LogSuffix is the extension (without the leading dot) that LogSymlink
	// appends to container log symlink names.
	LogSuffix = "log"
	// ext4MaxFileNameLen is the maximum length of a single file name on ext4;
	// LogSymlink truncates names so they fit within it.
	ext4MaxFileNameLen = 255
)
const (
	// Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
	// minShares is the smallest cpu.shares value milliCPUToShares returns.
	minShares = 2
	// sharesPerCPU is the number of cpu.shares that corresponds to one full CPU.
	sharesPerCPU = 1024
	// milliCPUToCPU is the number of milli-CPUs in one CPU.
	milliCPUToCPU = 1000

	// 100000 is equivalent to 100ms
	// quotaPeriod is the CFS period (cfs_period_us, in microseconds) that
	// milliCPUToQuota returns for any non-zero request.
	quotaPeriod = 100000
	// minQuotaPeriod is the smallest CFS quota (1ms, in microseconds) that
	// milliCPUToQuota returns for any non-zero request.
	minQuotaPeriod = 1000
)
// DockerInterface is an abstract interface for testability. It abstracts the interface of docker client.
// ConnectToDockerOrDie hands out either NewFakeDockerClient() or
// newKubeDockerClient(...) through this interface.
//
// NOTE(review): dockerPuller.IsImagePresent treats an imageNotFoundError from
// InspectImage as "image absent", so implementations are expected to return
// that error type for missing images.
type DockerInterface interface {
	// Container lifecycle.
	ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error)
	InspectContainer(id string) (*dockertypes.ContainerJSON, error)
	CreateContainer(dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error)
	StartContainer(id string) error
	StopContainer(id string, timeout int) error
	RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error
	// Images.
	InspectImage(image string) (*dockertypes.ImageInspect, error)
	ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error)
	PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error
	RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error)
	ImageHistory(id string) ([]dockertypes.ImageHistory, error)
	// Logs and daemon information.
	Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error
	Version() (*dockertypes.Version, error)
	Info() (*dockertypes.Info, error)
	// Exec, attach and TTY resizing.
	CreateExec(string, dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error)
	StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
	InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
	AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error
	ResizeContainerTTY(id string, height, width int) error
	ResizeExecTTY(id string, height, width int) error
}
// KubeletContainerName encapsulates a pod name and a Kubernetes container name.
// BuildDockerName encodes it into a docker container name, and ParseDockerName
// recovers it from one.
type KubeletContainerName struct {
	// PodFullName is the pod's full name. ParseDockerName rebuilds it by
	// joining two '_'-separated fields, so it only round-trips when it contains
	// exactly one '_'.
	PodFullName string
	// PodUID is the UID of the pod that owns the container.
	PodUID types.UID
	// ContainerName is the container's name within the pod. BuildDockerName
	// appends ".<hash>" to it.
	ContainerName string
}
// containerNamePrefix is the first '_'-separated field of every docker
// container name built by BuildDockerName. ParseDockerName and
// GetKubeletDockerContainers rely on it to tell which containers on the node
// are managed by this process.
var containerNamePrefix = "k8s"

// SetContainerNamePrefix overrides containerNamePrefix for this process.
// It exists to support testing and bootstrapping experimentation and must not
// be used once the Kubelet has started: the prefix is read elsewhere without
// any synchronization.
func SetContainerNamePrefix(prefix string) {
	containerNamePrefix = prefix
}
// DockerPuller is an abstract interface for testability. It abstracts image pull operations.
type DockerPuller interface {
	// Pull pulls image, using secrets (merged with any configured keyring in
	// the default implementation) as registry credentials.
	Pull(image string, secrets []api.Secret) error
	// IsImagePresent reports whether image is available locally.
	IsImagePresent(image string) (bool, error)
}
// dockerPuller is the default implementation of DockerPuller.
type dockerPuller struct {
	// client performs the pull and inspect calls.
	client DockerInterface
	// keyring is the base keyring; Pull merges per-call pull secrets into it.
	keyring credentialprovider.DockerKeyring
}

// throttledDockerPuller wraps a dockerPuller and rejects Pull calls when
// limiter has no token available. IsImagePresent is not throttled.
type throttledDockerPuller struct {
	puller  dockerPuller
	limiter flowcontrol.RateLimiter
}
- // newDockerPuller creates a new instance of the default implementation of DockerPuller.
- func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
- dp := dockerPuller{
- client: client,
- keyring: credentialprovider.NewDockerKeyring(),
- }
- if qps == 0.0 {
- return dp
- }
- return &throttledDockerPuller{
- puller: dp,
- limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst),
- }
- }
// filterHTTPError condenses registry gateway failures into a short error.
//
// docker/docker/pull/11314 prints detailed error info for docker pull. When
// the registry answers 502, 503 or 504, the message is a verbose HTML page
// (including an inline SVG), which makes the output of `kubectl get pods` much
// harder to parse. Such errors are logged at V(2) and replaced by
// images.RegistryUnavailable. Every other error, including nil, is returned
// unchanged.
func filterHTTPError(err error, image string) error {
	jerr, ok := err.(*jsonmessage.JSONError)
	// A typed-nil *JSONError would otherwise panic on jerr.Code below.
	if !ok || jerr == nil {
		return err
	}
	switch jerr.Code {
	case http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
		glog.V(2).Infof("Pulling image %q failed: %v", image, err)
		return images.RegistryUnavailable
	default:
		return err
	}
}
// matchImageTagOrSHA reports whether the inspected image matches the tag or
// digest requested in image.
//
// The image string follows the grammar specified in
// https://github.com/docker/distribution/blob/master/reference/reference.go#L4.
// A reference that cannot be parsed never matches. A reference with neither a
// tag nor a digest matches any inspected image.
func matchImageTagOrSHA(inspected dockertypes.ImageInspect, image string) bool {
	named, err := dockerref.ParseNamed(image)
	if err != nil {
		glog.V(4).Infof("couldn't parse image reference %q: %v", image, err)
		return false
	}
	_, isTagged := named.(dockerref.Tagged)
	digest, isDigested := named.(dockerref.Digested)
	if !isTagged && !isDigested {
		// No Tag or SHA specified, so just return what we have
		return true
	}

	if isTagged {
		// An image name (without the tag/digest) can be
		// [hostname '/'] component ['/' component]*. Either the RepoTag or the
		// requested name *may* contain the hostname or not, so a suffix match
		// is accepted. It must start on a '/' boundary, though, so that
		// "foo:latest" does not match "myfoo:latest".
		for _, tag := range inspected.RepoTags {
			if hasRefSuffix(image, tag) || hasRefSuffix(tag, image) {
				return true
			}
		}
	}

	if isDigested {
		want := digest.Digest().Algorithm().String() + ":" + digest.Digest().Hex()
		// An image pulled by digest records that (manifest) digest in
		// RepoDigests as "<name>@<algo>:<hex>". The image ID is a different
		// digest, so checking the ID alone never matches such images.
		for _, repoDigest := range inspected.RepoDigests {
			if strings.HasSuffix(repoDigest, "@"+want) {
				return true
			}
		}
		// Also accept a match against the image ID itself.
		if strings.Contains(inspected.ID, want) {
			return true
		}
	}

	glog.V(4).Infof("Inspected image (%q) does not match %s", inspected.ID, image)
	return false
}

// hasRefSuffix reports whether s ends with suffix and the match begins either
// at the start of s or immediately after a '/' separator.
func hasRefSuffix(s, suffix string) bool {
	if !strings.HasSuffix(s, suffix) {
		return false
	}
	rest := s[:len(s)-len(suffix)]
	return rest == "" || strings.HasSuffix(rest, "/")
}
// applyDefaultImageTag parses a docker image reference. A reference that
// carries neither a tag nor a digest is returned with
// parsers.DefaultImageTag applied. A reference that already has one is
// returned exactly as given.
func applyDefaultImageTag(image string) (string, error) {
	ref, parseErr := dockerref.ParseNamed(image)
	if parseErr != nil {
		return "", fmt.Errorf("couldn't parse image reference %q: %v", image, parseErr)
	}
	_, hasTag := ref.(dockerref.Tagged)
	_, hasDigest := ref.(dockerref.Digested)
	if hasTag || hasDigest {
		return image, nil
	}
	tagged, tagErr := dockerref.WithTag(ref, parsers.DefaultImageTag)
	if tagErr != nil {
		return "", fmt.Errorf("failed to apply default image tag %q: %v", image, tagErr)
	}
	return tagged.String(), nil
}
// Pull pulls image, applying the default tag when the reference has neither a
// tag nor a digest. Credentials come from secrets merged with the puller's
// keyring.
//
// With matching credentials, each one is tried in order until a pull
// succeeds. If all fail, the (HTTP-filtered) errors are returned as an
// aggregate.
//
// Without credentials, a single pull is attempted. An apparent success is
// double-checked with IsImagePresent. A failure against what looks like an
// explicit registry is decorated with a missing-credentials hint.
func (p dockerPuller) Pull(image string, secrets []api.Secret) error {
	ref, err := applyDefaultImageTag(image)
	if err != nil {
		return err
	}
	keyring, err := credentialprovider.MakeDockerKeyring(secrets, p.keyring)
	if err != nil {
		return err
	}

	// The only image pull option used, RegistryAuth, is set in kube_docker_client.
	opts := dockertypes.ImagePullOptions{}

	creds, found := keyring.Lookup(ref)
	if found {
		var failures []error
		for _, cred := range creds {
			pullErr := p.client.PullImage(ref, credentialprovider.LazyProvide(cred), opts)
			if pullErr == nil {
				return nil
			}
			failures = append(failures, filterHTTPError(pullErr, ref))
		}
		return utilerrors.NewAggregate(failures)
	}

	glog.V(1).Infof("Pulling image %s without credentials", ref)
	pullErr := p.client.PullImage(ref, dockertypes.AuthConfig{}, opts)
	if pullErr != nil {
		// Image spec: [<registry>/]<repository>/<image>[:<version>], so two
		// '/' hint at an explicit (possibly private) registry. This heuristic
		// decorates the error with the lack of credentials; talking to the
		// registry API directly from the kubelet would be more reliable.
		if strings.Count(ref, "/") == 2 {
			return fmt.Errorf("image pull failed for %s, this may be because there are no credentials on this request. details: (%v)", ref, pullErr)
		}
		return filterHTTPError(pullErr, ref)
	}

	// PullImage sometimes reports success without actually pulling, so
	// confirm the image is now present.
	present, inspectErr := p.IsImagePresent(ref)
	if inspectErr != nil {
		glog.Warningf("Failed to inspect image %s: %v", ref, inspectErr)
	}
	if !present {
		return fmt.Errorf("image pull failed for unknown error")
	}
	return nil
}
// Pull forwards to the wrapped puller when the rate limiter has a token
// available. Otherwise it fails immediately, without waiting for a token.
func (p throttledDockerPuller) Pull(image string, secrets []api.Secret) error {
	if !p.limiter.TryAccept() {
		return fmt.Errorf("pull QPS exceeded.")
	}
	return p.puller.Pull(image, secrets)
}
- func (p dockerPuller) IsImagePresent(image string) (bool, error) {
- _, err := p.client.InspectImage(image)
- if err == nil {
- return true, nil
- }
- if _, ok := err.(imageNotFoundError); ok {
- return false, nil
- }
- return false, err
- }
- func (p throttledDockerPuller) IsImagePresent(name string) (bool, error) {
- return p.puller.IsImagePresent(name)
- }
// BuildDockerName creates a docker container name from which both the full
// pod name and the container name can be recovered (see ParseDockerName).
//
// It returns three values:
//   - the stable name "<prefix>_<container>.<hash>_<podFullName>_<podUID>";
//   - a unique name, formed by appending "_<suffix>" to the stable name;
//   - that random suffix, as 8 zero-padded hex digits.
//
// rand.Uint32() is not truly unique. A collision only matters between
// instances of the same container in the same pod, so the chance is slim.
func BuildDockerName(dockerName KubeletContainerName, container *api.Container) (string, string, string) {
	hash := strconv.FormatUint(kubecontainer.HashContainer(container), 16)
	stable := fmt.Sprintf("%s_%s.%s_%s_%s",
		containerNamePrefix,
		dockerName.ContainerName,
		hash,
		dockerName.PodFullName,
		dockerName.PodUID)
	suffix := fmt.Sprintf("%08x", rand.Uint32())
	return stable, stable + "_" + suffix, suffix
}
// ParseDockerName unpacks a docker container name produced by BuildDockerName.
// It returns the KubeletContainerName the name was built from and the
// container hash embedded in it.
//
// The expected layout is
// "<prefix>_<containerName>[.<hash>]_<podFullName>_<podUID>_<random>", where
// podFullName itself spans two '_'-separated fields. A leading '/' (which
// docker adds) is ignored.
//
// An error is returned when the name does not start with containerNamePrefix
// or has fewer than six '_'-separated fields. A missing or invalid hash is not
// an error: an invalid one is logged, and either way the hash is reported as 0.
func ParseDockerName(name string) (dockerName *KubeletContainerName, hash uint64, err error) {
	// For some reason docker appears to be appending '/' to names.
	// If it's there, strip it.
	name = strings.TrimPrefix(name, "/")
	parts := strings.Split(name, "_")
	if len(parts) == 0 || parts[0] != containerNamePrefix {
		err = fmt.Errorf("failed to parse Docker container name %q into parts", name)
		return nil, 0, err
	}
	if len(parts) < 6 {
		// We need at least 6 fields (see the layout above). We may have more
		// in the future. Anything with fewer fields than this is not something
		// we can manage.
		glog.Warningf("found a container with the %q prefix, but too few fields (%d): %q", containerNamePrefix, len(parts), name)
		err = fmt.Errorf("Docker container name %q has less parts than expected %v", name, parts)
		return nil, 0, err
	}

	nameParts := strings.Split(parts[1], ".")
	containerName := nameParts[0]
	if len(nameParts) > 1 {
		// BuildDockerName writes the hash with FormatUint on a uint64, so
		// accept the full 64-bit range. On failure, ParseUint can return a
		// non-zero value (the maximum on overflow), so the result is kept only
		// when parsing succeeds.
		if parsed, parseErr := strconv.ParseUint(nameParts[1], 16, 64); parseErr == nil {
			hash = parsed
		} else {
			glog.Warningf("invalid container hash %q in container %q", nameParts[1], name)
		}
	}
	podFullName := parts[2] + "_" + parts[3]
	podUID := types.UID(parts[4])
	return &KubeletContainerName{podFullName, podUID, containerName}, hash, nil
}
// LogSymlink returns the path, inside containerLogsDir, of a container's log
// symlink: "<podFullName>_<containerName>-<dockerId>.<LogSuffix>".
// The part before the suffix is truncated so that the whole file name never
// exceeds ext4MaxFileNameLen bytes.
func LogSymlink(containerLogsDir, podFullName, containerName, dockerId string) string {
	ext := "." + LogSuffix
	base := podFullName + "_" + containerName + "-" + dockerId
	// Length of a filename cannot exceed 255 characters in ext4 on Linux.
	if limit := ext4MaxFileNameLen - len(ext); len(base) > limit {
		base = base[:limit]
	}
	return path.Join(containerLogsDir, base+ext)
}
- // Get a *dockerapi.Client, either using the endpoint passed in, or using
- // DOCKER_HOST, DOCKER_TLS_VERIFY, and DOCKER_CERT path per their spec
- func getDockerClient(dockerEndpoint string) (*dockerapi.Client, error) {
- if len(dockerEndpoint) > 0 {
- glog.Infof("Connecting to docker on %s", dockerEndpoint)
- return dockerapi.NewClient(dockerEndpoint, "", nil, nil)
- }
- return dockerapi.NewEnvClient()
- }
- // ConnectToDockerOrDie creates docker client connecting to docker daemon.
- // If the endpoint passed in is "fake://", a fake docker client
- // will be returned. The program exits if error occurs. The requestTimeout
- // is the timeout for docker requests. If timeout is exceeded, the request
- // will be cancelled and throw out an error. If requestTimeout is 0, a default
- // value will be applied.
- func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
- if dockerEndpoint == "fake://" {
- return NewFakeDockerClient()
- }
- client, err := getDockerClient(dockerEndpoint)
- if err != nil {
- glog.Fatalf("Couldn't connect to docker: %v", err)
- }
- glog.Infof("Start docker client with request timeout=%v", requestTimeout)
- return newKubeDockerClient(client, requestTimeout)
- }
- // milliCPUToQuota converts milliCPU to CFS quota and period values
- func milliCPUToQuota(milliCPU int64) (quota int64, period int64) {
- // CFS quota is measured in two values:
- // - cfs_period_us=100ms (the amount of time to measure usage across)
- // - cfs_quota=20ms (the amount of cpu time allowed to be used across a period)
- // so in the above example, you are limited to 20% of a single CPU
- // for multi-cpu environments, you just scale equivalent amounts
- if milliCPU == 0 {
- // take the default behavior from docker
- return
- }
- // we set the period to 100ms by default
- period = quotaPeriod
- // we then convert your milliCPU to a value normalized over a period
- quota = (milliCPU * quotaPeriod) / milliCPUToCPU
- // quota needs to be a minimum of 1ms.
- if quota < minQuotaPeriod {
- quota = minQuotaPeriod
- }
- return
- }
- func milliCPUToShares(milliCPU int64) int64 {
- if milliCPU == 0 {
- // Docker converts zero milliCPU to unset, which maps to kernel default
- // for unset: 1024. Return 2 here to really match kernel default for
- // zero milliCPU.
- return minShares
- }
- // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
- shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
- if shares < minShares {
- return minShares
- }
- return shares
- }
// GetKubeletDockerContainers lists docker containers and returns the ones
// managed by the kubelet. When allContainers is false, only running containers
// are listed.
//
// A container counts as kubelet-managed when its first name starts with
// "/<containerNamePrefix>_". Containers without any name are skipped. The
// returned pointers refer into the slice returned by ListContainers.
func GetKubeletDockerContainers(client DockerInterface, allContainers bool) ([]*dockertypes.Container, error) {
	listed, err := client.ListContainers(dockertypes.ContainerListOptions{All: allContainers})
	if err != nil {
		return nil, err
	}
	managedPrefix := "/" + containerNamePrefix + "_"
	managed := []*dockertypes.Container{}
	for i := range listed {
		c := &listed[i]
		if len(c.Names) == 0 {
			continue
		}
		// Skip containers that we didn't create to allow users to manually
		// spin up their own containers if they want.
		if !strings.HasPrefix(c.Names[0], managedPrefix) {
			glog.V(5).Infof("Docker Container: %s is not managed by kubelet.", c.Names[0])
			continue
		}
		managed = append(managed, c)
	}
	return managed, nil
}
|