kuberuntime_manager.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kuberuntime
  14. import (
  15. "errors"
  16. "fmt"
  17. "io"
  18. "os"
  19. "github.com/coreos/go-semver/semver"
  20. "github.com/golang/glog"
  21. "k8s.io/kubernetes/pkg/api"
  22. "k8s.io/kubernetes/pkg/client/record"
  23. "k8s.io/kubernetes/pkg/credentialprovider"
  24. internalApi "k8s.io/kubernetes/pkg/kubelet/api"
  25. runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
  26. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  27. "k8s.io/kubernetes/pkg/kubelet/images"
  28. "k8s.io/kubernetes/pkg/kubelet/lifecycle"
  29. "k8s.io/kubernetes/pkg/kubelet/network"
  30. proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
  31. "k8s.io/kubernetes/pkg/kubelet/types"
  32. kubetypes "k8s.io/kubernetes/pkg/types"
  33. "k8s.io/kubernetes/pkg/util/flowcontrol"
  34. )
  35. const (
  36. // The api version of kubelet runtime api
  37. kubeRuntimeAPIVersion = "0.1.0"
  38. // The root directory for pod logs
  39. podLogsRootDirectory = "/var/log/pods"
  40. )
  41. var (
  42. // ErrVersionNotSupported is returned when the api version of runtime interface is not supported
  43. ErrVersionNotSupported = errors.New("Runtime api version is not supported")
  44. )
  45. type kubeGenericRuntimeManager struct {
  46. runtimeName string
  47. recorder record.EventRecorder
  48. osInterface kubecontainer.OSInterface
  49. containerRefManager *kubecontainer.RefManager
  50. // Keyring for pulling images
  51. keyring credentialprovider.DockerKeyring
  52. // Runner of lifecycle events.
  53. runner kubecontainer.HandlerRunner
  54. // RuntimeHelper that wraps kubelet to generate runtime container options.
  55. runtimeHelper kubecontainer.RuntimeHelper
  56. // Health check results.
  57. livenessManager proberesults.Manager
  58. // If true, enforce container cpu limits with CFS quota support
  59. cpuCFSQuota bool
  60. // Network plugin.
  61. networkPlugin network.NetworkPlugin
  62. // wrapped image puller.
  63. imagePuller images.ImageManager
  64. // gRPC service clients
  65. runtimeService internalApi.RuntimeService
  66. imageService internalApi.ImageManagerService
  67. }
  68. // NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
  69. func NewKubeGenericRuntimeManager(
  70. recorder record.EventRecorder,
  71. livenessManager proberesults.Manager,
  72. containerRefManager *kubecontainer.RefManager,
  73. osInterface kubecontainer.OSInterface,
  74. networkPlugin network.NetworkPlugin,
  75. runtimeHelper kubecontainer.RuntimeHelper,
  76. httpClient types.HttpGetter,
  77. imageBackOff *flowcontrol.Backoff,
  78. serializeImagePulls bool,
  79. cpuCFSQuota bool,
  80. runtimeService internalApi.RuntimeService,
  81. imageService internalApi.ImageManagerService,
  82. ) (kubecontainer.Runtime, error) {
  83. kubeRuntimeManager := &kubeGenericRuntimeManager{
  84. recorder: recorder,
  85. cpuCFSQuota: cpuCFSQuota,
  86. livenessManager: livenessManager,
  87. containerRefManager: containerRefManager,
  88. osInterface: osInterface,
  89. networkPlugin: networkPlugin,
  90. runtimeHelper: runtimeHelper,
  91. runtimeService: runtimeService,
  92. imageService: imageService,
  93. keyring: credentialprovider.NewDockerKeyring(),
  94. }
  95. typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)
  96. if err != nil {
  97. glog.Errorf("Get runtime version failed: %v", err)
  98. return nil, err
  99. }
  100. // Only matching kubeRuntimeAPIVersion is supported now
  101. // TODO: Runtime API machinery is under discussion at https://github.com/kubernetes/kubernetes/issues/28642
  102. if typedVersion.GetVersion() != kubeRuntimeAPIVersion {
  103. glog.Errorf("Runtime api version %s is not supported, only %s is supported now",
  104. typedVersion.GetVersion(),
  105. kubeRuntimeAPIVersion)
  106. return nil, ErrVersionNotSupported
  107. }
  108. kubeRuntimeManager.runtimeName = typedVersion.GetRuntimeName()
  109. glog.Infof("Container runtime %s initialized, version: %s, apiVersion: %s",
  110. typedVersion.GetRuntimeName(),
  111. typedVersion.GetRuntimeVersion(),
  112. typedVersion.GetRuntimeApiVersion())
  113. // If the container logs directory does not exist, create it.
  114. // TODO: create podLogsRootDirectory at kubelet.go when kubelet is refactored to
  115. // new runtime interface
  116. if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {
  117. if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {
  118. glog.Errorf("Failed to create directory %q: %v", podLogsRootDirectory, err)
  119. }
  120. }
  121. kubeRuntimeManager.imagePuller = images.NewImageManager(
  122. kubecontainer.FilterEventRecorder(recorder),
  123. kubeRuntimeManager,
  124. imageBackOff,
  125. serializeImagePulls)
  126. kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
  127. return kubeRuntimeManager, nil
  128. }
  129. // Type returns the type of the container runtime.
  130. func (m *kubeGenericRuntimeManager) Type() string {
  131. return m.runtimeName
  132. }
  133. // runtimeVersion implements kubecontainer.Version interface by implementing
  134. // Compare() and String()
  135. type runtimeVersion struct {
  136. *semver.Version
  137. }
  138. func newRuntimeVersion(version string) (runtimeVersion, error) {
  139. sem, err := semver.NewVersion(version)
  140. if err != nil {
  141. return runtimeVersion{}, err
  142. }
  143. return runtimeVersion{sem}, nil
  144. }
  145. func (r runtimeVersion) Compare(other string) (int, error) {
  146. v, err := semver.NewVersion(other)
  147. if err != nil {
  148. return -1, err
  149. }
  150. if r.LessThan(*v) {
  151. return -1, nil
  152. }
  153. if v.LessThan(*r.Version) {
  154. return 1, nil
  155. }
  156. return 0, nil
  157. }
  158. // Version returns the version information of the container runtime.
  159. func (m *kubeGenericRuntimeManager) Version() (kubecontainer.Version, error) {
  160. typedVersion, err := m.runtimeService.Version(kubeRuntimeAPIVersion)
  161. if err != nil {
  162. glog.Errorf("Get remote runtime version failed: %v", err)
  163. return nil, err
  164. }
  165. return newRuntimeVersion(typedVersion.GetVersion())
  166. }
  167. // APIVersion returns the cached API version information of the container
  168. // runtime. Implementation is expected to update this cache periodically.
  169. // This may be different from the runtime engine's version.
  170. func (m *kubeGenericRuntimeManager) APIVersion() (kubecontainer.Version, error) {
  171. typedVersion, err := m.runtimeService.Version(kubeRuntimeAPIVersion)
  172. if err != nil {
  173. glog.Errorf("Get remote runtime version failed: %v", err)
  174. return nil, err
  175. }
  176. return newRuntimeVersion(typedVersion.GetRuntimeApiVersion())
  177. }
  178. // Status returns error if the runtime is unhealthy; nil otherwise.
  179. func (m *kubeGenericRuntimeManager) Status() error {
  180. _, err := m.runtimeService.Version(kubeRuntimeAPIVersion)
  181. if err != nil {
  182. glog.Errorf("Checkout remote runtime status failed: %v", err)
  183. return err
  184. }
  185. return nil
  186. }
  187. // GetPods returns a list of containers grouped by pods. The boolean parameter
  188. // specifies whether the runtime returns all containers including those already
  189. // exited and dead containers (used for garbage collection).
  190. func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
  191. pods := make(map[kubetypes.UID]*kubecontainer.Pod)
  192. sandboxes, err := m.getKubeletSandboxes(all)
  193. if err != nil {
  194. return nil, err
  195. }
  196. for _, s := range sandboxes {
  197. podUID := kubetypes.UID(s.Metadata.GetUid())
  198. pods[podUID] = &kubecontainer.Pod{
  199. ID: podUID,
  200. Name: s.Metadata.GetName(),
  201. Namespace: s.Metadata.GetNamespace(),
  202. }
  203. }
  204. containers, err := m.getKubeletContainers(all)
  205. if err != nil {
  206. return nil, err
  207. }
  208. for _, c := range containers {
  209. labelledInfo := getContainerInfoFromLabels(c.Labels)
  210. pod, found := pods[labelledInfo.PodUID]
  211. if !found {
  212. pod = &kubecontainer.Pod{
  213. ID: labelledInfo.PodUID,
  214. Name: labelledInfo.PodName,
  215. Namespace: labelledInfo.PodNamespace,
  216. }
  217. pods[labelledInfo.PodUID] = pod
  218. }
  219. converted, err := m.toKubeContainer(c)
  220. if err != nil {
  221. glog.Warningf("Convert %s container %v of pod %q failed: %v", m.runtimeName, c, labelledInfo.PodUID, err)
  222. continue
  223. }
  224. pod.Containers = append(pod.Containers, converted)
  225. }
  226. // Convert map to list.
  227. var result []*kubecontainer.Pod
  228. for _, pod := range pods {
  229. result = append(result, pod)
  230. }
  231. return result, nil
  232. }
  233. // SyncPod syncs the running pod into the desired pod.
  234. func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus,
  235. podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret,
  236. backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
  237. result.Fail(fmt.Errorf("not implemented"))
  238. return
  239. }
  240. // KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
  241. // gracePeriodOverride if specified allows the caller to override the pod default grace period.
  242. // only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data.
  243. // it is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios.
  244. func (m *kubeGenericRuntimeManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
  245. return fmt.Errorf("not implemented")
  246. }
  247. // GetPodStatus retrieves the status of the pod, including the
  248. // information of all containers in the pod that are visble in Runtime.
  249. func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
  250. return nil, fmt.Errorf("not implemented")
  251. }
  252. // Returns the filesystem path of the pod's network namespace; if the
  253. // runtime does not handle namespace creation itself, or cannot return
  254. // the network namespace path, it returns an 'not supported' error.
  255. // TODO: Rename param name to sandboxID in kubecontainer.Runtime.GetNetNS().
  256. // TODO: Remove GetNetNS after networking is delegated to the container runtime.
  257. func (m *kubeGenericRuntimeManager) GetNetNS(sandboxID kubecontainer.ContainerID) (string, error) {
  258. readyState := runtimeApi.PodSandBoxState_READY
  259. filter := &runtimeApi.PodSandboxFilter{
  260. State: &readyState,
  261. Id: &sandboxID.ID,
  262. LabelSelector: map[string]string{kubernetesManagedLabel: "true"},
  263. }
  264. sandboxes, err := m.runtimeService.ListPodSandbox(filter)
  265. if err != nil {
  266. glog.Errorf("ListPodSandbox with filter %q failed: %v", filter, err)
  267. return "", err
  268. }
  269. if len(sandboxes) == 0 {
  270. glog.Errorf("No sandbox is found with filter %q", filter)
  271. return "", fmt.Errorf("Sandbox %q is not found", sandboxID)
  272. }
  273. sandboxStatus, err := m.runtimeService.PodSandboxStatus(sandboxes[0].GetId())
  274. if err != nil {
  275. glog.Errorf("PodSandboxStatus with id %q failed: %v", sandboxes[0].GetId(), err)
  276. return "", err
  277. }
  278. if sandboxStatus.Linux != nil && sandboxStatus.Linux.Namespaces != nil {
  279. return sandboxStatus.Linux.Namespaces.GetNetwork(), nil
  280. }
  281. return "", fmt.Errorf("not supported")
  282. }
  283. // GetPodContainerID gets pod sandbox ID
  284. func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.ContainerID, error) {
  285. return kubecontainer.ContainerID{}, fmt.Errorf("not implemented")
  286. }
  287. // Forward the specified port from the specified pod to the stream.
  288. func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
  289. return fmt.Errorf("not implemented")
  290. }
  291. // GarbageCollect removes dead containers using the specified container gc policy
  292. func (m *kubeGenericRuntimeManager) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
  293. return fmt.Errorf("not implemented")
  294. }