rkt.go 77 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rkt
  14. import (
  15. "bufio"
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "os"
  22. "os/exec"
  23. "path"
  24. "path/filepath"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "syscall"
  30. "time"
  31. appcschema "github.com/appc/spec/schema"
  32. appctypes "github.com/appc/spec/schema/types"
  33. "github.com/coreos/go-systemd/unit"
  34. rktapi "github.com/coreos/rkt/api/v1alpha"
  35. "github.com/golang/glog"
  36. "golang.org/x/net/context"
  37. "google.golang.org/grpc"
  38. "k8s.io/kubernetes/pkg/api"
  39. "k8s.io/kubernetes/pkg/client/record"
  40. "k8s.io/kubernetes/pkg/credentialprovider"
  41. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  42. "k8s.io/kubernetes/pkg/kubelet/events"
  43. "k8s.io/kubernetes/pkg/kubelet/images"
  44. "k8s.io/kubernetes/pkg/kubelet/leaky"
  45. "k8s.io/kubernetes/pkg/kubelet/lifecycle"
  46. "k8s.io/kubernetes/pkg/kubelet/network"
  47. "k8s.io/kubernetes/pkg/kubelet/network/hairpin"
  48. proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
  49. "k8s.io/kubernetes/pkg/kubelet/types"
  50. "k8s.io/kubernetes/pkg/kubelet/util/format"
  51. "k8s.io/kubernetes/pkg/securitycontext"
  52. kubetypes "k8s.io/kubernetes/pkg/types"
  53. "k8s.io/kubernetes/pkg/util/errors"
  54. utilexec "k8s.io/kubernetes/pkg/util/exec"
  55. "k8s.io/kubernetes/pkg/util/flowcontrol"
  56. "k8s.io/kubernetes/pkg/util/selinux"
  57. utilstrings "k8s.io/kubernetes/pkg/util/strings"
  58. "k8s.io/kubernetes/pkg/util/term"
  59. "k8s.io/kubernetes/pkg/util/uuid"
  60. utilwait "k8s.io/kubernetes/pkg/util/wait"
  61. )
  62. const (
  63. RktType = "rkt"
  64. DefaultRktAPIServiceEndpoint = "localhost:15441"
  65. minimumRktBinVersion = "1.13.0"
  66. recommendedRktBinVersion = "1.13.0"
  67. minimumRktApiVersion = "1.0.0-alpha"
  68. minimumSystemdVersion = "219"
  69. systemdServiceDir = "/run/systemd/system"
  70. rktDataDir = "/var/lib/rkt"
  71. rktLocalConfigDir = "/etc/rkt"
  72. kubernetesUnitPrefix = "k8s_"
  73. unitKubernetesSection = "X-Kubernetes"
  74. unitPodUID = "PodUID"
  75. unitPodName = "PodName"
  76. unitPodNamespace = "PodNamespace"
  77. unitPodHostNetwork = "PodHostNetwork"
  78. k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet"
  79. k8sRktKubeletAnnoValue = "true"
  80. k8sRktContainerHashAnno = "rkt.kubernetes.io/container-hash"
  81. k8sRktRestartCountAnno = "rkt.kubernetes.io/restart-count"
  82. k8sRktTerminationMessagePathAnno = "rkt.kubernetes.io/termination-message-path"
  83. // TODO(euank): This has significant security concerns as a stage1 image is
  84. // effectively root.
  85. // Furthermore, this (using an annotation) is a hack to pass an extra
  86. // non-portable argument in. It should not be relied on to be stable.
  87. // In the future, this might be subsumed by a first-class api object, or by a
  88. // kitchen-sink params object (#17064).
  89. // See discussion in #23944
  90. // Also, do we want more granularity than path-at-the-kubelet-level and
  91. // image/name-at-the-pod-level?
  92. k8sRktStage1NameAnno = "rkt.alpha.kubernetes.io/stage1-name-override"
  93. dockerPrefix = "docker://"
  94. authDir = "auth.d"
  95. dockerAuthTemplate = `{"rktKind":"dockerAuth","rktVersion":"v1","registries":[%q],"credentials":{"user":%q,"password":%q}}`
  96. defaultRktAPIServiceAddr = "localhost:15441"
  97. // ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified)
  98. // we want to able to consider SRV lookup names like _dns._udp.kube-dns.default.svc to be considered relative.
  99. // hence, setting ndots to be 5.
  100. // TODO(yifan): Move this and dockertools.ndotsDNSOption to a common package.
  101. defaultDNSOption = "ndots:5"
  102. // Annotations for the ENTRYPOINT and CMD for an ACI that's converted from Docker image.
  103. // TODO(yifan): Import them from docker2aci. See https://github.com/appc/docker2aci/issues/133.
  104. appcDockerEntrypoint = "appc.io/docker/entrypoint"
  105. appcDockerCmd = "appc.io/docker/cmd"
  106. // TODO(yifan): Reuse this const with Docker runtime.
  107. minimumGracePeriodInSeconds = 2
  108. // The network name of the network when no-op plugin is being used.
  109. // TODO(yifan): This is not ideal since today we cannot make the rkt's 'net.d' dir point to the
  110. // CNI directory specified by kubelet. Once that is fixed, we can just use the network config
  111. // under the CNI directory directly.
  112. // See https://github.com/coreos/rkt/pull/2312#issuecomment-200068370.
  113. defaultNetworkName = "rkt.kubernetes.io"
  114. // defaultRequestTimeout is the default timeout of rkt requests.
  115. defaultRequestTimeout = 2 * time.Minute
  116. )
  117. // Runtime implements the Containerruntime for rkt. The implementation
  118. // uses systemd, so in order to run this runtime, systemd must be installed
  119. // on the machine.
  120. type Runtime struct {
  121. cli cliInterface
  122. systemd systemdInterface
  123. // The grpc client for rkt api-service.
  124. apisvcConn *grpc.ClientConn
  125. apisvc rktapi.PublicAPIClient
  126. config *Config
  127. // TODO(yifan): Refactor this to be generic keyring.
  128. dockerKeyring credentialprovider.DockerKeyring
  129. containerRefManager *kubecontainer.RefManager
  130. podGetter podGetter
  131. runtimeHelper kubecontainer.RuntimeHelper
  132. recorder record.EventRecorder
  133. livenessManager proberesults.Manager
  134. imagePuller images.ImageManager
  135. runner kubecontainer.HandlerRunner
  136. execer utilexec.Interface
  137. os kubecontainer.OSInterface
  138. // Network plugin.
  139. networkPlugin network.NetworkPlugin
  140. // If true, the "hairpin mode" flag is set on container interfaces.
  141. // A false value means the kubelet just backs off from setting it,
  142. // it might already be true.
  143. configureHairpinMode bool
  144. // used for a systemd Exec, which requires the full path.
  145. touchPath string
  146. nsenterPath string
  147. versions versions
  148. // requestTimeout is the timeout of rkt requests.
  149. requestTimeout time.Duration
  150. }
  151. var _ kubecontainer.Runtime = &Runtime{}
  152. // TODO(yifan): This duplicates the podGetter in dockertools.
  153. type podGetter interface {
  154. GetPodByUID(kubetypes.UID) (*api.Pod, bool)
  155. }
  156. // cliInterface wrapps the command line calls for testing purpose.
  157. type cliInterface interface {
  158. // RunCommand creates rkt commands and runs it with the given config.
  159. // If the config is nil, it will use the one inferred from rkt API service.
  160. RunCommand(config *Config, args ...string) (result []string, err error)
  161. }
  162. // New creates the rkt container runtime which implements the container runtime interface.
  163. // It will test if the rkt binary is in the $PATH, and whether we can get the
  164. // version of it. If so, creates the rkt container runtime, otherwise returns an error.
  165. func New(
  166. apiEndpoint string,
  167. config *Config,
  168. runtimeHelper kubecontainer.RuntimeHelper,
  169. recorder record.EventRecorder,
  170. containerRefManager *kubecontainer.RefManager,
  171. podGetter podGetter,
  172. livenessManager proberesults.Manager,
  173. httpClient types.HttpGetter,
  174. networkPlugin network.NetworkPlugin,
  175. hairpinMode bool,
  176. execer utilexec.Interface,
  177. os kubecontainer.OSInterface,
  178. imageBackOff *flowcontrol.Backoff,
  179. serializeImagePulls bool,
  180. requestTimeout time.Duration,
  181. ) (*Runtime, error) {
  182. // Create dbus connection.
  183. systemd, err := newSystemd()
  184. if err != nil {
  185. return nil, fmt.Errorf("rkt: cannot create systemd interface: %v", err)
  186. }
  187. // TODO(yifan): Use secure connection.
  188. apisvcConn, err := grpc.Dial(apiEndpoint, grpc.WithInsecure())
  189. if err != nil {
  190. return nil, fmt.Errorf("rkt: cannot connect to rkt api service: %v", err)
  191. }
  192. // TODO(yifan): Get the rkt path from API service.
  193. if config.Path == "" {
  194. // No default rkt path was set, so try to find one in $PATH.
  195. var err error
  196. config.Path, err = execer.LookPath("rkt")
  197. if err != nil {
  198. return nil, fmt.Errorf("cannot find rkt binary: %v", err)
  199. }
  200. }
  201. touchPath, err := execer.LookPath("touch")
  202. if err != nil {
  203. return nil, fmt.Errorf("cannot find touch binary: %v", err)
  204. }
  205. nsenterPath, err := execer.LookPath("nsenter")
  206. if err != nil {
  207. return nil, fmt.Errorf("cannot find nsenter binary: %v", err)
  208. }
  209. if requestTimeout == 0 {
  210. requestTimeout = defaultRequestTimeout
  211. }
  212. rkt := &Runtime{
  213. os: kubecontainer.RealOS{},
  214. systemd: systemd,
  215. apisvcConn: apisvcConn,
  216. apisvc: rktapi.NewPublicAPIClient(apisvcConn),
  217. config: config,
  218. dockerKeyring: credentialprovider.NewDockerKeyring(),
  219. containerRefManager: containerRefManager,
  220. podGetter: podGetter,
  221. runtimeHelper: runtimeHelper,
  222. recorder: recorder,
  223. livenessManager: livenessManager,
  224. networkPlugin: networkPlugin,
  225. execer: execer,
  226. touchPath: touchPath,
  227. nsenterPath: nsenterPath,
  228. requestTimeout: requestTimeout,
  229. }
  230. rkt.config, err = rkt.getConfig(rkt.config)
  231. if err != nil {
  232. return nil, fmt.Errorf("rkt: cannot get config from rkt api service: %v", err)
  233. }
  234. rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt)
  235. rkt.imagePuller = images.NewImageManager(recorder, rkt, imageBackOff, serializeImagePulls)
  236. if err := rkt.getVersions(); err != nil {
  237. return nil, fmt.Errorf("rkt: error getting version info: %v", err)
  238. }
  239. rkt.cli = rkt
  240. return rkt, nil
  241. }
  242. func buildCommand(config *Config, args ...string) *exec.Cmd {
  243. cmd := exec.Command(config.Path)
  244. cmd.Args = append(cmd.Args, config.buildGlobalOptions()...)
  245. cmd.Args = append(cmd.Args, args...)
  246. return cmd
  247. }
  248. // convertToACName converts a string into ACName.
  249. func convertToACName(name string) appctypes.ACName {
  250. // Note that as the 'name' already matches 'DNS_LABEL'
  251. // defined in pkg/api/types.go, there shouldn't be error or panic.
  252. acname, _ := appctypes.SanitizeACName(name)
  253. return *appctypes.MustACName(acname)
  254. }
  255. // RunCommand invokes rkt binary with arguments and returns the result
  256. // from stdout in a list of strings. Each string in the list is a line.
  257. // If config is non-nil, it will use the given config instead of the config
  258. // inferred from rkt API service.
  259. func (r *Runtime) RunCommand(config *Config, args ...string) ([]string, error) {
  260. if config == nil {
  261. config = r.config
  262. }
  263. glog.V(4).Infof("rkt: Run command: %q with config: %#v", args, config)
  264. var stdout, stderr bytes.Buffer
  265. cmd := buildCommand(config, args...)
  266. cmd.Stdout, cmd.Stderr = &stdout, &stderr
  267. if err := cmd.Run(); err != nil {
  268. return nil, fmt.Errorf("failed to run %v: %v\nstdout: %v\nstderr: %v", args, err, stdout.String(), stderr.String())
  269. }
  270. return strings.Split(strings.TrimSpace(stdout.String()), "\n"), nil
  271. }
  272. // makePodServiceFileName constructs the unit file name for a pod using its rkt pod uuid.
  273. func makePodServiceFileName(uuid string) string {
  274. // TODO(yifan): Add name for readability? We need to consider the
  275. // limit of the length.
  276. return fmt.Sprintf("%s%s.service", kubernetesUnitPrefix, uuid)
  277. }
  278. func getRktUUIDFromServiceFileName(filename string) string {
  279. return strings.TrimPrefix(strings.TrimSuffix(filename, path.Ext(filename)), kubernetesUnitPrefix)
  280. }
  281. // setIsolators sets the apps' isolators according to the security context and resource spec.
  282. func setIsolators(app *appctypes.App, c *api.Container, ctx *api.SecurityContext) error {
  283. var isolators []appctypes.Isolator
  284. // Capabilities isolators.
  285. if ctx != nil {
  286. var addCaps, dropCaps []string
  287. if ctx.Capabilities != nil {
  288. addCaps, dropCaps = securitycontext.MakeCapabilities(ctx.Capabilities.Add, ctx.Capabilities.Drop)
  289. }
  290. if ctx.Privileged != nil && *ctx.Privileged {
  291. addCaps, dropCaps = allCapabilities(), []string{}
  292. }
  293. if len(addCaps) > 0 {
  294. set, err := appctypes.NewLinuxCapabilitiesRetainSet(addCaps...)
  295. if err != nil {
  296. return err
  297. }
  298. isolators = append(isolators, set.AsIsolator())
  299. }
  300. if len(dropCaps) > 0 {
  301. set, err := appctypes.NewLinuxCapabilitiesRevokeSet(dropCaps...)
  302. if err != nil {
  303. return err
  304. }
  305. isolators = append(isolators, set.AsIsolator())
  306. }
  307. }
  308. // Resources isolators.
  309. type resource struct {
  310. limit string
  311. request string
  312. }
  313. // If limit is empty, populate it with request and vice versa.
  314. resources := make(map[api.ResourceName]*resource)
  315. for name, quantity := range c.Resources.Limits {
  316. resources[name] = &resource{limit: quantity.String(), request: quantity.String()}
  317. }
  318. for name, quantity := range c.Resources.Requests {
  319. r, ok := resources[name]
  320. if ok {
  321. r.request = quantity.String()
  322. continue
  323. }
  324. resources[name] = &resource{limit: quantity.String(), request: quantity.String()}
  325. }
  326. for name, res := range resources {
  327. switch name {
  328. case api.ResourceCPU:
  329. cpu, err := appctypes.NewResourceCPUIsolator(res.request, res.limit)
  330. if err != nil {
  331. return err
  332. }
  333. isolators = append(isolators, cpu.AsIsolator())
  334. case api.ResourceMemory:
  335. memory, err := appctypes.NewResourceMemoryIsolator(res.request, res.limit)
  336. if err != nil {
  337. return err
  338. }
  339. isolators = append(isolators, memory.AsIsolator())
  340. default:
  341. return fmt.Errorf("resource type not supported: %v", name)
  342. }
  343. }
  344. mergeIsolators(app, isolators)
  345. return nil
  346. }
  347. // mergeIsolators replaces the app.Isolators with isolators.
  348. func mergeIsolators(app *appctypes.App, isolators []appctypes.Isolator) {
  349. for _, is := range isolators {
  350. found := false
  351. for j, js := range app.Isolators {
  352. if is.Name.Equals(js.Name) {
  353. switch is.Name {
  354. case appctypes.LinuxCapabilitiesRetainSetName:
  355. // TODO(yifan): More fine grain merge for capability set instead of override.
  356. fallthrough
  357. case appctypes.LinuxCapabilitiesRevokeSetName:
  358. fallthrough
  359. case appctypes.ResourceCPUName:
  360. fallthrough
  361. case appctypes.ResourceMemoryName:
  362. app.Isolators[j] = is
  363. default:
  364. panic(fmt.Sprintf("unexpected isolator name: %v", is.Name))
  365. }
  366. found = true
  367. break
  368. }
  369. }
  370. if !found {
  371. app.Isolators = append(app.Isolators, is)
  372. }
  373. }
  374. }
  375. // mergeEnv merges the optEnv with the image's environments.
  376. // The environments defined in the image will be overridden by
  377. // the ones with the same name in optEnv.
  378. func mergeEnv(app *appctypes.App, optEnv []kubecontainer.EnvVar) {
  379. envMap := make(map[string]string)
  380. for _, e := range app.Environment {
  381. envMap[e.Name] = e.Value
  382. }
  383. for _, e := range optEnv {
  384. envMap[e.Name] = e.Value
  385. }
  386. app.Environment = nil
  387. for name, value := range envMap {
  388. app.Environment = append(app.Environment, appctypes.EnvironmentVariable{
  389. Name: name,
  390. Value: value,
  391. })
  392. }
  393. }
  394. // mergeMounts merges the mountPoints with the image's mount points.
  395. // The mount points defined in the image will be overridden by the ones
  396. // with the same container path.
  397. func mergeMounts(app *appctypes.App, mountPoints []appctypes.MountPoint) {
  398. mountMap := make(map[string]appctypes.MountPoint)
  399. for _, m := range app.MountPoints {
  400. mountMap[m.Path] = m
  401. }
  402. for _, m := range mountPoints {
  403. mountMap[m.Path] = m
  404. }
  405. app.MountPoints = nil
  406. for _, mount := range mountMap {
  407. app.MountPoints = append(app.MountPoints, mount)
  408. }
  409. }
  410. // mergePortMappings merges the containerPorts with the image's container ports.
  411. // The port mappings defined in the image will be overridden by the ones
  412. // with the same name in optPortMappings.
  413. func mergePortMappings(app *appctypes.App, containerPorts []appctypes.Port) {
  414. portMap := make(map[appctypes.ACName]appctypes.Port)
  415. for _, p := range app.Ports {
  416. portMap[p.Name] = p
  417. }
  418. for _, p := range containerPorts {
  419. portMap[p.Name] = p
  420. }
  421. app.Ports = nil
  422. for _, port := range portMap {
  423. app.Ports = append(app.Ports, port)
  424. }
  425. }
  426. func verifyNonRoot(app *appctypes.App, ctx *api.SecurityContext) error {
  427. if ctx != nil && ctx.RunAsNonRoot != nil && *ctx.RunAsNonRoot {
  428. if ctx.RunAsUser != nil && *ctx.RunAsUser == 0 {
  429. return fmt.Errorf("container's runAsUser breaks non-root policy")
  430. }
  431. if ctx.RunAsUser == nil && app.User == "0" {
  432. return fmt.Errorf("container has no runAsUser and image will run as root")
  433. }
  434. }
  435. return nil
  436. }
  437. func setSupplementalGIDs(app *appctypes.App, podCtx *api.PodSecurityContext, supplementalGids []int64) {
  438. if podCtx != nil || len(supplementalGids) != 0 {
  439. app.SupplementaryGIDs = app.SupplementaryGIDs[:0]
  440. }
  441. if podCtx != nil {
  442. for _, v := range podCtx.SupplementalGroups {
  443. app.SupplementaryGIDs = append(app.SupplementaryGIDs, int(v))
  444. }
  445. if podCtx.FSGroup != nil {
  446. app.SupplementaryGIDs = append(app.SupplementaryGIDs, int(*podCtx.FSGroup))
  447. }
  448. }
  449. for _, v := range supplementalGids {
  450. app.SupplementaryGIDs = append(app.SupplementaryGIDs, int(v))
  451. }
  452. }
  453. // setApp merges the container spec with the image's manifest.
  454. func setApp(imgManifest *appcschema.ImageManifest, c *api.Container,
  455. mountPoints []appctypes.MountPoint, containerPorts []appctypes.Port, envs []kubecontainer.EnvVar,
  456. ctx *api.SecurityContext, podCtx *api.PodSecurityContext, supplementalGids []int64) error {
  457. app := imgManifest.App
  458. // Set up Exec.
  459. var command, args []string
  460. cmd, ok := imgManifest.Annotations.Get(appcDockerEntrypoint)
  461. if ok {
  462. err := json.Unmarshal([]byte(cmd), &command)
  463. if err != nil {
  464. return fmt.Errorf("cannot unmarshal ENTRYPOINT %q: %v", cmd, err)
  465. }
  466. }
  467. ag, ok := imgManifest.Annotations.Get(appcDockerCmd)
  468. if ok {
  469. err := json.Unmarshal([]byte(ag), &args)
  470. if err != nil {
  471. return fmt.Errorf("cannot unmarshal CMD %q: %v", ag, err)
  472. }
  473. }
  474. userCommand, userArgs := kubecontainer.ExpandContainerCommandAndArgs(c, envs)
  475. if len(userCommand) > 0 {
  476. command = userCommand
  477. args = nil // If 'command' is specified, then drop the default args.
  478. }
  479. if len(userArgs) > 0 {
  480. args = userArgs
  481. }
  482. exec := append(command, args...)
  483. if len(exec) > 0 {
  484. app.Exec = exec
  485. }
  486. // Set UID and GIDs.
  487. if err := verifyNonRoot(app, ctx); err != nil {
  488. return err
  489. }
  490. if ctx != nil && ctx.RunAsUser != nil {
  491. app.User = strconv.Itoa(int(*ctx.RunAsUser))
  492. }
  493. setSupplementalGIDs(app, podCtx, supplementalGids)
  494. // If 'User' or 'Group' are still empty at this point,
  495. // then apply the root UID and GID.
  496. // TODO(yifan): If only the GID is empty, rkt should be able to determine the GID
  497. // using the /etc/passwd file in the image.
  498. // See https://github.com/appc/docker2aci/issues/175.
  499. // Maybe we can remove this check in the future.
  500. if app.User == "" {
  501. app.User = "0"
  502. app.Group = "0"
  503. }
  504. if app.Group == "" {
  505. return fmt.Errorf("cannot determine the GID of the app %q", imgManifest.Name)
  506. }
  507. // Set working directory.
  508. if len(c.WorkingDir) > 0 {
  509. app.WorkingDirectory = c.WorkingDir
  510. }
  511. // Notes that we don't create Mounts section in the pod manifest here,
  512. // as Mounts will be automatically generated by rkt.
  513. mergeMounts(app, mountPoints)
  514. mergeEnv(app, envs)
  515. mergePortMappings(app, containerPorts)
  516. return setIsolators(app, c, ctx)
  517. }
  518. // makePodManifest transforms a kubelet pod spec to the rkt pod manifest.
  519. func (r *Runtime) makePodManifest(pod *api.Pod, podIP string, pullSecrets []api.Secret) (*appcschema.PodManifest, error) {
  520. manifest := appcschema.BlankPodManifest()
  521. ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
  522. defer cancel()
  523. listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{
  524. Detail: true,
  525. Filters: kubernetesPodFilters(pod.UID),
  526. })
  527. if err != nil {
  528. return nil, fmt.Errorf("couldn't list pods: %v", err)
  529. }
  530. restartCount := 0
  531. for _, pod := range listResp.Pods {
  532. manifest := &appcschema.PodManifest{}
  533. err = json.Unmarshal(pod.Manifest, manifest)
  534. if err != nil {
  535. glog.Warningf("rkt: error unmatshaling pod manifest: %v", err)
  536. continue
  537. }
  538. if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
  539. num, err := strconv.Atoi(countString)
  540. if err != nil {
  541. glog.Warningf("rkt: error reading restart count on pod: %v", err)
  542. continue
  543. }
  544. if num+1 > restartCount {
  545. restartCount = num + 1
  546. }
  547. }
  548. }
  549. requiresPrivileged := false
  550. manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktKubeletAnno), k8sRktKubeletAnnoValue)
  551. manifest.Annotations.Set(*appctypes.MustACIdentifier(types.KubernetesPodUIDLabel), string(pod.UID))
  552. manifest.Annotations.Set(*appctypes.MustACIdentifier(types.KubernetesPodNameLabel), pod.Name)
  553. manifest.Annotations.Set(*appctypes.MustACIdentifier(types.KubernetesPodNamespaceLabel), pod.Namespace)
  554. manifest.Annotations.Set(*appctypes.MustACIdentifier(types.KubernetesContainerNameLabel), leaky.PodInfraContainerName)
  555. manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktRestartCountAnno), strconv.Itoa(restartCount))
  556. if stage1Name, ok := pod.Annotations[k8sRktStage1NameAnno]; ok {
  557. requiresPrivileged = true
  558. manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktStage1NameAnno), stage1Name)
  559. }
  560. for _, c := range pod.Spec.Containers {
  561. err := r.newAppcRuntimeApp(pod, podIP, c, requiresPrivileged, pullSecrets, manifest)
  562. if err != nil {
  563. return nil, err
  564. }
  565. }
  566. // TODO(yifan): Set pod-level isolators once it's supported in kubernetes.
  567. return manifest, nil
  568. }
  569. func copyfile(src, dst string) error {
  570. data, err := ioutil.ReadFile(src)
  571. if err != nil {
  572. return err
  573. }
  574. return ioutil.WriteFile(dst, data, 0644)
  575. }
  576. // TODO(yifan): Can make rkt handle this when '--net=host'. See https://github.com/coreos/rkt/issues/2430.
  577. func makeHostNetworkMount(opts *kubecontainer.RunContainerOptions) (*kubecontainer.Mount, *kubecontainer.Mount, error) {
  578. hostsPath := filepath.Join(opts.PodContainerDir, "etc-hosts")
  579. resolvPath := filepath.Join(opts.PodContainerDir, "etc-resolv-conf")
  580. if err := copyfile("/etc/hosts", hostsPath); err != nil {
  581. return nil, nil, err
  582. }
  583. if err := copyfile("/etc/resolv.conf", resolvPath); err != nil {
  584. return nil, nil, err
  585. }
  586. hostsMount := kubecontainer.Mount{
  587. Name: "kubernetes-hostnetwork-hosts-conf",
  588. ContainerPath: "/etc/hosts",
  589. HostPath: hostsPath,
  590. }
  591. resolvMount := kubecontainer.Mount{
  592. Name: "kubernetes-hostnetwork-resolv-conf",
  593. ContainerPath: "/etc/resolv.conf",
  594. HostPath: resolvPath,
  595. }
  596. opts.Mounts = append(opts.Mounts, hostsMount, resolvMount)
  597. return &hostsMount, &resolvMount, nil
  598. }
  599. // podFinishedMarkerPath returns the path to a file which should be used to
  600. // indicate the pod exiting, and the time thereof.
  601. // If the file at the path does not exist, the pod should not be exited. If it
  602. // does exist, then the ctime of the file should indicate the time the pod
  603. // exited.
  604. func podFinishedMarkerPath(podDir string, rktUID string) string {
  605. return filepath.Join(podDir, "finished-"+rktUID)
  606. }
  607. func podFinishedMarkCommand(touchPath, podDir, rktUID string) string {
  608. // TODO, if the path has a `'` character in it, this breaks.
  609. return touchPath + " " + podFinishedMarkerPath(podDir, rktUID)
  610. }
  611. // podFinishedAt returns the time that a pod exited, or a zero time if it has
  612. // not.
  613. func (r *Runtime) podFinishedAt(podUID kubetypes.UID, rktUID string) time.Time {
  614. markerFile := podFinishedMarkerPath(r.runtimeHelper.GetPodDir(podUID), rktUID)
  615. stat, err := r.os.Stat(markerFile)
  616. if err != nil {
  617. if !os.IsNotExist(err) {
  618. glog.Warningf("rkt: unexpected fs error checking pod finished marker: %v", err)
  619. }
  620. return time.Time{}
  621. }
  622. return stat.ModTime()
  623. }
  624. func (r *Runtime) makeContainerLogMount(opts *kubecontainer.RunContainerOptions, container *api.Container) (*kubecontainer.Mount, error) {
  625. if opts.PodContainerDir == "" || container.TerminationMessagePath == "" {
  626. return nil, nil
  627. }
  628. // In docker runtime, the container log path contains the container ID.
  629. // However, for rkt runtime, we cannot get the container ID before the
  630. // the container is launched, so here we generate a random uuid to enable
  631. // us to map a container's termination message path to a unique log file
  632. // on the disk.
  633. randomUID := uuid.NewUUID()
  634. containerLogPath := path.Join(opts.PodContainerDir, string(randomUID))
  635. fs, err := r.os.Create(containerLogPath)
  636. if err != nil {
  637. return nil, err
  638. }
  639. if err := fs.Close(); err != nil {
  640. return nil, err
  641. }
  642. mnt := kubecontainer.Mount{
  643. // Use a random name for the termination message mount, so that
  644. // when a container restarts, it will not overwrite the old termination
  645. // message.
  646. Name: fmt.Sprintf("termination-message-%s", randomUID),
  647. ContainerPath: container.TerminationMessagePath,
  648. HostPath: containerLogPath,
  649. ReadOnly: false,
  650. }
  651. opts.Mounts = append(opts.Mounts, mnt)
  652. return &mnt, nil
  653. }
  654. func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, podIP string, c api.Container, requiresPrivileged bool, pullSecrets []api.Secret, manifest *appcschema.PodManifest) error {
  655. var annotations appctypes.Annotations = []appctypes.Annotation{
  656. {
  657. Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
  658. Value: strconv.FormatUint(kubecontainer.HashContainer(&c), 10),
  659. },
  660. {
  661. Name: *appctypes.MustACIdentifier(types.KubernetesContainerNameLabel),
  662. Value: c.Name,
  663. },
  664. }
  665. if requiresPrivileged && !securitycontext.HasPrivilegedRequest(&c) {
  666. return fmt.Errorf("cannot make %q: running a custom stage1 requires a privileged security context", format.Pod(pod))
  667. }
  668. if err, _ := r.imagePuller.EnsureImageExists(pod, &c, pullSecrets); err != nil {
  669. return nil
  670. }
  671. imgManifest, err := r.getImageManifest(c.Image)
  672. if err != nil {
  673. return err
  674. }
  675. if imgManifest.App == nil {
  676. imgManifest.App = new(appctypes.App)
  677. }
  678. imageID, err := r.getImageID(c.Image)
  679. if err != nil {
  680. return err
  681. }
  682. hash, err := appctypes.NewHash(imageID)
  683. if err != nil {
  684. return err
  685. }
  686. // TODO: determine how this should be handled for rkt
  687. opts, err := r.runtimeHelper.GenerateRunContainerOptions(pod, &c, podIP)
  688. if err != nil {
  689. return err
  690. }
  691. // Create additional mount for termintation message path.
  692. mount, err := r.makeContainerLogMount(opts, &c)
  693. if err != nil {
  694. return err
  695. }
  696. mounts := append(opts.Mounts, *mount)
  697. annotations = append(annotations, appctypes.Annotation{
  698. Name: *appctypes.MustACIdentifier(k8sRktTerminationMessagePathAnno),
  699. Value: mount.HostPath,
  700. })
  701. // If run in 'hostnetwork' mode, then copy the host's /etc/resolv.conf and /etc/hosts,
  702. // and add mounts.
  703. if kubecontainer.IsHostNetworkPod(pod) {
  704. hostsMount, resolvMount, err := makeHostNetworkMount(opts)
  705. if err != nil {
  706. return err
  707. }
  708. mounts = append(mounts, *hostsMount, *resolvMount)
  709. }
  710. supplementalGids := r.runtimeHelper.GetExtraSupplementalGroupsForPod(pod)
  711. ctx := securitycontext.DetermineEffectiveSecurityContext(pod, &c)
  712. volumes, mountPoints := convertKubeMounts(mounts)
  713. containerPorts, hostPorts := convertKubePortMappings(opts.PortMappings)
  714. if err := setApp(imgManifest, &c, mountPoints, containerPorts, opts.Envs, ctx, pod.Spec.SecurityContext, supplementalGids); err != nil {
  715. return err
  716. }
  717. ra := appcschema.RuntimeApp{
  718. Name: convertToACName(c.Name),
  719. Image: appcschema.RuntimeImage{ID: *hash},
  720. App: imgManifest.App,
  721. Annotations: annotations,
  722. }
  723. if c.SecurityContext != nil && c.SecurityContext.ReadOnlyRootFilesystem != nil {
  724. ra.ReadOnlyRootFS = *c.SecurityContext.ReadOnlyRootFilesystem
  725. }
  726. manifest.Apps = append(manifest.Apps, ra)
  727. manifest.Volumes = append(manifest.Volumes, volumes...)
  728. manifest.Ports = append(manifest.Ports, hostPorts...)
  729. return nil
  730. }
  731. func runningKubernetesPodFilters(uid kubetypes.UID) []*rktapi.PodFilter {
  732. return []*rktapi.PodFilter{
  733. {
  734. States: []rktapi.PodState{
  735. rktapi.PodState_POD_STATE_RUNNING,
  736. },
  737. Annotations: []*rktapi.KeyValue{
  738. {
  739. Key: k8sRktKubeletAnno,
  740. Value: k8sRktKubeletAnnoValue,
  741. },
  742. {
  743. Key: types.KubernetesPodUIDLabel,
  744. Value: string(uid),
  745. },
  746. },
  747. },
  748. }
  749. }
  750. func kubernetesPodFilters(uid kubetypes.UID) []*rktapi.PodFilter {
  751. return []*rktapi.PodFilter{
  752. {
  753. Annotations: []*rktapi.KeyValue{
  754. {
  755. Key: k8sRktKubeletAnno,
  756. Value: k8sRktKubeletAnnoValue,
  757. },
  758. {
  759. Key: types.KubernetesPodUIDLabel,
  760. Value: string(uid),
  761. },
  762. },
  763. },
  764. }
  765. }
  766. func kubernetesPodsFilters() []*rktapi.PodFilter {
  767. return []*rktapi.PodFilter{
  768. {
  769. Annotations: []*rktapi.KeyValue{
  770. {
  771. Key: k8sRktKubeletAnno,
  772. Value: k8sRktKubeletAnnoValue,
  773. },
  774. },
  775. },
  776. }
  777. }
  778. func newUnitOption(section, name, value string) *unit.UnitOption {
  779. return &unit.UnitOption{Section: section, Name: name, Value: value}
  780. }
  781. // apiPodToruntimePod converts an api.Pod to kubelet/container.Pod.
  782. func apiPodToruntimePod(uuid string, pod *api.Pod) *kubecontainer.Pod {
  783. p := &kubecontainer.Pod{
  784. ID: pod.UID,
  785. Name: pod.Name,
  786. Namespace: pod.Namespace,
  787. }
  788. for i := range pod.Spec.Containers {
  789. c := &pod.Spec.Containers[i]
  790. p.Containers = append(p.Containers, &kubecontainer.Container{
  791. ID: buildContainerID(&containerID{uuid, c.Name}),
  792. Name: c.Name,
  793. Image: c.Image,
  794. Hash: kubecontainer.HashContainer(c),
  795. })
  796. }
  797. return p
  798. }
  799. // serviceFilePath returns the absolute path of the service file.
  800. func serviceFilePath(serviceName string) string {
  801. return path.Join(systemdServiceDir, serviceName)
  802. }
  803. // shouldCreateNetns returns true if:
  804. // The pod does not run in host network. And
  805. // The pod runs inside a netns created outside of rkt.
  806. func (r *Runtime) shouldCreateNetns(pod *api.Pod) bool {
  807. return !kubecontainer.IsHostNetworkPod(pod) && r.networkPlugin.Name() != network.DefaultPluginName
  808. }
  809. // usesRktHostNetwork returns true if:
  810. // The pod runs in the host network. Or
  811. // The pod runs inside a netns created outside of rkt.
  812. func (r *Runtime) usesRktHostNetwork(pod *api.Pod) bool {
  813. return kubecontainer.IsHostNetworkPod(pod) || r.shouldCreateNetns(pod)
  814. }
  815. // generateRunCommand crafts a 'rkt run-prepared' command with necessary parameters.
  816. func (r *Runtime) generateRunCommand(pod *api.Pod, uuid, netnsName string) (string, error) {
  817. config := *r.config
  818. privileged := true
  819. for _, c := range pod.Spec.Containers {
  820. ctx := securitycontext.DetermineEffectiveSecurityContext(pod, &c)
  821. if ctx == nil || ctx.Privileged == nil || *ctx.Privileged == false {
  822. privileged = false
  823. break
  824. }
  825. }
  826. // Use "all-run" insecure option (https://github.com/coreos/rkt/pull/2983) to take care
  827. // of privileged pod.
  828. // TODO(yifan): Have more granular app-level control of the insecure options.
  829. // See: https://github.com/coreos/rkt/issues/2996.
  830. if privileged {
  831. config.InsecureOptions = fmt.Sprintf("%s,%s", config.InsecureOptions, "all-run")
  832. }
  833. runPrepared := buildCommand(&config, "run-prepared").Args
  834. var hostname string
  835. var err error
  836. osInfos, err := getOSReleaseInfo()
  837. if err != nil {
  838. glog.Warningf("rkt: Failed to read the os release info: %v", err)
  839. } else {
  840. // Overlay fs is not supported for SELinux yet on many distros.
  841. // See https://github.com/coreos/rkt/issues/1727#issuecomment-173203129.
  842. // For now, coreos carries a patch to support it: https://github.com/coreos/coreos-overlay/pull/1703
  843. if osInfos["ID"] != "coreos" && pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.SELinuxOptions != nil {
  844. runPrepared = append(runPrepared, "--no-overlay=true")
  845. }
  846. }
  847. // Apply '--net=host' to pod that is running on host network or inside a network namespace.
  848. if r.usesRktHostNetwork(pod) {
  849. runPrepared = append(runPrepared, "--net=host")
  850. } else {
  851. runPrepared = append(runPrepared, fmt.Sprintf("--net=%s", defaultNetworkName))
  852. }
  853. if kubecontainer.IsHostNetworkPod(pod) {
  854. // TODO(yifan): Let runtimeHelper.GeneratePodHostNameAndDomain() to handle this.
  855. hostname, err = r.os.Hostname()
  856. if err != nil {
  857. return "", err
  858. }
  859. } else {
  860. // Setup DNS.
  861. dnsServers, dnsSearches, err := r.runtimeHelper.GetClusterDNS(pod)
  862. if err != nil {
  863. return "", err
  864. }
  865. for _, server := range dnsServers {
  866. runPrepared = append(runPrepared, fmt.Sprintf("--dns=%s", server))
  867. }
  868. for _, search := range dnsSearches {
  869. runPrepared = append(runPrepared, fmt.Sprintf("--dns-search=%s", search))
  870. }
  871. if len(dnsServers) > 0 || len(dnsSearches) > 0 {
  872. runPrepared = append(runPrepared, fmt.Sprintf("--dns-opt=%s", defaultDNSOption))
  873. }
  874. // TODO(yifan): host domain is not being used.
  875. hostname, _, err = r.runtimeHelper.GeneratePodHostNameAndDomain(pod)
  876. if err != nil {
  877. return "", err
  878. }
  879. }
  880. runPrepared = append(runPrepared, fmt.Sprintf("--hostname=%s", hostname))
  881. runPrepared = append(runPrepared, uuid)
  882. if r.shouldCreateNetns(pod) {
  883. // Drop the `rkt run-prepared` into the network namespace we
  884. // created.
  885. // TODO: switch to 'ip netns exec' once we can depend on a new
  886. // enough version that doesn't have bugs like
  887. // https://bugzilla.redhat.com/show_bug.cgi?id=882047
  888. nsenterExec := []string{r.nsenterPath, "--net=" + netnsPathFromName(netnsName), "--"}
  889. runPrepared = append(nsenterExec, runPrepared...)
  890. }
  891. return strings.Join(runPrepared, " "), nil
  892. }
  893. func (r *Runtime) cleanupPodNetwork(pod *api.Pod) error {
  894. glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.networkPlugin.Name(), format.Pod(pod))
  895. // No-op if the pod is not running in a created netns.
  896. if !r.shouldCreateNetns(pod) {
  897. return nil
  898. }
  899. var teardownErr error
  900. containerID := kubecontainer.ContainerID{ID: string(pod.UID)}
  901. if err := r.networkPlugin.TearDownPod(pod.Namespace, pod.Name, containerID); err != nil {
  902. teardownErr = fmt.Errorf("rkt: failed to tear down network for pod %s: %v", format.Pod(pod), err)
  903. glog.Errorf("%v", teardownErr)
  904. }
  905. if _, err := r.execer.Command("ip", "netns", "del", makePodNetnsName(pod.UID)).Output(); err != nil {
  906. return fmt.Errorf("rkt: Failed to remove network namespace for pod %s: %v", format.Pod(pod), err)
  907. }
  908. return teardownErr
  909. }
  910. func (r *Runtime) preparePodArgs(manifest *appcschema.PodManifest, manifestFileName string) []string {
  911. // Order of precedence for the stage1:
  912. // 1) pod annotation (stage1 name)
  913. // 2) kubelet configured stage1 (stage1 path)
  914. // 3) empty; whatever rkt's compiled to default to
  915. stage1ImageCmd := ""
  916. if r.config.Stage1Image != "" {
  917. stage1ImageCmd = "--stage1-name=" + r.config.Stage1Image
  918. }
  919. if stage1Name, ok := manifest.Annotations.Get(k8sRktStage1NameAnno); ok {
  920. stage1ImageCmd = "--stage1-name=" + stage1Name
  921. }
  922. // Run 'rkt prepare' to get the rkt UUID.
  923. cmds := []string{"prepare", "--quiet", "--pod-manifest", manifestFileName}
  924. if stage1ImageCmd != "" {
  925. cmds = append(cmds, stage1ImageCmd)
  926. }
  927. return cmds
  928. }
  929. func (r *Runtime) getSelinuxContext(opt *api.SELinuxOptions) (string, error) {
  930. selinuxRunner := selinux.NewSelinuxContextRunner()
  931. str, err := selinuxRunner.Getfilecon(r.config.Dir)
  932. if err != nil {
  933. return "", err
  934. }
  935. ctx := strings.SplitN(str, ":", 4)
  936. if len(ctx) != 4 {
  937. return "", fmt.Errorf("malformated selinux context")
  938. }
  939. if opt.User != "" {
  940. ctx[0] = opt.User
  941. }
  942. if opt.Role != "" {
  943. ctx[1] = opt.Role
  944. }
  945. if opt.Type != "" {
  946. ctx[2] = opt.Type
  947. }
  948. if opt.Level != "" {
  949. ctx[3] = opt.Level
  950. }
  951. return strings.Join(ctx, ":"), nil
  952. }
  953. // preparePod will:
  954. //
  955. // 1. Invoke 'rkt prepare' to prepare the pod, and get the rkt pod uuid.
  956. // 2. Create the unit file and save it under systemdUnitDir.
  957. //
  958. // On success, it will return a string that represents name of the unit file
  959. // and the runtime pod.
  960. func (r *Runtime) preparePod(pod *api.Pod, podIP string, pullSecrets []api.Secret, netnsName string) (string, *kubecontainer.Pod, error) {
  961. // Generate the appc pod manifest from the k8s pod spec.
  962. manifest, err := r.makePodManifest(pod, podIP, pullSecrets)
  963. if err != nil {
  964. return "", nil, err
  965. }
  966. manifestFile, err := ioutil.TempFile("", fmt.Sprintf("manifest-%s-", pod.Name))
  967. if err != nil {
  968. return "", nil, err
  969. }
  970. defer func() {
  971. manifestFile.Close()
  972. if err := r.os.Remove(manifestFile.Name()); err != nil {
  973. glog.Warningf("rkt: Cannot remove temp manifest file %q: %v", manifestFile.Name(), err)
  974. }
  975. }()
  976. data, err := json.Marshal(manifest)
  977. if err != nil {
  978. return "", nil, err
  979. }
  980. glog.V(4).Infof("Generating pod manifest for pod %q: %v", format.Pod(pod), string(data))
  981. // Since File.Write returns error if the written length is less than len(data),
  982. // so check error is enough for us.
  983. if _, err := manifestFile.Write(data); err != nil {
  984. return "", nil, err
  985. }
  986. prepareCmd := r.preparePodArgs(manifest, manifestFile.Name())
  987. output, err := r.cli.RunCommand(nil, prepareCmd...)
  988. if err != nil {
  989. return "", nil, err
  990. }
  991. if len(output) != 1 {
  992. return "", nil, fmt.Errorf("invalid output from 'rkt prepare': %v", output)
  993. }
  994. uuid := output[0]
  995. glog.V(4).Infof("'rkt prepare' returns %q", uuid)
  996. // Create systemd service file for the rkt pod.
  997. runPrepared, err := r.generateRunCommand(pod, uuid, netnsName)
  998. if err != nil {
  999. return "", nil, fmt.Errorf("failed to generate 'rkt run-prepared' command: %v", err)
  1000. }
  1001. // TODO handle pod.Spec.HostPID
  1002. // TODO handle pod.Spec.HostIPC
  1003. // TODO per container finishedAt, not just per pod
  1004. markPodFinished := podFinishedMarkCommand(r.touchPath, r.runtimeHelper.GetPodDir(pod.UID), uuid)
  1005. hostNetwork := kubecontainer.IsHostNetworkPod(pod)
  1006. units := []*unit.UnitOption{
  1007. newUnitOption("Service", "ExecStart", runPrepared),
  1008. newUnitOption("Service", "ExecStopPost", markPodFinished),
  1009. // This enables graceful stop.
  1010. newUnitOption("Service", "KillMode", "mixed"),
  1011. newUnitOption("Service", "TimeoutStopSec", fmt.Sprintf("%ds", getPodTerminationGracePeriodInSecond(pod))),
  1012. // Track pod info for garbage collection
  1013. newUnitOption(unitKubernetesSection, unitPodUID, string(pod.UID)),
  1014. newUnitOption(unitKubernetesSection, unitPodName, pod.Name),
  1015. newUnitOption(unitKubernetesSection, unitPodNamespace, pod.Namespace),
  1016. newUnitOption(unitKubernetesSection, unitPodHostNetwork, fmt.Sprintf("%v", hostNetwork)),
  1017. }
  1018. if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.SELinuxOptions != nil {
  1019. opt := pod.Spec.SecurityContext.SELinuxOptions
  1020. selinuxContext, err := r.getSelinuxContext(opt)
  1021. if err != nil {
  1022. glog.Errorf("rkt: Failed to construct selinux context with selinux option %q: %v", opt, err)
  1023. return "", nil, err
  1024. }
  1025. units = append(units, newUnitOption("Service", "SELinuxContext", selinuxContext))
  1026. }
  1027. serviceName := makePodServiceFileName(uuid)
  1028. glog.V(4).Infof("rkt: Creating service file %q for pod %q", serviceName, format.Pod(pod))
  1029. serviceFile, err := r.os.Create(serviceFilePath(serviceName))
  1030. if err != nil {
  1031. return "", nil, err
  1032. }
  1033. if _, err := io.Copy(serviceFile, unit.Serialize(units)); err != nil {
  1034. return "", nil, err
  1035. }
  1036. serviceFile.Close()
  1037. return serviceName, apiPodToruntimePod(uuid, pod), nil
  1038. }
  1039. // generateEvents is a helper function that generates some container
  1040. // life cycle events for containers in a pod.
  1041. func (r *Runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, failure error) {
  1042. // Set up container references.
  1043. for _, c := range runtimePod.Containers {
  1044. containerID := c.ID
  1045. id, err := parseContainerID(containerID)
  1046. if err != nil {
  1047. glog.Warningf("Invalid container ID %q", containerID)
  1048. continue
  1049. }
  1050. ref, ok := r.containerRefManager.GetRef(containerID)
  1051. if !ok {
  1052. glog.Warningf("No ref for container %q", containerID)
  1053. continue
  1054. }
  1055. // Note that 'rkt id' is the pod id.
  1056. uuid := utilstrings.ShortenString(id.uuid, 8)
  1057. switch reason {
  1058. case "Created":
  1059. r.recorder.Eventf(ref, api.EventTypeNormal, events.CreatedContainer, "Created with rkt id %v", uuid)
  1060. case "Started":
  1061. r.recorder.Eventf(ref, api.EventTypeNormal, events.StartedContainer, "Started with rkt id %v", uuid)
  1062. case "Failed":
  1063. r.recorder.Eventf(ref, api.EventTypeWarning, events.FailedToStartContainer, "Failed to start with rkt id %v with error %v", uuid, failure)
  1064. case "Killing":
  1065. r.recorder.Eventf(ref, api.EventTypeNormal, events.KillingContainer, "Killing with rkt id %v", uuid)
  1066. default:
  1067. glog.Errorf("rkt: Unexpected event %q", reason)
  1068. }
  1069. }
  1070. return
  1071. }
  1072. func makePodNetnsName(podID kubetypes.UID) string {
  1073. return fmt.Sprintf("%s%s", kubernetesUnitPrefix, string(podID))
  1074. }
  1075. func netnsPathFromName(netnsName string) string {
  1076. return fmt.Sprintf("/var/run/netns/%s", netnsName)
  1077. }
// setupPodNetwork creates a network namespace for the given pod and calls
// configured NetworkPlugin's setup function on it.
// It returns the namespace name, configured IP (if available), and an error if
// one occurred.
//
// If the pod is running in host network or is running using the no-op plugin, then nothing will be done.
//
// A namespace with the same name left over from an earlier attempt is deleted
// first; that deletion's result is ignored. On failure the namespace may already
// exist, so callers are expected to run cleanupPodNetwork; RunPod does so.
// Hairpin-mode setup failures are only logged and do not fail the call.
func (r *Runtime) setupPodNetwork(pod *api.Pod) (string, string, error) {
	glog.V(3).Infof("Calling network plugin %s to set up pod for %s", r.networkPlugin.Name(), format.Pod(pod))
	// No-op if the pod is not running in a created netns.
	if !r.shouldCreateNetns(pod) {
		return "", "", nil
	}

	netnsName := makePodNetnsName(pod.UID)
	// Create a new network namespace for the pod.
	// Best-effort removal of a stale namespace with the same name; the error is
	// deliberately ignored.
	r.execer.Command("ip", "netns", "del", netnsName).Output()
	_, err := r.execer.Command("ip", "netns", "add", netnsName).Output()
	if err != nil {
		return "", "", fmt.Errorf("failed to create pod network namespace: %v", err)
	}

	// Set up networking with the network plugin
	glog.V(3).Infof("Calling network plugin %s to setup pod for %s", r.networkPlugin.Name(), format.Pod(pod))
	// The pod's UID stands in for a container ID, since the namespace is per pod.
	containerID := kubecontainer.ContainerID{ID: string(pod.UID)}
	err = r.networkPlugin.SetUpPod(pod.Namespace, pod.Name, containerID)
	if err != nil {
		return "", "", fmt.Errorf("failed to set up pod network: %v", err)
	}
	status, err := r.networkPlugin.GetPodNetworkStatus(pod.Namespace, pod.Name, containerID)
	if err != nil {
		return "", "", fmt.Errorf("failed to get status of pod network: %v", err)
	}

	if r.configureHairpinMode {
		if err = hairpin.SetUpContainerPath(netnsPathFromName(netnsName), network.DefaultInterfaceName); err != nil {
			glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err)
		}
	}

	// NOTE(review): assumes the plugin returns a non-nil status whenever err is
	// nil; a nil status would panic here. Confirm for all plugins.
	return netnsName, status.IP.String(), nil
}
// RunPod first creates the unit file for a pod, and then
// starts the unit over d-bus.
//
// The full sequence is:
//  1. Set up the pod network, when the kubelet owns the netns.
//  2. Prepare the rkt pod and write its systemd unit.
//  3. Record container references and events.
//  4. (Re)start the unit through systemd and wait for the job result.
//  5. Run the containers' post-start hooks.
//
// On failure the pod network is cleaned up on a best-effort basis; the result
// of cleanupPodNetwork is ignored. If a post-start hook fails, the pod is
// killed.
func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
	glog.V(4).Infof("Rkt starts to run pod: name %q.", format.Pod(pod))
	var err error
	var netnsName string
	var podIP string
	netnsName, podIP, err = r.setupPodNetwork(pod)
	if err != nil {
		r.cleanupPodNetwork(pod)
		return err
	}

	name, runtimePod, prepareErr := r.preparePod(pod, podIP, pullSecrets, netnsName)

	// Set container references and generate events.
	// If preparedPod fails, then send out 'failed' events for each container.
	// Otherwise, store the container references so we can use them later to send events.
	for i, c := range pod.Spec.Containers {
		ref, err := kubecontainer.GenerateContainerRef(pod, &c)
		if err != nil {
			glog.Errorf("Couldn't make a ref to pod %q, container %v: '%v'", format.Pod(pod), c.Name, err)
			continue
		}
		if prepareErr != nil {
			r.recorder.Eventf(ref, api.EventTypeWarning, events.FailedToCreateContainer, "Failed to create rkt container with error: %v", prepareErr)
			continue
		}
		// apiPodToruntimePod builds runtimePod.Containers in pod.Spec.Containers
		// order, so index i refers to the same container.
		containerID := runtimePod.Containers[i].ID
		r.containerRefManager.SetRef(containerID, ref)
	}

	if prepareErr != nil {
		r.cleanupPodNetwork(pod)
		return prepareErr
	}

	r.generateEvents(runtimePod, "Created", nil)

	// RestartUnit has the same effect as StartUnit if the unit is not running, besides it can restart
	// a unit if the unit file is changed and reloaded.
	// reschan receives the systemd job result; anything other than "done" is treated as failure.
	reschan := make(chan string)
	_, err = r.systemd.RestartUnit(name, "replace", reschan)
	if err != nil {
		r.generateEvents(runtimePod, "Failed", err)
		r.cleanupPodNetwork(pod)
		return err
	}

	res := <-reschan
	if res != "done" {
		err := fmt.Errorf("Failed to restart unit %q: %s", name, res)
		r.generateEvents(runtimePod, "Failed", err)
		r.cleanupPodNetwork(pod)
		return err
	}

	r.generateEvents(runtimePod, "Started", nil)

	// This is a temporary solution until we have a clean design on how
	// kubelet handles events. See https://github.com/kubernetes/kubernetes/issues/23084.
	if err := r.runLifecycleHooks(pod, runtimePod, lifecyclePostStartHook); err != nil {
		if errKill := r.KillPod(pod, *runtimePod, nil); errKill != nil {
			// NOTE(review): cleanupPodNetwork is not called on this path. Confirm
			// whether KillPod or a later sync is expected to release the netns.
			return errors.NewAggregate([]error{err, errKill})
		}
		r.cleanupPodNetwork(pod)
		return err
	}

	return nil
}
  1177. func (r *Runtime) runPreStopHook(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container) error {
  1178. glog.V(4).Infof("rkt: Running pre-stop hook for container %q of pod %q", container.Name, format.Pod(pod))
  1179. msg, err := r.runner.Run(containerID, pod, container, container.Lifecycle.PreStop)
  1180. if err != nil {
  1181. ref, ok := r.containerRefManager.GetRef(containerID)
  1182. if !ok {
  1183. glog.Warningf("No ref for container %q", containerID)
  1184. } else {
  1185. r.recorder.Eventf(ref, api.EventTypeWarning, events.FailedPreStopHook, msg)
  1186. }
  1187. }
  1188. return err
  1189. }
  1190. func (r *Runtime) runPostStartHook(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container) error {
  1191. glog.V(4).Infof("rkt: Running post-start hook for container %q of pod %q", container.Name, format.Pod(pod))
  1192. cid, err := parseContainerID(containerID)
  1193. if err != nil {
  1194. return fmt.Errorf("cannot parse container ID %v", containerID)
  1195. }
  1196. isContainerRunning := func() (done bool, err error) {
  1197. ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
  1198. defer cancel()
  1199. resp, err := r.apisvc.InspectPod(ctx, &rktapi.InspectPodRequest{Id: cid.uuid})
  1200. if err != nil {
  1201. return false, fmt.Errorf("failed to inspect rkt pod %q for pod %q", cid.uuid, format.Pod(pod))
  1202. }
  1203. for _, app := range resp.Pod.Apps {
  1204. if app.Name == cid.appName {
  1205. return app.State == rktapi.AppState_APP_STATE_RUNNING, nil
  1206. }
  1207. }
  1208. return false, fmt.Errorf("failed to find container %q in rkt pod %q", cid.appName, cid.uuid)
  1209. }
  1210. // TODO(yifan): Polling the pod's state for now.
  1211. timeout := time.Second * 5
  1212. pollInterval := time.Millisecond * 500
  1213. if err := utilwait.Poll(pollInterval, timeout, isContainerRunning); err != nil {
  1214. return fmt.Errorf("rkt: Pod %q doesn't become running in %v: %v", format.Pod(pod), timeout, err)
  1215. }
  1216. msg, err := r.runner.Run(containerID, pod, container, container.Lifecycle.PostStart)
  1217. if err != nil {
  1218. ref, ok := r.containerRefManager.GetRef(containerID)
  1219. if !ok {
  1220. glog.Warningf("No ref for container %q", containerID)
  1221. } else {
  1222. r.recorder.Eventf(ref, api.EventTypeWarning, events.FailedPostStartHook, msg)
  1223. }
  1224. }
  1225. return err
  1226. }
  1227. type lifecycleHookType string
  1228. const (
  1229. lifecyclePostStartHook lifecycleHookType = "post-start"
  1230. lifecyclePreStopHook lifecycleHookType = "pre-stop"
  1231. )
  1232. func (r *Runtime) runLifecycleHooks(pod *api.Pod, runtimePod *kubecontainer.Pod, typ lifecycleHookType) error {
  1233. var wg sync.WaitGroup
  1234. var errlist []error
  1235. errCh := make(chan error, len(pod.Spec.Containers))
  1236. wg.Add(len(pod.Spec.Containers))
  1237. for i, c := range pod.Spec.Containers {
  1238. var hookFunc func(kubecontainer.ContainerID, *api.Pod, *api.Container) error
  1239. switch typ {
  1240. case lifecyclePostStartHook:
  1241. if c.Lifecycle != nil && c.Lifecycle.PostStart != nil {
  1242. hookFunc = r.runPostStartHook
  1243. }
  1244. case lifecyclePreStopHook:
  1245. if c.Lifecycle != nil && c.Lifecycle.PreStop != nil {
  1246. hookFunc = r.runPreStopHook
  1247. }
  1248. default:
  1249. errCh <- fmt.Errorf("Unrecognized lifecycle hook type %q for container %q in pod %q", typ, c.Name, format.Pod(pod))
  1250. }
  1251. if hookFunc == nil {
  1252. wg.Done()
  1253. continue
  1254. }
  1255. container := &pod.Spec.Containers[i]
  1256. runtimeContainer := runtimePod.FindContainerByName(container.Name)
  1257. if runtimeContainer == nil {
  1258. // Container already gone.
  1259. wg.Done()
  1260. continue
  1261. }
  1262. containerID := runtimeContainer.ID
  1263. go func() {
  1264. defer wg.Done()
  1265. if err := hookFunc(containerID, pod, container); err != nil {
  1266. glog.Errorf("rkt: Failed to run %s hook for container %q of pod %q: %v", typ, container.Name, format.Pod(pod), err)
  1267. errCh <- err
  1268. } else {
  1269. glog.V(4).Infof("rkt: %s hook completed successfully for container %q of pod %q", typ, container.Name, format.Pod(pod))
  1270. }
  1271. }()
  1272. }
  1273. wg.Wait()
  1274. close(errCh)
  1275. for err := range errCh {
  1276. errlist = append(errlist, err)
  1277. }
  1278. return errors.NewAggregate(errlist)
  1279. }
  1280. // convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
  1281. func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error) {
  1282. manifest := &appcschema.PodManifest{}
  1283. err := json.Unmarshal(rktpod.Manifest, manifest)
  1284. if err != nil {
  1285. return nil, err
  1286. }
  1287. podUID, ok := manifest.Annotations.Get(types.KubernetesPodUIDLabel)
  1288. if !ok {
  1289. return nil, fmt.Errorf("pod is missing annotation %s", types.KubernetesPodUIDLabel)
  1290. }
  1291. podName, ok := manifest.Annotations.Get(types.KubernetesPodNameLabel)
  1292. if !ok {
  1293. return nil, fmt.Errorf("pod is missing annotation %s", types.KubernetesPodNameLabel)
  1294. }
  1295. podNamespace, ok := manifest.Annotations.Get(types.KubernetesPodNamespaceLabel)
  1296. if !ok {
  1297. return nil, fmt.Errorf("pod is missing annotation %s", types.KubernetesPodNamespaceLabel)
  1298. }
  1299. kubepod := &kubecontainer.Pod{
  1300. ID: kubetypes.UID(podUID),
  1301. Name: podName,
  1302. Namespace: podNamespace,
  1303. }
  1304. for i, app := range rktpod.Apps {
  1305. // The order of the apps is determined by the rkt pod manifest.
  1306. // TODO(yifan): Let the server to unmarshal the annotations? https://github.com/coreos/rkt/issues/1872
  1307. hashStr, ok := manifest.Apps[i].Annotations.Get(k8sRktContainerHashAnno)
  1308. if !ok {
  1309. return nil, fmt.Errorf("app %q is missing annotation %s", app.Name, k8sRktContainerHashAnno)
  1310. }
  1311. containerHash, err := strconv.ParseUint(hashStr, 10, 64)
  1312. if err != nil {
  1313. return nil, fmt.Errorf("couldn't parse container's hash %q: %v", hashStr, err)
  1314. }
  1315. kubepod.Containers = append(kubepod.Containers, &kubecontainer.Container{
  1316. ID: buildContainerID(&containerID{rktpod.Id, app.Name}),
  1317. Name: app.Name,
  1318. // By default, the version returned by rkt API service will be "latest" if not specified.
  1319. Image: fmt.Sprintf("%s:%s", app.Image.Name, app.Image.Version),
  1320. ImageID: app.Image.Id,
  1321. Hash: containerHash,
  1322. State: appStateToContainerState(app.State),
  1323. })
  1324. }
  1325. return kubepod, nil
  1326. }
  1327. // GetPods runs 'rkt list' to get the list of rkt pods.
  1328. // Then it will use the result to construct a list of container runtime pods.
  1329. // If all is false, then only running pods will be returned, otherwise all pods will be
  1330. // returned.
  1331. func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
  1332. glog.V(4).Infof("Rkt getting pods")
  1333. listReq := &rktapi.ListPodsRequest{
  1334. Detail: true,
  1335. Filters: []*rktapi.PodFilter{
  1336. {
  1337. Annotations: []*rktapi.KeyValue{
  1338. {
  1339. Key: k8sRktKubeletAnno,
  1340. Value: k8sRktKubeletAnnoValue,
  1341. },
  1342. },
  1343. },
  1344. },
  1345. }
  1346. if !all {
  1347. listReq.Filters[0].States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}
  1348. }
  1349. ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
  1350. defer cancel()
  1351. listResp, err := r.apisvc.ListPods(ctx, listReq)
  1352. if err != nil {
  1353. return nil, fmt.Errorf("couldn't list pods: %v", err)
  1354. }
  1355. pods := make(map[kubetypes.UID]*kubecontainer.Pod)
  1356. var podIDs []kubetypes.UID
  1357. for _, pod := range listResp.Pods {
  1358. pod, err := r.convertRktPod(pod)
  1359. if err != nil {
  1360. glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
  1361. continue
  1362. }
  1363. // Group pods together.
  1364. oldPod, found := pods[pod.ID]
  1365. if !found {
  1366. pods[pod.ID] = pod
  1367. podIDs = append(podIDs, pod.ID)
  1368. continue
  1369. }
  1370. oldPod.Containers = append(oldPod.Containers, pod.Containers...)
  1371. }
  1372. // Convert map to list, using the consistent order from the podIDs array.
  1373. var result []*kubecontainer.Pod
  1374. for _, id := range podIDs {
  1375. result = append(result, pods[id])
  1376. }
  1377. return result, nil
  1378. }
  1379. func getPodTerminationGracePeriodInSecond(pod *api.Pod) int64 {
  1380. var gracePeriod int64
  1381. switch {
  1382. case pod.DeletionGracePeriodSeconds != nil:
  1383. gracePeriod = *pod.DeletionGracePeriodSeconds
  1384. case pod.Spec.TerminationGracePeriodSeconds != nil:
  1385. gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
  1386. }
  1387. if gracePeriod < minimumGracePeriodInSeconds {
  1388. gracePeriod = minimumGracePeriodInSeconds
  1389. }
  1390. return gracePeriod
  1391. }
  1392. func (r *Runtime) waitPreStopHooks(pod *api.Pod, runningPod *kubecontainer.Pod) {
  1393. gracePeriod := getPodTerminationGracePeriodInSecond(pod)
  1394. done := make(chan struct{})
  1395. go func() {
  1396. if err := r.runLifecycleHooks(pod, runningPod, lifecyclePreStopHook); err != nil {
  1397. glog.Errorf("rkt: Some pre-stop hooks failed for pod %q: %v", format.Pod(pod), err)
  1398. }
  1399. close(done)
  1400. }()
  1401. select {
  1402. case <-time.After(time.Duration(gracePeriod) * time.Second):
  1403. glog.V(2).Infof("rkt: Some pre-stop hooks did not complete in %d seconds for pod %q", gracePeriod, format.Pod(pod))
  1404. case <-done:
  1405. }
  1406. }
  1407. // KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
  1408. // TODO: add support for gracePeriodOverride which is used in eviction scenarios
  1409. func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
  1410. glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)
  1411. if len(runningPod.Containers) == 0 {
  1412. glog.V(4).Infof("rkt: Pod %q is already being killed, no action will be taken", runningPod.Name)
  1413. return nil
  1414. }
  1415. if pod != nil {
  1416. r.waitPreStopHooks(pod, &runningPod)
  1417. }
  1418. containerID, err := parseContainerID(runningPod.Containers[0].ID)
  1419. if err != nil {
  1420. glog.Errorf("rkt: Failed to get rkt uuid of the pod %q: %v", runningPod.Name, err)
  1421. return err
  1422. }
  1423. serviceName := makePodServiceFileName(containerID.uuid)
  1424. serviceFile := serviceFilePath(serviceName)
  1425. r.generateEvents(&runningPod, "Killing", nil)
  1426. for _, c := range runningPod.Containers {
  1427. r.containerRefManager.ClearRef(c.ID)
  1428. }
  1429. // Since all service file have 'KillMode=mixed', the processes in
  1430. // the unit's cgroup will receive a SIGKILL if the normal stop timeouts.
  1431. reschan := make(chan string)
  1432. if _, err = r.systemd.StopUnit(serviceName, "replace", reschan); err != nil {
  1433. glog.Errorf("rkt: Failed to stop unit %q: %v", serviceName, err)
  1434. return err
  1435. }
  1436. res := <-reschan
  1437. if res != "done" {
  1438. err := fmt.Errorf("invalid result: %s", res)
  1439. glog.Errorf("rkt: Failed to stop unit %q: %v", serviceName, err)
  1440. return err
  1441. }
  1442. // Clean up networking. Use the service file to get pod details since 'pod' can be nil.
  1443. if err := r.cleanupPodNetworkFromServiceFile(serviceFile); err != nil {
  1444. glog.Errorf("rkt: failed to tear down network for unit %q: %v", serviceName, err)
  1445. return err
  1446. }
  1447. return nil
  1448. }
  1449. func (r *Runtime) Type() string {
  1450. return RktType
  1451. }
  1452. func (r *Runtime) Version() (kubecontainer.Version, error) {
  1453. r.versions.RLock()
  1454. defer r.versions.RUnlock()
  1455. return r.versions.binVersion, nil
  1456. }
  1457. func (r *Runtime) APIVersion() (kubecontainer.Version, error) {
  1458. r.versions.RLock()
  1459. defer r.versions.RUnlock()
  1460. return r.versions.apiVersion, nil
  1461. }
  1462. // Status returns error if rkt is unhealthy, nil otherwise.
  1463. func (r *Runtime) Status() error {
  1464. return r.checkVersion(minimumRktBinVersion, recommendedRktBinVersion, minimumRktApiVersion, minimumSystemdVersion)
  1465. }
  1466. // SyncPod syncs the running pod to match the specified desired pod.
  1467. func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
  1468. var err error
  1469. defer func() {
  1470. if err != nil {
  1471. result.Fail(err)
  1472. }
  1473. }()
  1474. // TODO: (random-liu) Stop using running pod in SyncPod()
  1475. // TODO: (random-liu) Rename podStatus to apiPodStatus, rename internalPodStatus to podStatus, and use new pod status as much as possible,
  1476. // we may stop using apiPodStatus someday.
  1477. runningPod := kubecontainer.ConvertPodStatusToRunningPod(internalPodStatus)
  1478. // Add references to all containers.
  1479. unidentifiedContainers := make(map[kubecontainer.ContainerID]*kubecontainer.Container)
  1480. for _, c := range runningPod.Containers {
  1481. unidentifiedContainers[c.ID] = c
  1482. }
  1483. restartPod := false
  1484. for _, container := range pod.Spec.Containers {
  1485. expectedHash := kubecontainer.HashContainer(&container)
  1486. c := runningPod.FindContainerByName(container.Name)
  1487. if c == nil {
  1488. if kubecontainer.ShouldContainerBeRestarted(&container, pod, internalPodStatus) {
  1489. glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
  1490. // TODO(yifan): Containers in one pod are fate-sharing at this moment, see:
  1491. // https://github.com/appc/spec/issues/276.
  1492. restartPod = true
  1493. break
  1494. }
  1495. continue
  1496. }
  1497. // TODO: check for non-root image directives. See ../docker/manager.go#SyncPod
  1498. // TODO(yifan): Take care of host network change.
  1499. containerChanged := c.Hash != 0 && c.Hash != expectedHash
  1500. if containerChanged {
  1501. glog.Infof("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", format.Pod(pod), container.Name, c.Hash, expectedHash)
  1502. restartPod = true
  1503. break
  1504. }
  1505. liveness, found := r.livenessManager.Get(c.ID)
  1506. if found && liveness != proberesults.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever {
  1507. glog.Infof("Pod %q container %q is unhealthy, it will be killed and re-created.", format.Pod(pod), container.Name)
  1508. restartPod = true
  1509. break
  1510. }
  1511. delete(unidentifiedContainers, c.ID)
  1512. }
  1513. // If there is any unidentified containers, restart the pod.
  1514. if len(unidentifiedContainers) > 0 {
  1515. restartPod = true
  1516. }
  1517. if restartPod {
  1518. // Kill the pod only if the pod is actually running.
  1519. if len(runningPod.Containers) > 0 {
  1520. if err = r.KillPod(pod, runningPod, nil); err != nil {
  1521. return
  1522. }
  1523. }
  1524. if err = r.RunPod(pod, pullSecrets); err != nil {
  1525. return
  1526. }
  1527. }
  1528. return
  1529. }
  1530. // Sort rkt pods by creation time.
  1531. type podsByCreatedAt []*rktapi.Pod
  1532. func (s podsByCreatedAt) Len() int { return len(s) }
  1533. func (s podsByCreatedAt) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  1534. func (s podsByCreatedAt) Less(i, j int) bool { return s[i].CreatedAt < s[j].CreatedAt }
  1535. // getPodUID returns the pod's API UID, it returns
  1536. // empty UID if the UID cannot be determined.
  1537. func getPodUID(pod *rktapi.Pod) kubetypes.UID {
  1538. for _, anno := range pod.Annotations {
  1539. if anno.Key == types.KubernetesPodUIDLabel {
  1540. return kubetypes.UID(anno.Value)
  1541. }
  1542. }
  1543. return kubetypes.UID("")
  1544. }
  1545. // podIsActive returns true if the pod is embryo, preparing or running.
  1546. // If a pod is prepared, it is not guaranteed to be active (e.g. the systemd
  1547. // service might fail).
  1548. func podIsActive(pod *rktapi.Pod) bool {
  1549. return pod.State == rktapi.PodState_POD_STATE_EMBRYO ||
  1550. pod.State == rktapi.PodState_POD_STATE_PREPARING ||
  1551. pod.State == rktapi.PodState_POD_STATE_RUNNING
  1552. }
  1553. // GetNetNS returns the network namespace path for the given container
  1554. func (r *Runtime) GetNetNS(containerID kubecontainer.ContainerID) (string, error) {
  1555. // This is a slight hack, kubenet shouldn't be asking us about a container id
  1556. // but a pod id. This is because it knows too much about the infra container.
  1557. // We pretend the pod.UID is an infra container ID.
  1558. // This deception is only possible because we played the same trick in
  1559. // `networkPlugin.SetUpPod` and `networkPlugin.TearDownPod`.
  1560. return netnsPathFromName(makePodNetnsName(kubetypes.UID(containerID.ID))), nil
  1561. }
  1562. func (r *Runtime) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.ContainerID, error) {
  1563. return kubecontainer.ContainerID{ID: string(pod.ID)}, nil
  1564. }
  1565. func podDetailsFromServiceFile(serviceFilePath string) (string, string, string, bool, error) {
  1566. f, err := os.Open(serviceFilePath)
  1567. if err != nil {
  1568. return "", "", "", false, err
  1569. }
  1570. defer f.Close()
  1571. opts, err := unit.Deserialize(f)
  1572. if err != nil {
  1573. return "", "", "", false, err
  1574. }
  1575. var id, name, namespace, hostnetwork string
  1576. for _, o := range opts {
  1577. if o.Section != unitKubernetesSection {
  1578. continue
  1579. }
  1580. switch o.Name {
  1581. case unitPodUID:
  1582. id = o.Value
  1583. case unitPodName:
  1584. name = o.Value
  1585. case unitPodNamespace:
  1586. namespace = o.Value
  1587. case unitPodHostNetwork:
  1588. hostnetwork = o.Value
  1589. }
  1590. if id != "" && name != "" && namespace != "" && hostnetwork != "" {
  1591. podHostNetwork, err := strconv.ParseBool(hostnetwork)
  1592. if err != nil {
  1593. return "", "", "", false, err
  1594. }
  1595. return id, name, namespace, podHostNetwork, nil
  1596. }
  1597. }
  1598. return "", "", "", false, fmt.Errorf("failed to parse pod from file %s", serviceFilePath)
  1599. }
  1600. func (r *Runtime) DeleteContainer(containerID kubecontainer.ContainerID) error {
  1601. return fmt.Errorf("unimplemented")
  1602. }
  1603. // GarbageCollect collects the pods/containers.
  1604. // After one GC iteration:
  1605. // - The deleted pods will be removed.
  1606. // - If the number of containers exceeds gcPolicy.MaxContainers,
  1607. // then containers whose ages are older than gcPolicy.minAge will
  1608. // be removed.
  1609. func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
  1610. var errlist []error
  1611. var totalInactiveContainers int
  1612. var inactivePods []*rktapi.Pod
  1613. var removeCandidates []*rktapi.Pod
  1614. var allPods = map[string]*rktapi.Pod{}
  1615. glog.V(4).Infof("rkt: Garbage collecting triggered with policy %v", gcPolicy)
  1616. // GC all inactive systemd service files and pods.
  1617. files, err := r.os.ReadDir(systemdServiceDir)
  1618. if err != nil {
  1619. glog.Errorf("rkt: Failed to read the systemd service directory: %v", err)
  1620. return err
  1621. }
  1622. ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
  1623. defer cancel()
  1624. resp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{Filters: kubernetesPodsFilters()})
  1625. if err != nil {
  1626. glog.Errorf("rkt: Failed to list pods: %v", err)
  1627. return err
  1628. }
  1629. // Mark inactive pods.
  1630. for _, pod := range resp.Pods {
  1631. allPods[pod.Id] = pod
  1632. if !podIsActive(pod) {
  1633. uid := getPodUID(pod)
  1634. if uid == kubetypes.UID("") {
  1635. glog.Errorf("rkt: Cannot get the UID of pod %q, pod is broken, will remove it", pod.Id)
  1636. removeCandidates = append(removeCandidates, pod)
  1637. continue
  1638. }
  1639. _, found := r.podGetter.GetPodByUID(uid)
  1640. if !found && allSourcesReady {
  1641. removeCandidates = append(removeCandidates, pod)
  1642. continue
  1643. }
  1644. inactivePods = append(inactivePods, pod)
  1645. totalInactiveContainers = totalInactiveContainers + len(pod.Apps)
  1646. }
  1647. }
  1648. // Remove any orphan service files.
  1649. for _, f := range files {
  1650. serviceName := f.Name()
  1651. if strings.HasPrefix(serviceName, kubernetesUnitPrefix) {
  1652. rktUUID := getRktUUIDFromServiceFileName(serviceName)
  1653. if _, ok := allPods[rktUUID]; !ok {
  1654. glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName)
  1655. if err := r.systemd.ResetFailedUnit(serviceName); err != nil {
  1656. glog.Warningf("rkt: Failed to reset the failed systemd service %q: %v", serviceName, err)
  1657. }
  1658. serviceFile := serviceFilePath(serviceName)
  1659. // Network may not be around anymore so errors are ignored
  1660. if err := r.cleanupPodNetworkFromServiceFile(serviceFile); err != nil {
  1661. glog.Warningf("rkt: Failed to clean up pod network from service %q: %v, the network may not be around already", serviceName, err)
  1662. }
  1663. if err := r.os.Remove(serviceFile); err != nil {
  1664. errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q: %v", serviceFile, err))
  1665. }
  1666. }
  1667. }
  1668. }
  1669. sort.Sort(podsByCreatedAt(inactivePods))
  1670. // Enforce GCPolicy.MaxContainers.
  1671. for _, pod := range inactivePods {
  1672. if totalInactiveContainers <= gcPolicy.MaxContainers {
  1673. break
  1674. }
  1675. creationTime := time.Unix(0, pod.CreatedAt)
  1676. if creationTime.Add(gcPolicy.MinAge).Before(time.Now()) {
  1677. // The pod is old and we are exceeding the MaxContainers limit.
  1678. // Delete the pod.
  1679. removeCandidates = append(removeCandidates, pod)
  1680. totalInactiveContainers = totalInactiveContainers - len(pod.Apps)
  1681. }
  1682. }
  1683. // Remove pods and their servie files.
  1684. for _, pod := range removeCandidates {
  1685. if err := r.removePod(pod.Id); err != nil {
  1686. errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up rkt pod %q: %v", pod.Id, err))
  1687. }
  1688. }
  1689. return errors.NewAggregate(errlist)
  1690. }
  1691. // Read kubernetes pod UUID, namespace, and name from systemd service file and
  1692. // use that to clean up any pod network that may still exist.
  1693. func (r *Runtime) cleanupPodNetworkFromServiceFile(serviceFilePath string) error {
  1694. id, name, namespace, hostnetwork, err := podDetailsFromServiceFile(serviceFilePath)
  1695. if err != nil {
  1696. return err
  1697. }
  1698. return r.cleanupPodNetwork(&api.Pod{
  1699. ObjectMeta: api.ObjectMeta{
  1700. UID: kubetypes.UID(id),
  1701. Name: name,
  1702. Namespace: namespace,
  1703. },
  1704. Spec: api.PodSpec{
  1705. SecurityContext: &api.PodSecurityContext{
  1706. HostNetwork: hostnetwork,
  1707. },
  1708. },
  1709. })
  1710. }
  1711. // removePod calls 'rkt rm $UUID' to delete a rkt pod, it also remove the systemd service file
  1712. // related to the pod.
  1713. func (r *Runtime) removePod(uuid string) error {
  1714. var errlist []error
  1715. glog.V(4).Infof("rkt: GC is removing pod %q", uuid)
  1716. serviceName := makePodServiceFileName(uuid)
  1717. serviceFile := serviceFilePath(serviceName)
  1718. // Network may not be around anymore so errors are ignored
  1719. if err := r.cleanupPodNetworkFromServiceFile(serviceFile); err != nil {
  1720. glog.Warningf("rkt: Failed to clean up pod network from service %q: %v, the network may not be around already", serviceName, err)
  1721. }
  1722. if _, err := r.cli.RunCommand(nil, "rm", uuid); err != nil {
  1723. errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", uuid, err))
  1724. }
  1725. // GC systemd service files as well.
  1726. if err := r.systemd.ResetFailedUnit(serviceName); err != nil {
  1727. glog.Warningf("rkt: Failed to reset the failed systemd service %q: %v", serviceName, err)
  1728. }
  1729. if err := r.os.Remove(serviceFile); err != nil {
  1730. errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceFile, uuid, err))
  1731. }
  1732. return errors.NewAggregate(errlist)
  1733. }
  1734. // rktExitError implements /pkg/util/exec.ExitError interface.
  1735. type rktExitError struct{ *exec.ExitError }
  1736. var _ utilexec.ExitError = &rktExitError{}
  1737. func (r *rktExitError) ExitStatus() int {
  1738. if status, ok := r.Sys().(syscall.WaitStatus); ok {
  1739. return status.ExitStatus()
  1740. }
  1741. return 0
  1742. }
  1743. func newRktExitError(e error) error {
  1744. if exitErr, ok := e.(*exec.ExitError); ok {
  1745. return &rktExitError{exitErr}
  1746. }
  1747. return e
  1748. }
  1749. func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
  1750. return fmt.Errorf("unimplemented")
  1751. }
  1752. // Note: In rkt, the container ID is in the form of "UUID:appName", where UUID is
  1753. // the rkt UUID, and appName is the container name.
  1754. // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
  1755. func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
  1756. glog.V(4).Infof("Rkt execing in container.")
  1757. id, err := parseContainerID(containerID)
  1758. if err != nil {
  1759. return err
  1760. }
  1761. args := []string{"enter", fmt.Sprintf("--app=%s", id.appName), id.uuid}
  1762. args = append(args, cmd...)
  1763. command := buildCommand(r.config, args...)
  1764. if tty {
  1765. p, err := kubecontainer.StartPty(command)
  1766. if err != nil {
  1767. return err
  1768. }
  1769. defer p.Close()
  1770. // make sure to close the stdout stream
  1771. defer stdout.Close()
  1772. kubecontainer.HandleResizing(resize, func(size term.Size) {
  1773. term.SetSize(p.Fd(), size)
  1774. })
  1775. if stdin != nil {
  1776. go io.Copy(p, stdin)
  1777. }
  1778. if stdout != nil {
  1779. go io.Copy(stdout, p)
  1780. }
  1781. return newRktExitError(command.Wait())
  1782. }
  1783. if stdin != nil {
  1784. // Use an os.Pipe here as it returns true *os.File objects.
  1785. // This way, if you run 'kubectl exec <pod> -i bash' (no tty) and type 'exit',
  1786. // the call below to command.Run() can unblock because its Stdin is the read half
  1787. // of the pipe.
  1788. r, w, err := r.os.Pipe()
  1789. if err != nil {
  1790. return newRktExitError(err)
  1791. }
  1792. go io.Copy(w, stdin)
  1793. command.Stdin = r
  1794. }
  1795. if stdout != nil {
  1796. command.Stdout = stdout
  1797. }
  1798. if stderr != nil {
  1799. command.Stderr = stderr
  1800. }
  1801. return newRktExitError(command.Run())
  1802. }
  1803. // PortForward executes socat in the pod's network namespace and copies
  1804. // data between stream (representing the user's local connection on their
  1805. // computer) and the specified port in the container.
  1806. //
  1807. // TODO:
  1808. // - match cgroups of container
  1809. // - should we support nsenter + socat on the host? (current impl)
  1810. // - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
  1811. //
  1812. // TODO(yifan): Merge with the same function in dockertools.
  1813. // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
  1814. func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
  1815. glog.V(4).Infof("Rkt port forwarding in container.")
  1816. ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
  1817. defer cancel()
  1818. listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{
  1819. Detail: true,
  1820. Filters: runningKubernetesPodFilters(pod.ID),
  1821. })
  1822. if err != nil {
  1823. return fmt.Errorf("couldn't list pods: %v", err)
  1824. }
  1825. if len(listResp.Pods) != 1 {
  1826. var podlist []string
  1827. for _, p := range listResp.Pods {
  1828. podlist = append(podlist, p.Id)
  1829. }
  1830. return fmt.Errorf("more than one running rkt pod for the kubernetes pod [%s]", strings.Join(podlist, ", "))
  1831. }
  1832. socatPath, lookupErr := exec.LookPath("socat")
  1833. if lookupErr != nil {
  1834. return fmt.Errorf("unable to do port forwarding: socat not found.")
  1835. }
  1836. args := []string{"-t", fmt.Sprintf("%d", listResp.Pods[0].Pid), "-n", socatPath, "-", fmt.Sprintf("TCP4:localhost:%d", port)}
  1837. nsenterPath, lookupErr := exec.LookPath("nsenter")
  1838. if lookupErr != nil {
  1839. return fmt.Errorf("unable to do port forwarding: nsenter not found.")
  1840. }
  1841. command := exec.Command(nsenterPath, args...)
  1842. command.Stdout = stream
  1843. // If we use Stdin, command.Run() won't return until the goroutine that's copying
  1844. // from stream finishes. Unfortunately, if you have a client like telnet connected
  1845. // via port forwarding, as long as the user's telnet client is connected to the user's
  1846. // local listener that port forwarding sets up, the telnet session never exits. This
  1847. // means that even if socat has finished running, command.Run() won't ever return
  1848. // (because the client still has the connection and stream open).
  1849. //
  1850. // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe
  1851. // when the command (socat) exits.
  1852. inPipe, err := command.StdinPipe()
  1853. if err != nil {
  1854. return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err)
  1855. }
  1856. go func() {
  1857. io.Copy(inPipe, stream)
  1858. inPipe.Close()
  1859. }()
  1860. return command.Run()
  1861. }
  1862. // appStateToContainerState converts rktapi.AppState to kubecontainer.ContainerState.
  1863. func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerState {
  1864. switch state {
  1865. case rktapi.AppState_APP_STATE_RUNNING:
  1866. return kubecontainer.ContainerStateRunning
  1867. case rktapi.AppState_APP_STATE_EXITED:
  1868. return kubecontainer.ContainerStateExited
  1869. }
  1870. return kubecontainer.ContainerStateUnknown
  1871. }
  1872. // getPodInfo returns the pod manifest, creation time and restart count of the pod.
  1873. func getPodInfo(pod *rktapi.Pod) (podManifest *appcschema.PodManifest, restartCount int, err error) {
  1874. // TODO(yifan): The manifest is only used for getting the annotations.
  1875. // Consider to let the server to unmarshal the annotations.
  1876. var manifest appcschema.PodManifest
  1877. if err = json.Unmarshal(pod.Manifest, &manifest); err != nil {
  1878. return
  1879. }
  1880. if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
  1881. restartCount, err = strconv.Atoi(countString)
  1882. if err != nil {
  1883. return
  1884. }
  1885. }
  1886. return &manifest, restartCount, nil
  1887. }
  1888. // populateContainerStatus fills the container status according to the app's information.
  1889. func populateContainerStatus(pod rktapi.Pod, app rktapi.App, runtimeApp appcschema.RuntimeApp, restartCount int, finishedTime time.Time) (*kubecontainer.ContainerStatus, error) {
  1890. hashStr, ok := runtimeApp.Annotations.Get(k8sRktContainerHashAnno)
  1891. if !ok {
  1892. return nil, fmt.Errorf("No container hash in pod manifest")
  1893. }
  1894. hashNum, err := strconv.ParseUint(hashStr, 10, 64)
  1895. if err != nil {
  1896. return nil, err
  1897. }
  1898. var reason, message string
  1899. if app.State == rktapi.AppState_APP_STATE_EXITED {
  1900. if app.ExitCode == 0 {
  1901. reason = "Completed"
  1902. } else {
  1903. reason = "Error"
  1904. }
  1905. }
  1906. terminationMessagePath, ok := runtimeApp.Annotations.Get(k8sRktTerminationMessagePathAnno)
  1907. if ok {
  1908. if data, err := ioutil.ReadFile(terminationMessagePath); err != nil {
  1909. message = fmt.Sprintf("Error on reading termination-log %s: %v", terminationMessagePath, err)
  1910. } else {
  1911. message = string(data)
  1912. }
  1913. }
  1914. createdTime := time.Unix(0, pod.CreatedAt)
  1915. startedTime := time.Unix(0, pod.StartedAt)
  1916. return &kubecontainer.ContainerStatus{
  1917. ID: buildContainerID(&containerID{uuid: pod.Id, appName: app.Name}),
  1918. Name: app.Name,
  1919. State: appStateToContainerState(app.State),
  1920. CreatedAt: createdTime,
  1921. StartedAt: startedTime,
  1922. FinishedAt: finishedTime,
  1923. ExitCode: int(app.ExitCode),
  1924. // By default, the version returned by rkt API service will be "latest" if not specified.
  1925. Image: fmt.Sprintf("%s:%s", app.Image.Name, app.Image.Version),
  1926. ImageID: "rkt://" + app.Image.Id, // TODO(yifan): Add the prefix only in api.PodStatus.
  1927. Hash: hashNum,
  1928. // TODO(yifan): Note that now all apps share the same restart count, this might
  1929. // change once apps don't share the same lifecycle.
  1930. // See https://github.com/appc/spec/pull/547.
  1931. RestartCount: restartCount,
  1932. Reason: reason,
  1933. Message: message,
  1934. }, nil
  1935. }
  1936. // GetPodStatus returns the status for a pod specified by a given UID, name,
  1937. // and namespace. It will attempt to find pod's information via a request to
  1938. // the rkt api server.
  1939. // An error will be returned if the api server returns an error. If the api
  1940. // server doesn't error, but doesn't provide meaningful information about the
  1941. // pod, a status with no information (other than the passed in arguments) is
  1942. // returned anyways.
  1943. func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
  1944. podStatus := &kubecontainer.PodStatus{
  1945. ID: uid,
  1946. Name: name,
  1947. Namespace: namespace,
  1948. }
  1949. ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)
  1950. defer cancel()
  1951. listResp, err := r.apisvc.ListPods(ctx, &rktapi.ListPodsRequest{
  1952. Detail: true,
  1953. Filters: kubernetesPodFilters(uid),
  1954. })
  1955. if err != nil {
  1956. return nil, fmt.Errorf("couldn't list pods: %v", err)
  1957. }
  1958. var latestPod *rktapi.Pod
  1959. var latestRestartCount int = -1
  1960. // In this loop, we group all containers from all pods together,
  1961. // also we try to find the latest pod, so we can fill other info of the pod below.
  1962. for _, pod := range listResp.Pods {
  1963. manifest, restartCount, err := getPodInfo(pod)
  1964. if err != nil {
  1965. glog.Warningf("rkt: Couldn't get necessary info from the rkt pod, (uuid %q): %v", pod.Id, err)
  1966. continue
  1967. }
  1968. if restartCount > latestRestartCount {
  1969. latestPod = pod
  1970. latestRestartCount = restartCount
  1971. }
  1972. finishedTime := r.podFinishedAt(uid, pod.Id)
  1973. for i, app := range pod.Apps {
  1974. // The order of the apps is determined by the rkt pod manifest.
  1975. cs, err := populateContainerStatus(*pod, *app, manifest.Apps[i], restartCount, finishedTime)
  1976. if err != nil {
  1977. glog.Warningf("rkt: Failed to populate container status(uuid %q, app %q): %v", pod.Id, app.Name, err)
  1978. continue
  1979. }
  1980. podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, cs)
  1981. }
  1982. }
  1983. // If we are running no-op network plugin, then get the pod IP from the rkt pod status.
  1984. if r.networkPlugin.Name() == network.DefaultPluginName {
  1985. if latestPod != nil {
  1986. for _, n := range latestPod.Networks {
  1987. if n.Name == defaultNetworkName {
  1988. podStatus.IP = n.Ipv4
  1989. break
  1990. }
  1991. }
  1992. }
  1993. } else {
  1994. containerID := kubecontainer.ContainerID{ID: string(uid)}
  1995. status, err := r.networkPlugin.GetPodNetworkStatus(namespace, name, containerID)
  1996. if err != nil {
  1997. glog.Warningf("rkt: Failed to get pod network status for pod (UID %q, name %q, namespace %q): %v", uid, name, namespace, err)
  1998. } else if status != nil {
  1999. // status can be nil when the pod is running on the host network, in which case the pod IP
  2000. // will be populated by the upper layer.
  2001. podStatus.IP = status.IP.String()
  2002. }
  2003. }
  2004. return podStatus, nil
  2005. }
  2006. // getOSReleaseInfo reads /etc/os-release and returns a map
  2007. // that contains the key value pairs in that file.
  2008. func getOSReleaseInfo() (map[string]string, error) {
  2009. result := make(map[string]string)
  2010. path := "/etc/os-release"
  2011. f, err := os.Open(path)
  2012. if err != nil {
  2013. return nil, err
  2014. }
  2015. defer f.Close()
  2016. scanner := bufio.NewScanner(f)
  2017. for scanner.Scan() {
  2018. line := scanner.Text()
  2019. if len(strings.TrimSpace(line)) == 0 {
  2020. // Skips empty lines
  2021. continue
  2022. }
  2023. info := strings.SplitN(line, "=", 2)
  2024. if len(info) != 2 {
  2025. glog.Warningf("Unexpected entry in os-release %q", line)
  2026. continue
  2027. }
  2028. result[info[0]] = info[1]
  2029. }
  2030. if err := scanner.Err(); err != nil {
  2031. return nil, err
  2032. }
  2033. return result, nil
  2034. }
  2035. // convertKubeMounts creates appc volumes and mount points according to the given mounts.
  2036. // Only one volume will be created for every unique host path.
  2037. // Only one mount point will be created for every unique container path.
  2038. func convertKubeMounts(mounts []kubecontainer.Mount) ([]appctypes.Volume, []appctypes.MountPoint) {
  2039. volumeMap := make(map[string]*appctypes.Volume)
  2040. mountPointMap := make(map[string]*appctypes.MountPoint)
  2041. for _, mnt := range mounts {
  2042. readOnly := mnt.ReadOnly
  2043. if _, existed := volumeMap[mnt.HostPath]; !existed {
  2044. volumeMap[mnt.HostPath] = &appctypes.Volume{
  2045. Name: *appctypes.MustACName(string(uuid.NewUUID())),
  2046. Kind: "host",
  2047. Source: mnt.HostPath,
  2048. ReadOnly: &readOnly,
  2049. }
  2050. }
  2051. if _, existed := mountPointMap[mnt.ContainerPath]; existed {
  2052. glog.Warningf("Multiple mount points with the same container path %v, ignore it", mnt)
  2053. continue
  2054. }
  2055. mountPointMap[mnt.ContainerPath] = &appctypes.MountPoint{
  2056. Name: volumeMap[mnt.HostPath].Name,
  2057. Path: mnt.ContainerPath,
  2058. ReadOnly: readOnly,
  2059. }
  2060. }
  2061. volumes := make([]appctypes.Volume, 0, len(volumeMap))
  2062. mountPoints := make([]appctypes.MountPoint, 0, len(mountPointMap))
  2063. for _, vol := range volumeMap {
  2064. volumes = append(volumes, *vol)
  2065. }
  2066. for _, mnt := range mountPointMap {
  2067. mountPoints = append(mountPoints, *mnt)
  2068. }
  2069. return volumes, mountPoints
  2070. }
  2071. // convertKubePortMappings creates appc container ports and host ports according to the given port mappings.
  2072. // The container ports and host ports are mapped by PortMapping.Name.
  2073. func convertKubePortMappings(portMappings []kubecontainer.PortMapping) ([]appctypes.Port, []appctypes.ExposedPort) {
  2074. containerPorts := make([]appctypes.Port, 0, len(portMappings))
  2075. hostPorts := make([]appctypes.ExposedPort, 0, len(portMappings))
  2076. for _, p := range portMappings {
  2077. // This matches the docker code's behaviour.
  2078. if p.HostPort == 0 {
  2079. continue
  2080. }
  2081. portName := convertToACName(p.Name)
  2082. containerPorts = append(containerPorts, appctypes.Port{
  2083. Name: portName,
  2084. Protocol: string(p.Protocol),
  2085. Port: uint(p.ContainerPort),
  2086. })
  2087. hostPorts = append(hostPorts, appctypes.ExposedPort{
  2088. Name: portName,
  2089. HostPort: uint(p.HostPort),
  2090. })
  2091. }
  2092. return containerPorts, hostPorts
  2093. }