util.go 174 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
7477847794780478147824783478447854786478747884789479047914792479347944795479647974798479948004801480248034804480548064807480848094810481148124813481448154816481748184819482048214822482348244825482648274828482948304831483248334834483548364837483848394840484148424843484448454846484748484849485048514852485348544855485648574858485948604861486248634864486548664867486848694870487148724873487448754876487748784879488048814882488348844885488648874888488948904891489248934894489548964897489848994900490149024903490449054906490749084909491049114912491349144915491649174918491949204921492249234924492549264927492849294930493149324933493449354936493749384939494049414942494349444945494649474948494949504951495249534954495549564957495849594960496149624963496449654966496749684969497049714972497349744975497649774978497949804981498249834984498549864987498849894990499149924993499449954996499749984999500050015002500350045005500650075008500950105011501250135014501550165017501850195020502150225023502450255026502750285029503050315032503350345035503650375038503950405041504250435044504550465047504850495050505150525053505450555056505750585059506050615062506350645065506650675068506950705071507250735074507550765077507850795080508150825083508450855086508750885089509050915092509350945095
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package framework
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "math"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/url"
  25. "os"
  26. "os/exec"
  27. "path"
  28. "path/filepath"
  29. "regexp"
  30. goRuntime "runtime"
  31. "sort"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "syscall"
  36. "time"
  37. "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
  38. "k8s.io/kubernetes/pkg/api"
  39. apierrs "k8s.io/kubernetes/pkg/api/errors"
  40. "k8s.io/kubernetes/pkg/api/resource"
  41. "k8s.io/kubernetes/pkg/api/unversioned"
  42. "k8s.io/kubernetes/pkg/api/v1"
  43. "k8s.io/kubernetes/pkg/apis/extensions"
  44. "k8s.io/kubernetes/pkg/client/cache"
  45. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  46. "k8s.io/kubernetes/pkg/client/restclient"
  47. "k8s.io/kubernetes/pkg/client/typed/discovery"
  48. "k8s.io/kubernetes/pkg/client/typed/dynamic"
  49. client "k8s.io/kubernetes/pkg/client/unversioned"
  50. "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
  51. clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
  52. gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
  53. "k8s.io/kubernetes/pkg/controller"
  54. deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
  55. "k8s.io/kubernetes/pkg/fields"
  56. "k8s.io/kubernetes/pkg/kubectl"
  57. "k8s.io/kubernetes/pkg/kubelet/util/format"
  58. "k8s.io/kubernetes/pkg/labels"
  59. "k8s.io/kubernetes/pkg/master/ports"
  60. "k8s.io/kubernetes/pkg/runtime"
  61. sshutil "k8s.io/kubernetes/pkg/ssh"
  62. "k8s.io/kubernetes/pkg/types"
  63. uexec "k8s.io/kubernetes/pkg/util/exec"
  64. labelsutil "k8s.io/kubernetes/pkg/util/labels"
  65. "k8s.io/kubernetes/pkg/util/sets"
  66. "k8s.io/kubernetes/pkg/util/system"
  67. "k8s.io/kubernetes/pkg/util/uuid"
  68. "k8s.io/kubernetes/pkg/util/wait"
  69. utilyaml "k8s.io/kubernetes/pkg/util/yaml"
  70. "k8s.io/kubernetes/pkg/version"
  71. "k8s.io/kubernetes/pkg/watch"
  72. "github.com/blang/semver"
  73. "golang.org/x/crypto/ssh"
  74. "golang.org/x/net/websocket"
  75. . "github.com/onsi/ginkgo"
  76. . "github.com/onsi/gomega"
  77. gomegatypes "github.com/onsi/gomega/types"
  78. )
const (
	// How long to wait for the pod to be listable.
	PodListTimeout = time.Minute
	// Initial pod start can be delayed O(minutes) by slow docker pulls.
	// TODO: Make this 30 seconds once #4566 is resolved.
	PodStartTimeout = 5 * time.Minute

	// How long to wait for the pod to no longer be running.
	podNoLongerRunningTimeout = 30 * time.Second

	// If there are any orphaned namespaces to clean up, this test is running
	// on a long lived cluster. A long wait here is preferable to spurious test
	// failures caused by leaked resources from a previous test run.
	NamespaceCleanupTimeout = 15 * time.Minute

	// Some pods can take much longer to get ready due to volume attach/detach latency.
	slowPodStartTimeout = 15 * time.Minute

	// How long to wait for a service endpoint to be resolvable.
	ServiceStartTimeout = 1 * time.Minute

	// String used to mark pod deletion.
	nonExist = "NonExist"

	// How often to Poll pods, nodes and claims.
	Poll = 2 * time.Second

	// Service accounts are provisioned after namespace creation.
	// A service account is required to support pod creation in a namespace
	// as part of admission control.
	ServiceAccountProvisionTimeout = 2 * time.Minute

	// How long to try single API calls (like 'get' or 'list'). Used to prevent
	// transient failures from failing tests.
	// TODO: client should not apply this timeout to Watch calls. Increased from 30s until that is fixed.
	SingleCallTimeout = 5 * time.Minute

	// How long nodes have to be "ready" when a test begins. They should already
	// be "ready" before the test starts, so this is small.
	NodeReadyInitialTimeout = 20 * time.Second

	// How long pods have to be "ready" when a test begins.
	PodReadyBeforeTimeout = 5 * time.Minute

	// How long pods have to become scheduled onto nodes.
	podScheduledBeforeTimeout = PodListTimeout + (20 * time.Second)

	podRespondingTimeout     = 2 * time.Minute
	ServiceRespondingTimeout = 2 * time.Minute
	EndpointRegisterTimeout  = time.Minute

	// How long claims have to become dynamically provisioned.
	ClaimProvisionTimeout = 5 * time.Minute

	// When these values are updated, also update cmd/kubelet/app/options/options.go.
	currentPodInfraContainerImageName    = "gcr.io/google_containers/pause"
	currentPodInfraContainerImageVersion = "3.0"

	// How long each node is given during a process that restarts all nodes
	// before the test is considered failed. (Note that the total time to
	// restart all nodes will be this number times the number of nodes.)
	RestartPerNodeTimeout = 5 * time.Minute

	// How often to Poll the statuses of a restart.
	RestartPoll = 20 * time.Second

	// How long a node is allowed to become "Ready" after it is restarted before
	// the test is considered failed.
	RestartNodeReadyAgainTimeout = 5 * time.Minute

	// How long a pod is allowed to become "running" and "ready" after a node
	// restart before test is considered failed.
	RestartPodReadyAgainTimeout = 5 * time.Minute

	// Number of times we want to retry Updates in case of conflict.
	UpdateRetries = 5
)
var (
	// ImagePullerLabels is the label allocated to the image puller static pod
	// that runs on each node before e2es.
	ImagePullerLabels = map[string]string{"name": "e2e-image-puller"}

	// gitVersionRegexp extracts the GitVersion field from kubectl's version
	// output, for version-skewed testing.
	gitVersionRegexp = regexp.MustCompile("GitVersion:\"(v.+?)\"")

	// requiredPerNodePods is a slice of regexps matching names of pods that
	// have to be running on a node to consider it "healthy".
	requiredPerNodePods = []*regexp.Regexp{
		regexp.MustCompile(".*kube-proxy.*"),
		regexp.MustCompile(".*fluentd-elasticsearch.*"),
		regexp.MustCompile(".*node-problem-detector.*"),
	}
)
  149. // GetServerArchitecture fetches the architecture of the cluster's apiserver.
  150. func GetServerArchitecture(c *client.Client) string {
  151. arch := ""
  152. sVer, err := c.Discovery().ServerVersion()
  153. if err != nil || sVer.Platform == "" {
  154. // If we failed to get the server version for some reason, default to amd64.
  155. arch = "amd64"
  156. } else {
  157. // Split the platform string into OS and Arch separately.
  158. // The platform string may for example be "linux/amd64", "linux/arm" or "windows/amd64".
  159. osArchArray := strings.Split(sVer.Platform, "/")
  160. arch = osArchArray[1]
  161. }
  162. return arch
  163. }
  164. // GetPauseImageName fetches the pause image name for the same architecture as the apiserver.
  165. func GetPauseImageName(c *client.Client) string {
  166. return currentPodInfraContainerImageName + "-" + GetServerArchitecture(c) + ":" + currentPodInfraContainerImageVersion
  167. }
  168. // GetPauseImageNameForHostArch fetches the pause image name for the same architecture the test is running on.
  169. func GetPauseImageNameForHostArch() string {
  170. return currentPodInfraContainerImageName + "-" + goRuntime.GOARCH + ":" + currentPodInfraContainerImageVersion
  171. }
// SubResource proxy should have been functional in v1.0.0, but SubResource
// proxy via tunneling is known to be broken in v1.0. See
// https://github.com/kubernetes/kubernetes/pull/15224#issuecomment-146769463
//
// TODO(ihmccreery): remove once we don't care about v1.0 anymore, (tentatively
// in v1.3).

// SubResourcePodProxyVersion is the first server version with working pod
// SubResource proxy support.
var SubResourcePodProxyVersion = version.MustParse("v1.1.0")

// subResourceServiceAndNodeProxyVersion is the first server version with
// working service and node SubResource proxy support.
var subResourceServiceAndNodeProxyVersion = version.MustParse("v1.2.0")
  180. func GetServicesProxyRequest(c *client.Client, request *restclient.Request) (*restclient.Request, error) {
  181. subResourceProxyAvailable, err := ServerVersionGTE(subResourceServiceAndNodeProxyVersion, c)
  182. if err != nil {
  183. return nil, err
  184. }
  185. if subResourceProxyAvailable {
  186. return request.Resource("services").SubResource("proxy"), nil
  187. }
  188. return request.Prefix("proxy").Resource("services"), nil
  189. }
// RunId is the unique identifier of the e2e run.
var RunId = uuid.NewUUID()

// CreateTestingNSFn is the signature of a function that creates a testing
// namespace with the given base name and labels.
type CreateTestingNSFn func(baseName string, c *client.Client, labels map[string]string) (*api.Namespace, error)

// ContainerFailures records a container's terminated state together with the
// number of restarts observed for it.
type ContainerFailures struct {
	status   *api.ContainerStateTerminated
	Restarts int
}
  197. func GetMasterHost() string {
  198. masterUrl, err := url.Parse(TestContext.Host)
  199. ExpectNoError(err)
  200. return masterUrl.Host
  201. }
  202. // Convenient wrapper around cache.Store that returns list of api.Pod instead of interface{}.
  203. type PodStore struct {
  204. cache.Store
  205. stopCh chan struct{}
  206. reflector *cache.Reflector
  207. }
  208. func NewPodStore(c *client.Client, namespace string, label labels.Selector, field fields.Selector) *PodStore {
  209. lw := &cache.ListWatch{
  210. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  211. options.LabelSelector = label
  212. options.FieldSelector = field
  213. return c.Pods(namespace).List(options)
  214. },
  215. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  216. options.LabelSelector = label
  217. options.FieldSelector = field
  218. return c.Pods(namespace).Watch(options)
  219. },
  220. }
  221. store := cache.NewStore(cache.MetaNamespaceKeyFunc)
  222. stopCh := make(chan struct{})
  223. reflector := cache.NewReflector(lw, &api.Pod{}, store, 0)
  224. reflector.RunUntil(stopCh)
  225. return &PodStore{store, stopCh, reflector}
  226. }
  227. func (s *PodStore) List() []*api.Pod {
  228. objects := s.Store.List()
  229. pods := make([]*api.Pod, 0)
  230. for _, o := range objects {
  231. pods = append(pods, o.(*api.Pod))
  232. }
  233. return pods
  234. }
  235. func (s *PodStore) Stop() {
  236. close(s.stopCh)
  237. }
  238. type RCConfig struct {
  239. Client *client.Client
  240. Image string
  241. Command []string
  242. Name string
  243. Namespace string
  244. PollInterval time.Duration
  245. Timeout time.Duration
  246. PodStatusFile *os.File
  247. Replicas int
  248. CpuRequest int64 // millicores
  249. CpuLimit int64 // millicores
  250. MemRequest int64 // bytes
  251. MemLimit int64 // bytes
  252. ReadinessProbe *api.Probe
  253. DNSPolicy *api.DNSPolicy
  254. // Env vars, set the same for every pod.
  255. Env map[string]string
  256. // Extra labels added to every pod.
  257. Labels map[string]string
  258. // Node selector for pods in the RC.
  259. NodeSelector map[string]string
  260. // Ports to declare in the container (map of name to containerPort).
  261. Ports map[string]int
  262. // Ports to declare in the container as host and container ports.
  263. HostPorts map[string]int
  264. Volumes []api.Volume
  265. VolumeMounts []api.VolumeMount
  266. // Pointer to a list of pods; if non-nil, will be set to a list of pods
  267. // created by this RC by RunRC.
  268. CreatedPods *[]*api.Pod
  269. // Maximum allowable container failures. If exceeded, RunRC returns an error.
  270. // Defaults to replicas*0.1 if unspecified.
  271. MaxContainerFailures *int
  272. // If set to false starting RC will print progress, otherwise only errors will be printed.
  273. Silent bool
  274. }
  275. type DeploymentConfig struct {
  276. RCConfig
  277. }
  278. type ReplicaSetConfig struct {
  279. RCConfig
  280. }
  281. func nowStamp() string {
  282. return time.Now().Format(time.StampMilli)
  283. }
  284. func log(level string, format string, args ...interface{}) {
  285. fmt.Fprintf(GinkgoWriter, nowStamp()+": "+level+": "+format+"\n", args...)
  286. }
// Logf logs an INFO-level, timestamped message to the Ginkgo writer.
func Logf(format string, args ...interface{}) {
	log("INFO", format, args...)
}
  290. func Failf(format string, args ...interface{}) {
  291. msg := fmt.Sprintf(format, args...)
  292. log("INFO", msg)
  293. Fail(nowStamp()+": "+msg, 1)
  294. }
  295. func Skipf(format string, args ...interface{}) {
  296. msg := fmt.Sprintf(format, args...)
  297. log("INFO", msg)
  298. Skip(nowStamp() + ": " + msg)
  299. }
  300. func SkipUnlessNodeCountIsAtLeast(minNodeCount int) {
  301. if TestContext.CloudConfig.NumNodes < minNodeCount {
  302. Skipf("Requires at least %d nodes (not %d)", minNodeCount, TestContext.CloudConfig.NumNodes)
  303. }
  304. }
  305. func SkipUnlessAtLeast(value int, minValue int, message string) {
  306. if value < minValue {
  307. Skipf(message)
  308. }
  309. }
  310. func SkipIfProviderIs(unsupportedProviders ...string) {
  311. if ProviderIs(unsupportedProviders...) {
  312. Skipf("Not supported for providers %v (found %s)", unsupportedProviders, TestContext.Provider)
  313. }
  314. }
  315. func SkipUnlessProviderIs(supportedProviders ...string) {
  316. if !ProviderIs(supportedProviders...) {
  317. Skipf("Only supported for providers %v (not %s)", supportedProviders, TestContext.Provider)
  318. }
  319. }
  320. func SkipIfContainerRuntimeIs(runtimes ...string) {
  321. for _, runtime := range runtimes {
  322. if runtime == TestContext.ContainerRuntime {
  323. Skipf("Not supported under container runtime %s", runtime)
  324. }
  325. }
  326. }
  327. func ProviderIs(providers ...string) bool {
  328. for _, provider := range providers {
  329. if strings.ToLower(provider) == strings.ToLower(TestContext.Provider) {
  330. return true
  331. }
  332. }
  333. return false
  334. }
  335. func SkipUnlessServerVersionGTE(v semver.Version, c discovery.ServerVersionInterface) {
  336. gte, err := ServerVersionGTE(v, c)
  337. if err != nil {
  338. Failf("Failed to get server version: %v", err)
  339. }
  340. if !gte {
  341. Skipf("Not supported for server versions before %q", v)
  342. }
  343. }
  344. // Detects whether the federation namespace exists in the underlying cluster
  345. func SkipUnlessFederated(c *client.Client) {
  346. federationNS := os.Getenv("FEDERATION_NAMESPACE")
  347. if federationNS == "" {
  348. federationNS = "federation"
  349. }
  350. _, err := c.Namespaces().Get(federationNS)
  351. if err != nil {
  352. if apierrs.IsNotFound(err) {
  353. Skipf("Could not find federation namespace %s: skipping federated test", federationNS)
  354. } else {
  355. Failf("Unexpected error getting namespace: %v", err)
  356. }
  357. }
  358. }
// ProvidersWithSSH are those providers where each node is accessible with SSH
var ProvidersWithSSH = []string{"gce", "gke", "aws"}

// providersWithMasterSSH are those providers where master node is accessible with SSH
var providersWithMasterSSH = []string{"gce", "gke", "kubemark", "aws"}

// podCondition is a predicate over a pod used by the wait helpers below:
// it reports whether the wait is done and any error that should abort it.
type podCondition func(pod *api.Pod) (bool, error)
  364. // podReady returns whether pod has a condition of Ready with a status of true.
  365. // TODO: should be replaced with api.IsPodReady
  366. func podReady(pod *api.Pod) bool {
  367. for _, cond := range pod.Status.Conditions {
  368. if cond.Type == api.PodReady && cond.Status == api.ConditionTrue {
  369. return true
  370. }
  371. }
  372. return false
  373. }
// logPodStates logs basic info of provided pods for debugging.
// It prints one aligned row per pod: name, node, phase, deletion grace
// period, and the raw conditions slice.
func logPodStates(pods []api.Pod) {
	// Find maximum widths for pod, node, and phase strings for column printing.
	maxPodW, maxNodeW, maxPhaseW, maxGraceW := len("POD"), len("NODE"), len("PHASE"), len("GRACE")
	for i := range pods {
		pod := &pods[i]
		if len(pod.ObjectMeta.Name) > maxPodW {
			maxPodW = len(pod.ObjectMeta.Name)
		}
		if len(pod.Spec.NodeName) > maxNodeW {
			maxNodeW = len(pod.Spec.NodeName)
		}
		if len(pod.Status.Phase) > maxPhaseW {
			maxPhaseW = len(pod.Status.Phase)
		}
	}
	// Increase widths by one to separate by a single space.
	// NOTE(review): maxGraceW is never widened from the data, so a grace
	// string longer than len("GRACE") will break alignment — confirm this
	// is acceptable for a debug dump.
	maxPodW++
	maxNodeW++
	maxPhaseW++
	maxGraceW++
	// Log pod info. * does space padding, - makes them left-aligned.
	Logf("%-[1]*[2]s %-[3]*[4]s %-[5]*[6]s %-[7]*[8]s %[9]s",
		maxPodW, "POD", maxNodeW, "NODE", maxPhaseW, "PHASE", maxGraceW, "GRACE", "CONDITIONS")
	for _, pod := range pods {
		grace := ""
		if pod.DeletionGracePeriodSeconds != nil {
			grace = fmt.Sprintf("%ds", *pod.DeletionGracePeriodSeconds)
		}
		Logf("%-[1]*[2]s %-[3]*[4]s %-[5]*[6]s %-[7]*[8]s %[9]s",
			maxPodW, pod.ObjectMeta.Name, maxNodeW, pod.Spec.NodeName, maxPhaseW, pod.Status.Phase, maxGraceW, grace, pod.Status.Conditions)
	}
	Logf("") // Final empty line helps for readability.
}
  408. // PodRunningReady checks whether pod p's phase is running and it has a ready
  409. // condition of status true.
  410. func PodRunningReady(p *api.Pod) (bool, error) {
  411. // Check the phase is running.
  412. if p.Status.Phase != api.PodRunning {
  413. return false, fmt.Errorf("want pod '%s' on '%s' to be '%v' but was '%v'",
  414. p.ObjectMeta.Name, p.Spec.NodeName, api.PodRunning, p.Status.Phase)
  415. }
  416. // Check the ready condition is true.
  417. if !podReady(p) {
  418. return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v",
  419. p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionTrue, p.Status.Conditions)
  420. }
  421. return true, nil
  422. }
  423. func PodRunningReadyOrSucceeded(p *api.Pod) (bool, error) {
  424. // Check if the phase is succeeded.
  425. if p.Status.Phase == api.PodSucceeded {
  426. return true, nil
  427. }
  428. return PodRunningReady(p)
  429. }
  430. // PodNotReady checks whether pod p's has a ready condition of status false.
  431. func PodNotReady(p *api.Pod) (bool, error) {
  432. // Check the ready condition is false.
  433. if podReady(p) {
  434. return false, fmt.Errorf("pod '%s' on '%s' didn't have condition {%v %v}; conditions: %v",
  435. p.ObjectMeta.Name, p.Spec.NodeName, api.PodReady, api.ConditionFalse, p.Status.Conditions)
  436. }
  437. return true, nil
  438. }
  439. // check if a Pod is controlled by a Replication Controller in the List
  440. func hasReplicationControllersForPod(rcs *api.ReplicationControllerList, pod api.Pod) bool {
  441. for _, rc := range rcs.Items {
  442. selector := labels.SelectorFromSet(rc.Spec.Selector)
  443. if selector.Matches(labels.Set(pod.ObjectMeta.Labels)) {
  444. return true
  445. }
  446. }
  447. return false
  448. }
// WaitForPodsSuccess waits till all labels matching the given selector enter
// the Success state. The caller is expected to only invoke this method once the
// pods have been created.
func WaitForPodsSuccess(c *client.Client, ns string, successPodLabels map[string]string, timeout time.Duration) error {
	successPodSelector := labels.SelectorFromSet(successPodLabels)
	// badPods is captured by the poll closure below so that the failure
	// path can log the last observed set of non-succeeded pods.
	start, badPods := time.Now(), []api.Pod{}
	if wait.PollImmediate(30*time.Second, timeout, func() (bool, error) {
		podList, err := c.Pods(ns).List(api.ListOptions{LabelSelector: successPodSelector})
		if err != nil {
			// Listing errors are treated as transient: keep polling.
			Logf("Error getting pods in namespace %q: %v", ns, err)
			return false, nil
		}
		if len(podList.Items) == 0 {
			// No matching pods at all counts as success — nothing to wait for.
			Logf("Waiting for pods to enter Success, but no pods in %q match label %v", ns, successPodLabels)
			return true, nil
		}
		// Rebuild badPods each iteration from the fresh listing.
		badPods = []api.Pod{}
		for _, pod := range podList.Items {
			if pod.Status.Phase != api.PodSucceeded {
				badPods = append(badPods, pod)
			}
		}
		successPods := len(podList.Items) - len(badPods)
		Logf("%d / %d pods in namespace %q are in Success state (%d seconds elapsed)",
			successPods, len(podList.Items), ns, int(time.Since(start).Seconds()))
		if len(badPods) == 0 {
			return true, nil
		}
		return false, nil
	}) != nil {
		// Timed out: dump state of the stragglers for debugging.
		logPodStates(badPods)
		LogPodsWithLabels(c, ns, successPodLabels)
		return fmt.Errorf("Not all pods in namespace %q are successful within %v", ns, timeout)
	}
	return nil
}
// WaitForPodsRunningReady waits up to timeout to ensure that all pods in
// namespace ns are either running and ready, or failed but controlled by a
// replication controller. Also, it ensures that at least minPods are running
// and ready. It has separate behavior from other 'wait for' pods functions in
// that it requires the list of pods on every iteration. This is useful, for
// example, in cluster startup, because the number of pods increases while
// waiting.
// If ignoreLabels is not empty, pods matching this selector are ignored and
// this function waits for minPods to enter Running/Ready and for all pods
// matching ignoreLabels to enter Success phase. Otherwise an error is returned
// even if there are minPods pods, some of which are in Running/Ready
// and some in Success. This is to allow the client to decide if "Success"
// means "Ready" or not.
func WaitForPodsRunningReady(c *client.Client, ns string, minPods int32, timeout time.Duration, ignoreLabels map[string]string) error {
	ignoreSelector := labels.SelectorFromSet(ignoreLabels)
	start := time.Now()
	Logf("Waiting up to %v for all pods (need at least %d) in namespace '%s' to be running and ready",
		timeout, minPods, ns)
	// The Success-phase wait for ignored pods runs concurrently with the
	// Running/Ready poll below; its result is checked after the poll.
	wg := sync.WaitGroup{}
	wg.Add(1)
	var waitForSuccessError error
	go func() {
		waitForSuccessError = WaitForPodsSuccess(c, ns, ignoreLabels, timeout)
		wg.Done()
	}()
	if wait.PollImmediate(Poll, timeout, func() (bool, error) {
		// We get the new list of pods and replication controllers in every
		// iteration because more pods come online during startup and we want to
		// ensure they are also checked.
		rcList, err := c.ReplicationControllers(ns).List(api.ListOptions{})
		if err != nil {
			Logf("Error getting replication controllers in namespace '%s': %v", ns, err)
			return false, nil
		}
		// Sum of desired replicas across all RCs — the target for replicaOk.
		replicas := int32(0)
		for _, rc := range rcList.Items {
			replicas += rc.Spec.Replicas
		}
		podList, err := c.Pods(ns).List(api.ListOptions{})
		if err != nil {
			Logf("Error getting pods in namespace '%s': %v", ns, err)
			return false, nil
		}
		nOk, replicaOk, badPods := int32(0), int32(0), []api.Pod{}
		for _, pod := range podList.Items {
			// Pods matching ignoreLabels are handled by the goroutine above.
			if len(ignoreLabels) != 0 && ignoreSelector.Matches(labels.Set(pod.Labels)) {
				Logf("%v in state %v, ignoring", pod.Name, pod.Status.Phase)
				continue
			}
			if res, err := PodRunningReady(&pod); res && err == nil {
				nOk++
				if hasReplicationControllersForPod(rcList, pod) {
					replicaOk++
				}
			} else {
				if pod.Status.Phase != api.PodFailed {
					Logf("The status of Pod %s is %s, waiting for it to be either Running or Failed", pod.ObjectMeta.Name, pod.Status.Phase)
					badPods = append(badPods, pod)
				} else if !hasReplicationControllersForPod(rcList, pod) {
					Logf("Pod %s is Failed, but it's not controlled by a ReplicationController", pod.ObjectMeta.Name)
					badPods = append(badPods, pod)
				}
				//ignore failed pods that are controlled by a replication controller
			}
		}
		Logf("%d / %d pods in namespace '%s' are running and ready (%d seconds elapsed)",
			nOk, len(podList.Items), ns, int(time.Since(start).Seconds()))
		Logf("expected %d pod replicas in namespace '%s', %d are Running and Ready.", replicas, ns, replicaOk)
		if replicaOk == replicas && nOk >= minPods && len(badPods) == 0 {
			return true, nil
		}
		logPodStates(badPods)
		return false, nil
	}) != nil {
		// NOTE(review): this early return skips wg.Wait below, leaving the
		// WaitForPodsSuccess goroutine running — confirm this is intended.
		return fmt.Errorf("Not all pods in namespace '%s' running and ready within %v", ns, timeout)
	}
	// Join the background Success wait so its error is not lost.
	wg.Wait()
	if waitForSuccessError != nil {
		return waitForSuccessError
	}
	return nil
}
  567. func podFromManifest(filename string) (*api.Pod, error) {
  568. var pod api.Pod
  569. Logf("Parsing pod from %v", filename)
  570. data := ReadOrDie(filename)
  571. json, err := utilyaml.ToJSON(data)
  572. if err != nil {
  573. return nil, err
  574. }
  575. if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, &pod); err != nil {
  576. return nil, err
  577. }
  578. return &pod, nil
  579. }
// Run a test container to try and contact the Kubernetes api-server from a pod, wait for it
// to flip to Ready, log its output and delete it.
// All failures are logged and swallowed — this helper is best-effort.
func RunKubernetesServiceTestContainer(c *client.Client, ns string) {
	path := "test/images/clusterapi-tester/pod.yaml"
	p, err := podFromManifest(path)
	if err != nil {
		Logf("Failed to parse clusterapi-tester from manifest %v: %v", path, err)
		return
	}
	p.Namespace = ns
	if _, err := c.Pods(ns).Create(p); err != nil {
		Logf("Failed to create %v: %v", p.Name, err)
		return
	}
	// Best-effort cleanup of the pod regardless of the outcome below.
	defer func() {
		if err := c.Pods(ns).Delete(p.Name, nil); err != nil {
			Logf("Failed to delete pod %v: %v", p.Name, err)
		}
	}()
	timeout := 5 * time.Minute
	if err := waitForPodCondition(c, ns, p.Name, "clusterapi-tester", timeout, PodRunningReady); err != nil {
		Logf("Pod %v took longer than %v to enter running/ready: %v", p.Name, timeout, err)
		return
	}
	// Dump the tester's output from its first (only expected) container.
	logs, err := GetPodLogs(c, ns, p.Name, p.Spec.Containers[0].Name)
	if err != nil {
		Logf("Failed to retrieve logs from %v: %v", p.Name, err)
	} else {
		Logf("Output of clusterapi-tester:\n%v", logs)
	}
}
// kubectlLogPod logs, for each container in pod whose name contains
// containerNameSubstr, that container's current logs — falling back to the
// previous container instance's logs when the current fetch fails.
func kubectlLogPod(c *client.Client, pod api.Pod, containerNameSubstr string) {
	for _, container := range pod.Spec.Containers {
		if strings.Contains(container.Name, containerNameSubstr) {
			// Contains() matches all strings if substr is empty
			logs, err := GetPodLogs(c, pod.Namespace, pod.Name, container.Name)
			if err != nil {
				// Fall back to the previous container instance's logs.
				logs, err = getPreviousPodLogs(c, pod.Namespace, pod.Name, container.Name)
				if err != nil {
					// NOTE(review): when both fetches fail, the (empty)
					// logs value is still printed below — confirm intended.
					Logf("Failed to get logs of pod %v, container %v, err: %v", pod.Name, container.Name, err)
				}
			}
			By(fmt.Sprintf("Logs of %v/%v:%v on node %v", pod.Namespace, pod.Name, container.Name, pod.Spec.NodeName))
			Logf("%s : STARTLOG\n%s\nENDLOG for container %v:%v:%v", containerNameSubstr, logs, pod.Namespace, pod.Name, container.Name)
		}
	}
}
  627. func LogFailedContainers(c *client.Client, ns string) {
  628. podList, err := c.Pods(ns).List(api.ListOptions{})
  629. if err != nil {
  630. Logf("Error getting pods in namespace '%s': %v", ns, err)
  631. return
  632. }
  633. Logf("Running kubectl logs on non-ready containers in %v", ns)
  634. for _, pod := range podList.Items {
  635. if res, err := PodRunningReady(&pod); !res || err != nil {
  636. kubectlLogPod(c, pod, "")
  637. }
  638. }
  639. }
  640. func LogPodsWithLabels(c *client.Client, ns string, match map[string]string) {
  641. podList, err := c.Pods(ns).List(api.ListOptions{LabelSelector: labels.SelectorFromSet(match)})
  642. if err != nil {
  643. Logf("Error getting pods in namespace %q: %v", ns, err)
  644. return
  645. }
  646. Logf("Running kubectl logs on pods with labels %v in %v", match, ns)
  647. for _, pod := range podList.Items {
  648. kubectlLogPod(c, pod, "")
  649. }
  650. }
  651. func LogContainersInPodsWithLabels(c *client.Client, ns string, match map[string]string, containerSubstr string) {
  652. podList, err := c.Pods(ns).List(api.ListOptions{LabelSelector: labels.SelectorFromSet(match)})
  653. if err != nil {
  654. Logf("Error getting pods in namespace %q: %v", ns, err)
  655. return
  656. }
  657. for _, pod := range podList.Items {
  658. kubectlLogPod(c, pod, containerSubstr)
  659. }
  660. }
// DeleteNamespaces deletes all namespaces that match the given delete and skip filters.
// Filter is by simple strings.Contains; first skip filter, then delete filter.
// Returns the list of deleted namespaces or an error.
// A nil filter means "skip nothing" / "delete everything" respectively.
func DeleteNamespaces(c *client.Client, deleteFilter, skipFilter []string) ([]string, error) {
	By("Deleting namespaces")
	nsList, err := c.Namespaces().List(api.ListOptions{})
	Expect(err).NotTo(HaveOccurred())
	var deleted []string
	var wg sync.WaitGroup
OUTER:
	for _, item := range nsList.Items {
		// The skip filter takes precedence over the delete filter.
		if skipFilter != nil {
			for _, pattern := range skipFilter {
				if strings.Contains(item.Name, pattern) {
					continue OUTER
				}
			}
		}
		if deleteFilter != nil {
			var shouldDelete bool
			for _, pattern := range deleteFilter {
				if strings.Contains(item.Name, pattern) {
					shouldDelete = true
					break
				}
			}
			if !shouldDelete {
				continue OUTER
			}
		}
		wg.Add(1)
		deleted = append(deleted, item.Name)
		// Delete concurrently; item.Name is passed as an argument so the
		// goroutine does not capture the loop variable.
		go func(nsName string) {
			defer wg.Done()
			defer GinkgoRecover()
			Expect(c.Namespaces().Delete(nsName)).To(Succeed())
			Logf("namespace : %v api call to delete is complete ", nsName)
		}(item.Name)
	}
	wg.Wait()
	return deleted, nil
}
  703. func WaitForNamespacesDeleted(c *client.Client, namespaces []string, timeout time.Duration) error {
  704. By("Waiting for namespaces to vanish")
  705. nsMap := map[string]bool{}
  706. for _, ns := range namespaces {
  707. nsMap[ns] = true
  708. }
  709. //Now POLL until all namespaces have been eradicated.
  710. return wait.Poll(2*time.Second, timeout,
  711. func() (bool, error) {
  712. nsList, err := c.Namespaces().List(api.ListOptions{})
  713. if err != nil {
  714. return false, err
  715. }
  716. for _, item := range nsList.Items {
  717. if _, ok := nsMap[item.Name]; ok {
  718. return false, nil
  719. }
  720. }
  721. return true, nil
  722. })
  723. }
  724. func waitForServiceAccountInNamespace(c *client.Client, ns, serviceAccountName string, timeout time.Duration) error {
  725. w, err := c.ServiceAccounts(ns).Watch(api.SingleObject(api.ObjectMeta{Name: serviceAccountName}))
  726. if err != nil {
  727. return err
  728. }
  729. _, err = watch.Until(timeout, w, client.ServiceAccountHasSecrets)
  730. return err
  731. }
// waitForPodCondition polls pod podName in ns every Poll interval until
// condition reports done, the pod disappears, or timeout elapses.
// desc is a human-readable description of the awaited state, used for logging.
func waitForPodCondition(c *client.Client, ns, podName, desc string, timeout time.Duration, condition podCondition) error {
	Logf("Waiting up to %[1]v for pod %[2]s status to be %[3]s", timeout, podName, desc)
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
		pod, err := c.Pods(ns).Get(podName)
		if err != nil {
			if apierrs.IsNotFound(err) {
				// A vanished pod can never satisfy the condition; give up early.
				Logf("Pod %q in namespace %q disappeared. Error: %v", podName, ns, err)
				return err
			}
			// Other Get errors are treated as transient and retried.
			// Aligning this text makes it much more readable
			Logf("Get pod %[1]s in namespace '%[2]s' failed, ignoring for %[3]v. Error: %[4]v",
				podName, ns, Poll, err)
			continue
		}
		// Once the condition says done, return its error (nil on success).
		done, err := condition(pod)
		if done {
			return err
		}
		Logf("Waiting for pod %[1]s in namespace '%[2]s' status to be '%[3]s'"+
			"(found phase: %[4]q, readiness: %[5]t) (%[6]v elapsed)",
			podName, ns, desc, pod.Status.Phase, podReady(pod), time.Since(start))
	}
	return fmt.Errorf("gave up waiting for pod '%s' to be '%s' after %v", podName, desc, timeout)
}
  756. // WaitForMatchPodsCondition finds match pods based on the input ListOptions.
  757. // waits and checks if all match pods are in the given podCondition
  758. func WaitForMatchPodsCondition(c *client.Client, opts api.ListOptions, desc string, timeout time.Duration, condition podCondition) error {
  759. Logf("Waiting up to %v for matching pods' status to be %s", timeout, desc)
  760. for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
  761. pods, err := c.Pods(api.NamespaceAll).List(opts)
  762. if err != nil {
  763. return err
  764. }
  765. conditionNotMatch := []string{}
  766. for _, pod := range pods.Items {
  767. done, err := condition(&pod)
  768. if done && err != nil {
  769. return fmt.Errorf("Unexpected error: %v", err)
  770. }
  771. if !done {
  772. conditionNotMatch = append(conditionNotMatch, format.Pod(&pod))
  773. }
  774. }
  775. if len(conditionNotMatch) <= 0 {
  776. return err
  777. }
  778. Logf("%d pods are not %s", len(conditionNotMatch), desc)
  779. }
  780. return fmt.Errorf("gave up waiting for matching pods to be '%s' after %v", desc, timeout)
  781. }
// WaitForDefaultServiceAccountInNamespace waits for the default service account to be provisioned
// the default service account is what is associated with pods when they do not specify a service account
// as a result, pods are not able to be provisioned in a namespace until the service account is provisioned
func WaitForDefaultServiceAccountInNamespace(c *client.Client, namespace string) error {
	// Delegates to the generic wait with the well-known account name.
	return waitForServiceAccountInNamespace(c, namespace, "default", ServiceAccountProvisionTimeout)
}
  788. // WaitForFederationApiserverReady waits for the federation apiserver to be ready.
  789. // It tests the readiness by sending a GET request and expecting a non error response.
  790. func WaitForFederationApiserverReady(c *federation_release_1_4.Clientset) error {
  791. return wait.PollImmediate(time.Second, 1*time.Minute, func() (bool, error) {
  792. _, err := c.Federation().Clusters().List(api.ListOptions{})
  793. if err != nil {
  794. return false, nil
  795. }
  796. return true, nil
  797. })
  798. }
  799. // WaitForPersistentVolumePhase waits for a PersistentVolume to be in a specific phase or until timeout occurs, whichever comes first.
  800. func WaitForPersistentVolumePhase(phase api.PersistentVolumePhase, c *client.Client, pvName string, Poll, timeout time.Duration) error {
  801. Logf("Waiting up to %v for PersistentVolume %s to have phase %s", timeout, pvName, phase)
  802. for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
  803. pv, err := c.PersistentVolumes().Get(pvName)
  804. if err != nil {
  805. Logf("Get persistent volume %s in failed, ignoring for %v: %v", pvName, Poll, err)
  806. continue
  807. } else {
  808. if pv.Status.Phase == phase {
  809. Logf("PersistentVolume %s found and phase=%s (%v)", pvName, phase, time.Since(start))
  810. return nil
  811. } else {
  812. Logf("PersistentVolume %s found but phase is %s instead of %s.", pvName, pv.Status.Phase, phase)
  813. }
  814. }
  815. }
  816. return fmt.Errorf("PersistentVolume %s not in phase %s within %v", pvName, phase, timeout)
  817. }
  818. // WaitForPersistentVolumeDeleted waits for a PersistentVolume to get deleted or until timeout occurs, whichever comes first.
  819. func WaitForPersistentVolumeDeleted(c *client.Client, pvName string, Poll, timeout time.Duration) error {
  820. Logf("Waiting up to %v for PersistentVolume %s to get deleted", timeout, pvName)
  821. for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
  822. pv, err := c.PersistentVolumes().Get(pvName)
  823. if err == nil {
  824. Logf("PersistentVolume %s found and phase=%s (%v)", pvName, pv.Status.Phase, time.Since(start))
  825. continue
  826. } else {
  827. if apierrs.IsNotFound(err) {
  828. Logf("PersistentVolume %s was removed", pvName)
  829. return nil
  830. } else {
  831. Logf("Get persistent volume %s in failed, ignoring for %v: %v", pvName, Poll, err)
  832. }
  833. }
  834. }
  835. return fmt.Errorf("PersistentVolume %s still exists within %v", pvName, timeout)
  836. }
  837. // WaitForPersistentVolumeClaimPhase waits for a PersistentVolumeClaim to be in a specific phase or until timeout occurs, whichever comes first.
  838. func WaitForPersistentVolumeClaimPhase(phase api.PersistentVolumeClaimPhase, c *client.Client, ns string, pvcName string, Poll, timeout time.Duration) error {
  839. Logf("Waiting up to %v for PersistentVolumeClaim %s to have phase %s", timeout, pvcName, phase)
  840. for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
  841. pvc, err := c.PersistentVolumeClaims(ns).Get(pvcName)
  842. if err != nil {
  843. Logf("Get persistent volume claim %s in failed, ignoring for %v: %v", pvcName, Poll, err)
  844. continue
  845. } else {
  846. if pvc.Status.Phase == phase {
  847. Logf("PersistentVolumeClaim %s found and phase=%s (%v)", pvcName, phase, time.Since(start))
  848. return nil
  849. } else {
  850. Logf("PersistentVolumeClaim %s found but phase is %s instead of %s.", pvcName, pvc.Status.Phase, phase)
  851. }
  852. }
  853. }
  854. return fmt.Errorf("PersistentVolumeClaim %s not in phase %s within %v", pvcName, phase, timeout)
  855. }
// CreateTestingNS should be used by every test, note that we append a common prefix to the provided test name.
// Please see NewFramework instead of using this directly.
func CreateTestingNS(baseName string, c *client.Client, labels map[string]string) (*api.Namespace, error) {
	if labels == nil {
		labels = map[string]string{}
	}
	// Tag every test namespace with the run id so cleanup can find it later.
	labels["e2e-run"] = string(RunId)
	namespaceObj := &api.Namespace{
		ObjectMeta: api.ObjectMeta{
			GenerateName: fmt.Sprintf("e2e-tests-%v-", baseName),
			Namespace:    "",
			Labels:       labels,
		},
		Status: api.NamespaceStatus{},
	}
	// Be robust about making the namespace creation call.
	var got *api.Namespace
	if err := wait.PollImmediate(Poll, SingleCallTimeout, func() (bool, error) {
		var err error
		got, err = c.Namespaces().Create(namespaceObj)
		if err != nil {
			// Creation errors are retried until SingleCallTimeout elapses.
			Logf("Unexpected error while creating namespace: %v", err)
			return false, nil
		}
		return true, nil
	}); err != nil {
		return nil, err
	}
	if TestContext.VerifyServiceAccount {
		// Pods cannot be provisioned in the namespace until the default
		// service account exists (see WaitForDefaultServiceAccountInNamespace).
		if err := WaitForDefaultServiceAccountInNamespace(c, got.Name); err != nil {
			return nil, err
		}
	}
	return got, nil
}
// CheckTestingNSDeletedExcept checks whether all e2e based existing namespaces are in the Terminating state
// and waits until they are finally deleted. It ignores namespace skip.
// It returns an error immediately if any e2e namespace (other than skip)
// is still Active, or if the wait times out.
func CheckTestingNSDeletedExcept(c *client.Client, skip string) error {
	// TODO: Since we don't have support for bulk resource deletion in the API,
	// while deleting a namespace we are deleting all objects from that namespace
	// one by one (one deletion == one API call). This basically exposes us to
	// throttling - currently controller-manager has a limit of max 20 QPS.
	// Once #10217 is implemented and used in namespace-controller, deleting all
	// object from a given namespace should be much faster and we will be able
	// to lower this timeout.
	// However, now Density test is producing ~26000 events and Load capacity test
	// is producing ~35000 events, thus assuming there are no other requests it will
	// take ~30 minutes to fully delete the namespace. Thus I'm setting it to 60
	// minutes to avoid any timeouts here.
	timeout := 60 * time.Minute
	Logf("Waiting for terminating namespaces to be deleted...")
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(15 * time.Second) {
		namespaces, err := c.Namespaces().List(api.ListOptions{})
		if err != nil {
			// Listing errors are transient; retry on the next iteration.
			Logf("Listing namespaces failed: %v", err)
			continue
		}
		terminating := 0
		for _, ns := range namespaces.Items {
			// Only e2e-prefixed namespaces count, and `skip` is exempt.
			if strings.HasPrefix(ns.ObjectMeta.Name, "e2e-tests-") && ns.ObjectMeta.Name != skip {
				if ns.Status.Phase == api.NamespaceActive {
					return fmt.Errorf("Namespace %s is active", ns.ObjectMeta.Name)
				}
				terminating++
			}
		}
		if terminating == 0 {
			return nil
		}
	}
	return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out")
}
// deleteNS deletes the provided namespace, waits for it to be completely deleted, and then checks
// whether there are any pods remaining in a non-terminating state.
// On timeout it classifies the leftover state (pods still deleting, pods
// stuck, other content, or empty-but-present) into a distinct error to aid
// flake debugging.
func deleteNS(c *client.Client, clientPool dynamic.ClientPool, namespace string, timeout time.Duration) error {
	if err := c.Namespaces().Delete(namespace); err != nil {
		return err
	}
	// wait for namespace to delete or timeout.
	// The poll error is kept and inspected only AFTER the content checks below.
	err := wait.PollImmediate(5*time.Second, timeout, func() (bool, error) {
		if _, err := c.Namespaces().Get(namespace); err != nil {
			if apierrs.IsNotFound(err) {
				return true, nil
			}
			Logf("Error while waiting for namespace to be terminated: %v", err)
			return false, nil
		}
		return false, nil
	})
	// verify there is no more remaining content in the namespace
	remainingContent, cerr := hasRemainingContent(c, clientPool, namespace)
	if cerr != nil {
		return cerr
	}
	// if content remains, let's dump information about the namespace, and system for flake debugging.
	remainingPods := 0
	missingTimestamp := 0
	if remainingContent {
		// log information about namespace, and set of namespaces in api server to help flake detection
		logNamespace(c, namespace)
		logNamespaces(c, namespace)
		// if we can, check if there were pods remaining with no timestamp.
		remainingPods, missingTimestamp, _ = countRemainingPods(c, namespace)
	}
	// a timeout waiting for namespace deletion happened!
	if err != nil {
		// some content remains in the namespace
		if remainingContent {
			// pods remain
			if remainingPods > 0 {
				// but they were all undergoing deletion (kubelet is probably culprit)
				if missingTimestamp == 0 {
					return fmt.Errorf("namespace %v was not deleted with limit: %v, pods remaining: %v, pods missing deletion timestamp: %v", namespace, err, remainingPods, missingTimestamp)
				}
				// pods remained, but were not undergoing deletion (namespace controller is probably culprit)
				return fmt.Errorf("namespace %v was not deleted with limit: %v, pods remaining: %v", namespace, err, remainingPods)
			}
			// other content remains (namespace controller is probably screwed up)
			return fmt.Errorf("namespace %v was not deleted with limit: %v, namespaced content other than pods remain", namespace, err)
		}
		// no remaining content, but namespace was not deleted (namespace controller is probably wedged)
		return fmt.Errorf("namespace %v was not deleted with limit: %v, namespace is empty but is not yet removed", namespace, err)
	}
	return nil
}
  981. // logNamespaces logs the number of namespaces by phase
  982. // namespace is the namespace the test was operating against that failed to delete so it can be grepped in logs
  983. func logNamespaces(c *client.Client, namespace string) {
  984. namespaceList, err := c.Namespaces().List(api.ListOptions{})
  985. if err != nil {
  986. Logf("namespace: %v, unable to list namespaces: %v", namespace, err)
  987. return
  988. }
  989. numActive := 0
  990. numTerminating := 0
  991. for _, namespace := range namespaceList.Items {
  992. if namespace.Status.Phase == api.NamespaceActive {
  993. numActive++
  994. } else {
  995. numTerminating++
  996. }
  997. }
  998. Logf("namespace: %v, total namespaces: %v, active: %v, terminating: %v", namespace, len(namespaceList.Items), numActive, numTerminating)
  999. }
  1000. // logNamespace logs detail about a namespace
  1001. func logNamespace(c *client.Client, namespace string) {
  1002. ns, err := c.Namespaces().Get(namespace)
  1003. if err != nil {
  1004. if apierrs.IsNotFound(err) {
  1005. Logf("namespace: %v no longer exists", namespace)
  1006. return
  1007. }
  1008. Logf("namespace: %v, unable to get namespace due to error: %v", namespace, err)
  1009. return
  1010. }
  1011. Logf("namespace: %v, DeletionTimetamp: %v, Finalizers: %v, Phase: %v", ns.Name, ns.DeletionTimestamp, ns.Spec.Finalizers, ns.Status.Phase)
  1012. }
  1013. // countRemainingPods queries the server to count number of remaining pods, and number of pods that had a missing deletion timestamp.
  1014. func countRemainingPods(c *client.Client, namespace string) (int, int, error) {
  1015. // check for remaining pods
  1016. pods, err := c.Pods(namespace).List(api.ListOptions{})
  1017. if err != nil {
  1018. return 0, 0, err
  1019. }
  1020. // nothing remains!
  1021. if len(pods.Items) == 0 {
  1022. return 0, 0, nil
  1023. }
  1024. // stuff remains, log about it
  1025. logPodStates(pods.Items)
  1026. // check if there were any pods with missing deletion timestamp
  1027. numPods := len(pods.Items)
  1028. missingTimestamp := 0
  1029. for _, pod := range pods.Items {
  1030. if pod.DeletionTimestamp == nil {
  1031. missingTimestamp++
  1032. }
  1033. }
  1034. return numPods, missingTimestamp, nil
  1035. }
// hasRemainingContent checks if there is remaining content in the namespace via API discovery
// It returns true when any namespaced, listable resource still has items in
// the namespace. Resources whose list call fails with MethodNotSupported,
// NotFound or Forbidden are skipped; any other error aborts the check.
func hasRemainingContent(c *client.Client, clientPool dynamic.ClientPool, namespace string) (bool, error) {
	// some tests generate their own framework.Client rather than the default
	// TODO: ensure every test call has a configured clientPool
	if clientPool == nil {
		return false, nil
	}
	// find out what content is supported on the server
	groupVersionResources, err := c.Discovery().ServerPreferredNamespacedResources()
	if err != nil {
		return false, err
	}
	// TODO: temporary hack for https://github.com/kubernetes/kubernetes/issues/31798
	ignoredResources := sets.NewString("bindings")
	contentRemaining := false
	// dump how many of resource type is on the server in a log.
	for _, gvr := range groupVersionResources {
		// get a client for this group version...
		dynamicClient, err := clientPool.ClientForGroupVersion(gvr.GroupVersion())
		if err != nil {
			// not all resource types support list, so some errors here are normal depending on the resource type.
			Logf("namespace: %s, unable to get client - gvr: %v, error: %v", namespace, gvr, err)
			continue
		}
		// get the api resource
		apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
		// TODO: temporary hack for https://github.com/kubernetes/kubernetes/issues/31798
		if ignoredResources.Has(apiResource.Name) {
			Logf("namespace: %s, resource: %s, ignored listing per whitelist", namespace, apiResource.Name)
			continue
		}
		obj, err := dynamicClient.Resource(&apiResource, namespace).List(&v1.ListOptions{})
		if err != nil {
			// not all resources support list, so we ignore those
			if apierrs.IsMethodNotSupported(err) || apierrs.IsNotFound(err) || apierrs.IsForbidden(err) {
				continue
			}
			return false, err
		}
		// the dynamic client returns unstructured lists; anything else is unexpected
		unstructuredList, ok := obj.(*runtime.UnstructuredList)
		if !ok {
			return false, fmt.Errorf("namespace: %s, resource: %s, expected *runtime.UnstructuredList, got %#v", namespace, apiResource.Name, obj)
		}
		// keep scanning even after finding leftovers so every resource with
		// remaining items gets logged
		if len(unstructuredList.Items) > 0 {
			Logf("namespace: %s, resource: %s, items remaining: %v", namespace, apiResource.Name, len(unstructuredList.Items))
			contentRemaining = true
		}
	}
	return contentRemaining, nil
}
  1086. func ContainerInitInvariant(older, newer runtime.Object) error {
  1087. oldPod := older.(*api.Pod)
  1088. newPod := newer.(*api.Pod)
  1089. if len(oldPod.Spec.InitContainers) == 0 {
  1090. return nil
  1091. }
  1092. if len(oldPod.Spec.InitContainers) != len(newPod.Spec.InitContainers) {
  1093. return fmt.Errorf("init container list changed")
  1094. }
  1095. if oldPod.UID != newPod.UID {
  1096. return fmt.Errorf("two different pods exist in the condition: %s vs %s", oldPod.UID, newPod.UID)
  1097. }
  1098. if err := initContainersInvariants(oldPod); err != nil {
  1099. return err
  1100. }
  1101. if err := initContainersInvariants(newPod); err != nil {
  1102. return err
  1103. }
  1104. oldInit, _, _ := podInitialized(oldPod)
  1105. newInit, _, _ := podInitialized(newPod)
  1106. if oldInit && !newInit {
  1107. // TODO: we may in the future enable resetting PodInitialized = false if the kubelet needs to restart it
  1108. // from scratch
  1109. return fmt.Errorf("pod cannot be initialized and then regress to not being initialized")
  1110. }
  1111. return nil
  1112. }
  1113. func podInitialized(pod *api.Pod) (ok bool, failed bool, err error) {
  1114. allInit := true
  1115. initFailed := false
  1116. for _, s := range pod.Status.InitContainerStatuses {
  1117. switch {
  1118. case initFailed && s.State.Waiting == nil:
  1119. return allInit, initFailed, fmt.Errorf("container %s is after a failed container but isn't waiting", s.Name)
  1120. case allInit && s.State.Waiting == nil:
  1121. return allInit, initFailed, fmt.Errorf("container %s is after an initializing container but isn't waiting", s.Name)
  1122. case s.State.Terminated == nil:
  1123. allInit = false
  1124. case s.State.Terminated.ExitCode != 0:
  1125. allInit = false
  1126. initFailed = true
  1127. case !s.Ready:
  1128. return allInit, initFailed, fmt.Errorf("container %s initialized but isn't marked as ready", s.Name)
  1129. }
  1130. }
  1131. return allInit, initFailed, nil
  1132. }
  1133. func initContainersInvariants(pod *api.Pod) error {
  1134. allInit, initFailed, err := podInitialized(pod)
  1135. if err != nil {
  1136. return err
  1137. }
  1138. if !allInit || initFailed {
  1139. for _, s := range pod.Status.ContainerStatuses {
  1140. if s.State.Waiting == nil || s.RestartCount != 0 {
  1141. return fmt.Errorf("container %s is not waiting but initialization not complete", s.Name)
  1142. }
  1143. if s.State.Waiting.Reason != "PodInitializing" {
  1144. return fmt.Errorf("container %s should have reason PodInitializing: %s", s.Name, s.State.Waiting.Reason)
  1145. }
  1146. }
  1147. }
  1148. _, c := api.GetPodCondition(&pod.Status, api.PodInitialized)
  1149. if c == nil {
  1150. return fmt.Errorf("pod does not have initialized condition")
  1151. }
  1152. if c.LastTransitionTime.IsZero() {
  1153. return fmt.Errorf("PodInitialized condition should always have a transition time")
  1154. }
  1155. switch {
  1156. case c.Status == api.ConditionUnknown:
  1157. return fmt.Errorf("PodInitialized condition should never be Unknown")
  1158. case c.Status == api.ConditionTrue && (initFailed || !allInit):
  1159. return fmt.Errorf("PodInitialized condition was True but all not all containers initialized")
  1160. case c.Status == api.ConditionFalse && (!initFailed && allInit):
  1161. return fmt.Errorf("PodInitialized condition was False but all containers initialized")
  1162. }
  1163. return nil
  1164. }
  1165. type InvariantFunc func(older, newer runtime.Object) error
  1166. func CheckInvariants(events []watch.Event, fns ...InvariantFunc) error {
  1167. errs := sets.NewString()
  1168. for i := range events {
  1169. j := i + 1
  1170. if j >= len(events) {
  1171. continue
  1172. }
  1173. for _, fn := range fns {
  1174. if err := fn(events[i].Object, events[j].Object); err != nil {
  1175. errs.Insert(err.Error())
  1176. }
  1177. }
  1178. }
  1179. if errs.Len() > 0 {
  1180. return fmt.Errorf("invariants violated:\n* %s", strings.Join(errs.List(), "\n* "))
  1181. }
  1182. return nil
  1183. }
  1184. // Waits default amount of time (PodStartTimeout) for the specified pod to become running.
  1185. // Returns an error if timeout occurs first, or pod goes in to failed state.
  1186. func WaitForPodRunningInNamespace(c *client.Client, pod *api.Pod) error {
  1187. // this short-cicuit is needed for cases when we pass a list of pods instead
  1188. // of newly created pod (eg. VerifyPods) which means we are getting already
  1189. // running pod for which waiting does not make sense and will always fail
  1190. if pod.Status.Phase == api.PodRunning {
  1191. return nil
  1192. }
  1193. return waitTimeoutForPodRunningInNamespace(c, pod.Name, pod.Namespace, pod.ResourceVersion, PodStartTimeout)
  1194. }
// Waits default amount of time (PodStartTimeout) for the specified pod to become running.
// Returns an error if timeout occurs first, or pod goes in to failed state.
// The empty resourceVersion means the watch starts from the pod's current state.
func WaitForPodNameRunningInNamespace(c *client.Client, podName, namespace string) error {
	return waitTimeoutForPodRunningInNamespace(c, podName, namespace, "", PodStartTimeout)
}
// Waits an extended amount of time (slowPodStartTimeout) for the specified pod to become running.
// The resourceVersion is used when Watching object changes, it tells since when we care
// about changes to the pod. Returns an error if timeout occurs first, or pod goes in to failed state.
func waitForPodRunningInNamespaceSlow(c *client.Client, podName, namespace, resourceVersion string) error {
	return waitTimeoutForPodRunningInNamespace(c, podName, namespace, resourceVersion, slowPodStartTimeout)
}
  1206. func waitTimeoutForPodRunningInNamespace(c *client.Client, podName, namespace, resourceVersion string, timeout time.Duration) error {
  1207. w, err := c.Pods(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: podName, ResourceVersion: resourceVersion}))
  1208. if err != nil {
  1209. return err
  1210. }
  1211. _, err = watch.Until(timeout, w, client.PodRunning)
  1212. return err
  1213. }
// Waits default amount of time (podNoLongerRunningTimeout) for the specified pod to stop running.
// Returns an error if timeout occurs first.
// The resourceVersion tells the underlying watch since when we care about
// changes to the pod.
func WaitForPodNoLongerRunningInNamespace(c *client.Client, podName, namespace, resourceVersion string) error {
	return WaitTimeoutForPodNoLongerRunningInNamespace(c, podName, namespace, resourceVersion, podNoLongerRunningTimeout)
}
  1219. func WaitTimeoutForPodNoLongerRunningInNamespace(c *client.Client, podName, namespace, resourceVersion string, timeout time.Duration) error {
  1220. w, err := c.Pods(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: podName, ResourceVersion: resourceVersion}))
  1221. if err != nil {
  1222. return err
  1223. }
  1224. _, err = watch.Until(timeout, w, client.PodCompleted)
  1225. return err
  1226. }
  1227. func waitTimeoutForPodReadyInNamespace(c *client.Client, podName, namespace, resourceVersion string, timeout time.Duration) error {
  1228. w, err := c.Pods(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: podName, ResourceVersion: resourceVersion}))
  1229. if err != nil {
  1230. return err
  1231. }
  1232. _, err = watch.Until(timeout, w, client.PodRunningAndReady)
  1233. return err
  1234. }
  1235. // WaitForPodNotPending returns an error if it took too long for the pod to go out of pending state.
  1236. // The resourceVersion is used when Watching object changes, it tells since when we care
  1237. // about changes to the pod.
  1238. func WaitForPodNotPending(c *client.Client, ns, podName, resourceVersion string) error {
  1239. w, err := c.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: podName, ResourceVersion: resourceVersion}))
  1240. if err != nil {
  1241. return err
  1242. }
  1243. _, err = watch.Until(PodStartTimeout, w, client.PodNotPending)
  1244. return err
  1245. }
  1246. // waitForPodTerminatedInNamespace returns an error if it took too long for the pod
  1247. // to terminate or if the pod terminated with an unexpected reason.
  1248. func waitForPodTerminatedInNamespace(c *client.Client, podName, reason, namespace string) error {
  1249. return waitForPodCondition(c, namespace, podName, "terminated due to deadline exceeded", PodStartTimeout, func(pod *api.Pod) (bool, error) {
  1250. if pod.Status.Phase == api.PodFailed {
  1251. if pod.Status.Reason == reason {
  1252. return true, nil
  1253. } else {
  1254. return true, fmt.Errorf("Expected pod %v in namespace %v to be terminated with reason %v, got reason: %v", podName, namespace, reason, pod.Status.Reason)
  1255. }
  1256. }
  1257. return false, nil
  1258. })
  1259. }
  1260. // waitForPodSuccessInNamespaceTimeout returns nil if the pod reached state success, or an error if it reached failure or ran too long.
  1261. func waitForPodSuccessInNamespaceTimeout(c *client.Client, podName string, contName string, namespace string, timeout time.Duration) error {
  1262. return waitForPodCondition(c, namespace, podName, "success or failure", timeout, func(pod *api.Pod) (bool, error) {
  1263. // Cannot use pod.Status.Phase == api.PodSucceeded/api.PodFailed due to #2632
  1264. ci, ok := api.GetContainerStatus(pod.Status.ContainerStatuses, contName)
  1265. if !ok {
  1266. Logf("No Status.Info for container '%s' in pod '%s' yet", contName, podName)
  1267. } else {
  1268. if ci.State.Terminated != nil {
  1269. if ci.State.Terminated.ExitCode == 0 {
  1270. By("Saw pod success")
  1271. return true, nil
  1272. }
  1273. return true, fmt.Errorf("pod '%s' terminated with failure: %+v", podName, ci.State.Terminated)
  1274. }
  1275. Logf("Nil State.Terminated for container '%s' in pod '%s' in namespace '%s' so far", contName, podName, namespace)
  1276. }
  1277. return false, nil
  1278. })
  1279. }
// WaitForPodSuccessInNamespace returns nil if the pod reached state success, or an error if it reached failure or until podStartupTimeout.
// contName names the container whose terminated state is inspected.
func WaitForPodSuccessInNamespace(c *client.Client, podName string, contName string, namespace string) error {
	return waitForPodSuccessInNamespaceTimeout(c, podName, contName, namespace, PodStartTimeout)
}
// WaitForPodSuccessInNamespaceSlow returns nil if the pod reached state success, or an error if it reached failure or until slowPodStartupTimeout.
// contName names the container whose terminated state is inspected.
func WaitForPodSuccessInNamespaceSlow(c *client.Client, podName string, contName string, namespace string) error {
	return waitForPodSuccessInNamespaceTimeout(c, podName, contName, namespace, slowPodStartTimeout)
}
  1288. // waitForRCPodOnNode returns the pod from the given replication controller (described by rcName) which is scheduled on the given node.
  1289. // In case of failure or too long waiting time, an error is returned.
  1290. func waitForRCPodOnNode(c *client.Client, ns, rcName, node string) (*api.Pod, error) {
  1291. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": rcName}))
  1292. var p *api.Pod = nil
  1293. err := wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) {
  1294. Logf("Waiting for pod %s to appear on node %s", rcName, node)
  1295. options := api.ListOptions{LabelSelector: label}
  1296. pods, err := c.Pods(ns).List(options)
  1297. if err != nil {
  1298. return false, err
  1299. }
  1300. for _, pod := range pods.Items {
  1301. if pod.Spec.NodeName == node {
  1302. Logf("Pod %s found on node %s", pod.Name, node)
  1303. p = &pod
  1304. return true, nil
  1305. }
  1306. }
  1307. return false, nil
  1308. })
  1309. return p, err
  1310. }
// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status.
func WaitForRCToStabilize(c *client.Client, ns, name string, timeout time.Duration) error {
	// Watch only the single named RC via a field selector.
	options := api.ListOptions{FieldSelector: fields.Set{
		"metadata.name":      name,
		"metadata.namespace": ns,
	}.AsSelector()}
	w, err := c.ReplicationControllers(ns).Watch(options)
	if err != nil {
		return err
	}
	_, err = watch.Until(timeout, w, func(event watch.Event) (bool, error) {
		// Deletion while waiting is terminal.
		switch event.Type {
		case watch.Deleted:
			return false, apierrs.NewNotFound(unversioned.GroupResource{Resource: "replicationcontrollers"}, "")
		}
		switch rc := event.Object.(type) {
		case *api.ReplicationController:
			// Stable once the controller has observed the latest generation
			// and the status replica count matches the spec.
			if rc.Name == name && rc.Namespace == ns &&
				rc.Generation <= rc.Status.ObservedGeneration &&
				rc.Spec.Replicas == rc.Status.Replicas {
				return true, nil
			}
			Logf("Waiting for rc %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
				name, rc.Generation, rc.Status.ObservedGeneration, rc.Spec.Replicas, rc.Status.Replicas)
		}
		return false, nil
	})
	return err
}
  1340. func WaitForPodToDisappear(c *client.Client, ns, podName string, label labels.Selector, interval, timeout time.Duration) error {
  1341. return wait.PollImmediate(interval, timeout, func() (bool, error) {
  1342. Logf("Waiting for pod %s to disappear", podName)
  1343. options := api.ListOptions{LabelSelector: label}
  1344. pods, err := c.Pods(ns).List(options)
  1345. if err != nil {
  1346. return false, err
  1347. }
  1348. found := false
  1349. for _, pod := range pods.Items {
  1350. if pod.Name == podName {
  1351. Logf("Pod %s still exists", podName)
  1352. found = true
  1353. }
  1354. }
  1355. if !found {
  1356. Logf("Pod %s no longer exists", podName)
  1357. return true, nil
  1358. }
  1359. return false, nil
  1360. })
  1361. }
// WaitForRCPodToDisappear returns nil if the pod from the given replication controller (described by rcName) no longer exists.
// In case of failure or too long waiting time, an error is returned.
func WaitForRCPodToDisappear(c *client.Client, ns, rcName, podName string) error {
	// RC pods are labeled with "name": rcName.
	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": rcName}))
	// NodeController evicts pod after 5 minutes, so we need timeout greater than that.
	// Additionally, there can be non-zero grace period, so we are setting 10 minutes
	// to be on the safe size.
	return WaitForPodToDisappear(c, ns, podName, label, 20*time.Second, 10*time.Minute)
}
  1371. // WaitForService waits until the service appears (exist == true), or disappears (exist == false)
  1372. func WaitForService(c *client.Client, namespace, name string, exist bool, interval, timeout time.Duration) error {
  1373. err := wait.PollImmediate(interval, timeout, func() (bool, error) {
  1374. _, err := c.Services(namespace).Get(name)
  1375. switch {
  1376. case err == nil:
  1377. if !exist {
  1378. return false, nil
  1379. }
  1380. Logf("Service %s in namespace %s found.", name, namespace)
  1381. return true, nil
  1382. case apierrs.IsNotFound(err):
  1383. if exist {
  1384. return false, nil
  1385. }
  1386. Logf("Service %s in namespace %s disappeared.", name, namespace)
  1387. return true, nil
  1388. default:
  1389. Logf("Get service %s in namespace %s failed: %v", name, namespace, err)
  1390. return false, nil
  1391. }
  1392. })
  1393. if err != nil {
  1394. stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
  1395. return fmt.Errorf("error waiting for service %s/%s %s: %v", namespace, name, stateMsg[exist], err)
  1396. }
  1397. return nil
  1398. }
  1399. //WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum.
  1400. func WaitForServiceEndpointsNum(c *client.Client, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
  1401. return wait.Poll(interval, timeout, func() (bool, error) {
  1402. Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
  1403. list, err := c.Endpoints(namespace).List(api.ListOptions{})
  1404. if err != nil {
  1405. return false, err
  1406. }
  1407. for _, e := range list.Items {
  1408. if e.Name == serviceName && countEndpointsNum(&e) == expectNum {
  1409. return true, nil
  1410. }
  1411. }
  1412. return false, nil
  1413. })
  1414. }
  1415. func countEndpointsNum(e *api.Endpoints) int {
  1416. num := 0
  1417. for _, sub := range e.Subsets {
  1418. num += len(sub.Addresses)
  1419. }
  1420. return num
  1421. }
  1422. // WaitForReplicationController waits until the RC appears (exist == true), or disappears (exist == false)
  1423. func WaitForReplicationController(c *client.Client, namespace, name string, exist bool, interval, timeout time.Duration) error {
  1424. err := wait.PollImmediate(interval, timeout, func() (bool, error) {
  1425. _, err := c.ReplicationControllers(namespace).Get(name)
  1426. if err != nil {
  1427. Logf("Get ReplicationController %s in namespace %s failed (%v).", name, namespace, err)
  1428. return !exist, nil
  1429. } else {
  1430. Logf("ReplicationController %s in namespace %s found.", name, namespace)
  1431. return exist, nil
  1432. }
  1433. })
  1434. if err != nil {
  1435. stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
  1436. return fmt.Errorf("error waiting for ReplicationController %s/%s %s: %v", namespace, name, stateMsg[exist], err)
  1437. }
  1438. return nil
  1439. }
  1440. func WaitForEndpoint(c *client.Client, ns, name string) error {
  1441. for t := time.Now(); time.Since(t) < EndpointRegisterTimeout; time.Sleep(Poll) {
  1442. endpoint, err := c.Endpoints(ns).Get(name)
  1443. Expect(err).NotTo(HaveOccurred())
  1444. if len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Addresses) == 0 {
  1445. Logf("Endpoint %s/%s is not ready yet", ns, name)
  1446. continue
  1447. } else {
  1448. return nil
  1449. }
  1450. }
  1451. return fmt.Errorf("Failed to get endpoints for %s/%s", ns, name)
  1452. }
// Context for checking pods responses by issuing GETs to them (via the API
// proxy) and verifying that they answer with their own pod name.
type podProxyResponseChecker struct {
	c              *client.Client
	ns             string          // namespace the pods live in
	label          labels.Selector // selector used to re-list the current pods
	controllerName string          // used only in log messages
	respondName    bool            // Whether the pod should respond with its own name.
	pods           *api.PodList    // the expected replica set to check against
}
  1463. func PodProxyResponseChecker(c *client.Client, ns string, label labels.Selector, controllerName string, respondName bool, pods *api.PodList) podProxyResponseChecker {
  1464. return podProxyResponseChecker{c, ns, label, controllerName, respondName, pods}
  1465. }
// CheckAllResponses issues GETs to all pods in the context and verify they
// reply with their own pod name.
// It returns (true, nil) only when every pod answered as expected;
// (false, nil) keeps the surrounding poll going, and a non-nil error aborts it.
func (r podProxyResponseChecker) CheckAllResponses() (done bool, err error) {
	successes := 0
	options := api.ListOptions{LabelSelector: r.label}
	currentPods, err := r.c.Pods(r.ns).List(options)
	Expect(err).NotTo(HaveOccurred())
	for i, pod := range r.pods.Items {
		// Check that the replica list remains unchanged, otherwise we have problems.
		if !isElementOf(pod.UID, currentPods) {
			return false, fmt.Errorf("pod with UID %s is no longer a member of the replica set. Must have been restarted for some reason. Current replica set: %v", pod.UID, currentPods)
		}
		// Newer servers expose pod proxying as a subresource; older ones use
		// the legacy "proxy" path prefix. Pick the form this server supports.
		subResourceProxyAvailable, err := ServerVersionGTE(SubResourcePodProxyVersion, r.c)
		if err != nil {
			return false, err
		}
		var body []byte
		if subResourceProxyAvailable {
			body, err = r.c.Get().
				Namespace(r.ns).
				Resource("pods").
				SubResource("proxy").
				Name(string(pod.Name)).
				Do().
				Raw()
		} else {
			body, err = r.c.Get().
				Prefix("proxy").
				Namespace(r.ns).
				Resource("pods").
				Name(string(pod.Name)).
				Do().
				Raw()
		}
		if err != nil {
			// A failed GET is retried on the next poll instead of failing the check.
			Logf("Controller %s: Failed to GET from replica %d [%s]: %v:", r.controllerName, i+1, pod.Name, err)
			continue
		}
		// The response checker expects the pod's name unless !respondName, in
		// which case it just checks for a non-empty response.
		got := string(body)
		what := ""
		if r.respondName {
			what = "expected"
			want := pod.Name
			if got != want {
				Logf("Controller %s: Replica %d [%s] expected response %q but got %q",
					r.controllerName, i+1, pod.Name, want, got)
				continue
			}
		} else {
			what = "non-empty"
			if len(got) == 0 {
				Logf("Controller %s: Replica %d [%s] expected non-empty response",
					r.controllerName, i+1, pod.Name)
				continue
			}
		}
		successes++
		Logf("Controller %s: Got %s result from replica %d [%s]: %q, %d of %d required successes so far",
			r.controllerName, what, i+1, pod.Name, got, successes, len(r.pods.Items))
	}
	if successes < len(r.pods.Items) {
		return false, nil
	}
	return true, nil
}
  1533. // ServerVersionGTE returns true if v is greater than or equal to the server
  1534. // version.
  1535. //
  1536. // TODO(18726): This should be incorporated into client.VersionInterface.
  1537. func ServerVersionGTE(v semver.Version, c discovery.ServerVersionInterface) (bool, error) {
  1538. serverVersion, err := c.ServerVersion()
  1539. if err != nil {
  1540. return false, fmt.Errorf("Unable to get server version: %v", err)
  1541. }
  1542. sv, err := version.Parse(serverVersion.GitVersion)
  1543. if err != nil {
  1544. return false, fmt.Errorf("Unable to parse server version %q: %v", serverVersion.GitVersion, err)
  1545. }
  1546. return sv.GTE(v), nil
  1547. }
// KubectlVersionGTE returns true if the kubectl version is greater than or
// equal to v.
func KubectlVersionGTE(v semver.Version) (bool, error) {
	kv, err := KubectlVersion()
	if err != nil {
		return false, err
	}
	return kv.GTE(v), nil
}
  1557. // KubectlVersion gets the version of kubectl that's currently being used (see
  1558. // --kubectl-path in e2e.go to use an alternate kubectl).
  1559. func KubectlVersion() (semver.Version, error) {
  1560. output := RunKubectlOrDie("version", "--client")
  1561. matches := gitVersionRegexp.FindStringSubmatch(output)
  1562. if len(matches) != 2 {
  1563. return semver.Version{}, fmt.Errorf("Could not find kubectl version in output %v", output)
  1564. }
  1565. // Don't use the full match, as it contains "GitVersion:\"" and a
  1566. // trailing "\"". Just use the submatch.
  1567. return version.Parse(matches[1])
  1568. }
// PodsResponding polls (up to podRespondingTimeout) until every pod in pods
// answers a proxied GET — with its own name when wantName is true — using the
// "name" label selector to re-list the current pods.
func PodsResponding(c *client.Client, ns, name string, wantName bool, pods *api.PodList) error {
	By("trying to dial each unique pod")
	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
	return wait.PollImmediate(Poll, podRespondingTimeout, PodProxyResponseChecker(c, ns, label, name, wantName, pods).CheckAllResponses)
}
  1574. func PodsCreated(c *client.Client, ns, name string, replicas int32) (*api.PodList, error) {
  1575. timeout := 2 * time.Minute
  1576. // List the pods, making sure we observe all the replicas.
  1577. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
  1578. for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
  1579. options := api.ListOptions{LabelSelector: label}
  1580. pods, err := c.Pods(ns).List(options)
  1581. if err != nil {
  1582. return nil, err
  1583. }
  1584. created := []api.Pod{}
  1585. for _, pod := range pods.Items {
  1586. if pod.DeletionTimestamp != nil {
  1587. continue
  1588. }
  1589. created = append(created, pod)
  1590. }
  1591. Logf("Pod name %s: Found %d pods out of %d", name, len(created), replicas)
  1592. if int32(len(created)) == replicas {
  1593. pods.Items = created
  1594. return pods, nil
  1595. }
  1596. }
  1597. return nil, fmt.Errorf("Pod name %s: Gave up waiting %v for %d pods to come up", name, timeout, replicas)
  1598. }
  1599. func podsRunning(c *client.Client, pods *api.PodList) []error {
  1600. // Wait for the pods to enter the running state. Waiting loops until the pods
  1601. // are running so non-running pods cause a timeout for this test.
  1602. By("ensuring each pod is running")
  1603. e := []error{}
  1604. error_chan := make(chan error)
  1605. for _, pod := range pods.Items {
  1606. go func(p api.Pod) {
  1607. error_chan <- WaitForPodRunningInNamespace(c, &p)
  1608. }(pod)
  1609. }
  1610. for range pods.Items {
  1611. err := <-error_chan
  1612. if err != nil {
  1613. e = append(e, err)
  1614. }
  1615. }
  1616. return e
  1617. }
  1618. func VerifyPods(c *client.Client, ns, name string, wantName bool, replicas int32) error {
  1619. pods, err := PodsCreated(c, ns, name, replicas)
  1620. if err != nil {
  1621. return err
  1622. }
  1623. e := podsRunning(c, pods)
  1624. if len(e) > 0 {
  1625. return fmt.Errorf("failed to wait for pods running: %v", e)
  1626. }
  1627. err = PodsResponding(c, ns, name, wantName, pods)
  1628. if err != nil {
  1629. return fmt.Errorf("failed to wait for pods responding: %v", err)
  1630. }
  1631. return nil
  1632. }
  1633. func ServiceResponding(c *client.Client, ns, name string) error {
  1634. By(fmt.Sprintf("trying to dial the service %s.%s via the proxy", ns, name))
  1635. return wait.PollImmediate(Poll, ServiceRespondingTimeout, func() (done bool, err error) {
  1636. proxyRequest, errProxy := GetServicesProxyRequest(c, c.Get())
  1637. if errProxy != nil {
  1638. Logf("Failed to get services proxy request: %v:", errProxy)
  1639. return false, nil
  1640. }
  1641. body, err := proxyRequest.Namespace(ns).
  1642. Name(name).
  1643. Do().
  1644. Raw()
  1645. if err != nil {
  1646. Logf("Failed to GET from service %s: %v:", name, err)
  1647. return false, nil
  1648. }
  1649. got := string(body)
  1650. if len(got) == 0 {
  1651. Logf("Service %s: expected non-empty response", name)
  1652. return false, err // stop polling
  1653. }
  1654. Logf("Service %s: found nonempty answer: %s", name, got)
  1655. return true, nil
  1656. })
  1657. }
  1658. func restclientConfig(kubeContext string) (*clientcmdapi.Config, error) {
  1659. Logf(">>> kubeConfig: %s\n", TestContext.KubeConfig)
  1660. if TestContext.KubeConfig == "" {
  1661. return nil, fmt.Errorf("KubeConfig must be specified to load client config")
  1662. }
  1663. c, err := clientcmd.LoadFromFile(TestContext.KubeConfig)
  1664. if err != nil {
  1665. return nil, fmt.Errorf("error loading KubeConfig: %v", err.Error())
  1666. }
  1667. if kubeContext != "" {
  1668. Logf(">>> kubeContext: %s\n", kubeContext)
  1669. c.CurrentContext = kubeContext
  1670. }
  1671. return c, nil
  1672. }
  1673. type ClientConfigGetter func() (*restclient.Config, error)
  1674. func LoadConfig() (*restclient.Config, error) {
  1675. if TestContext.NodeName != "" {
  1676. // This is a node e2e test, apply the node e2e configuration
  1677. return &restclient.Config{Host: TestContext.Host}, nil
  1678. }
  1679. c, err := restclientConfig(TestContext.KubeContext)
  1680. if err != nil {
  1681. return nil, err
  1682. }
  1683. return clientcmd.NewDefaultClientConfig(*c, &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: TestContext.Host}}).ClientConfig()
  1684. }
  1685. func LoadFederatedConfig(overrides *clientcmd.ConfigOverrides) (*restclient.Config, error) {
  1686. c, err := restclientConfig(federatedKubeContext)
  1687. if err != nil {
  1688. return nil, fmt.Errorf("error creating federation client config: %v", err.Error())
  1689. }
  1690. cfg, err := clientcmd.NewDefaultClientConfig(*c, overrides).ClientConfig()
  1691. if cfg != nil {
  1692. //TODO(colhom): this is only here because https://github.com/kubernetes/kubernetes/issues/25422
  1693. cfg.NegotiatedSerializer = api.Codecs
  1694. }
  1695. if err != nil {
  1696. return cfg, fmt.Errorf("error creating federation client config: %v", err.Error())
  1697. }
  1698. return cfg, nil
  1699. }
  1700. func loadClientFromConfig(config *restclient.Config) (*client.Client, error) {
  1701. c, err := client.New(config)
  1702. if err != nil {
  1703. return nil, fmt.Errorf("error creating client: %v", err.Error())
  1704. }
  1705. if c.Client.Timeout == 0 {
  1706. c.Client.Timeout = SingleCallTimeout
  1707. }
  1708. return c, nil
  1709. }
  1710. func setTimeouts(cs ...*http.Client) {
  1711. for _, client := range cs {
  1712. if client.Timeout == 0 {
  1713. client.Timeout = SingleCallTimeout
  1714. }
  1715. }
  1716. }
  1717. func LoadFederationClientset_1_4() (*federation_release_1_4.Clientset, error) {
  1718. config, err := LoadFederatedConfig(&clientcmd.ConfigOverrides{})
  1719. if err != nil {
  1720. return nil, err
  1721. }
  1722. c, err := federation_release_1_4.NewForConfig(config)
  1723. if err != nil {
  1724. return nil, fmt.Errorf("error creating federation clientset: %v", err.Error())
  1725. }
  1726. // Set timeout for each client in the set.
  1727. setTimeouts(c.DiscoveryClient.Client, c.FederationClient.Client, c.CoreClient.Client, c.ExtensionsClient.Client)
  1728. return c, nil
  1729. }
  1730. func LoadClient() (*client.Client, error) {
  1731. config, err := LoadConfig()
  1732. if err != nil {
  1733. return nil, fmt.Errorf("error creating client: %v", err.Error())
  1734. }
  1735. return loadClientFromConfig(config)
  1736. }
  1737. // randomSuffix provides a random string to append to pods,services,rcs.
  1738. // TODO: Allow service names to have the same form as names
  1739. // for pods and replication controllers so we don't
  1740. // need to use such a function and can instead
  1741. // use the UUID utility function.
  1742. func randomSuffix() string {
  1743. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  1744. return strconv.Itoa(r.Int() % 10000)
  1745. }
  1746. func ExpectNoError(err error, explain ...interface{}) {
  1747. if err != nil {
  1748. Logf("Unexpected error occurred: %v", err)
  1749. }
  1750. ExpectWithOffset(1, err).NotTo(HaveOccurred(), explain...)
  1751. }
  1752. func ExpectNoErrorWithRetries(fn func() error, maxRetries int, explain ...interface{}) {
  1753. var err error
  1754. for i := 0; i < maxRetries; i++ {
  1755. err = fn()
  1756. if err == nil {
  1757. return
  1758. }
  1759. Logf("(Attempt %d of %d) Unexpected error occurred: %v", i+1, maxRetries, err)
  1760. }
  1761. ExpectWithOffset(1, err).NotTo(HaveOccurred(), explain...)
  1762. }
  1763. // Stops everything from filePath from namespace ns and checks if everything matching selectors from the given namespace is correctly stopped.
  1764. func Cleanup(filePath, ns string, selectors ...string) {
  1765. By("using delete to clean up resources")
  1766. var nsArg string
  1767. if ns != "" {
  1768. nsArg = fmt.Sprintf("--namespace=%s", ns)
  1769. }
  1770. RunKubectlOrDie("delete", "--grace-period=0", "-f", filePath, nsArg)
  1771. AssertCleanup(ns, selectors...)
  1772. }
  1773. // Asserts that cleanup of a namespace wrt selectors occurred.
  1774. func AssertCleanup(ns string, selectors ...string) {
  1775. var nsArg string
  1776. if ns != "" {
  1777. nsArg = fmt.Sprintf("--namespace=%s", ns)
  1778. }
  1779. for _, selector := range selectors {
  1780. resources := RunKubectlOrDie("get", "rc,svc", "-l", selector, "--no-headers", nsArg)
  1781. if resources != "" {
  1782. Failf("Resources left running after stop:\n%s", resources)
  1783. }
  1784. pods := RunKubectlOrDie("get", "pods", "-l", selector, nsArg, "-o", "go-template={{ range .items }}{{ if not .metadata.deletionTimestamp }}{{ .metadata.name }}{{ \"\\n\" }}{{ end }}{{ end }}")
  1785. if pods != "" {
  1786. Failf("Pods left unterminated after stop:\n%s", pods)
  1787. }
  1788. }
  1789. }
// validatorFn is the per-pod validation hook that individual tests implement.
// It receives a client and the pod's name and returns an error when the pod
// fails the test-specific check.
// We may want it to return more than just an error, at some point.
type validatorFn func(c *client.Client, podID string) error
// ValidateController is a generic mechanism for testing RC's that are running.
// It takes a container name, a test name, and a validator function which is plugged in by a specific test.
// "containername": this is grepped for.
// "containerImage" : this is the name of the image we expect to be launched. Not to confuse w/ images (kitten.jpg) which are validated.
// "testname": which gets bubbled up to the logging/failure messages if errors happen.
// "validator" function: This function is given a podID and a client, and it can do some specific validations that way.
// The whole check is retried in a polling loop (5s interval) until every
// replica passes or PodStartTimeout elapses, at which point the test fails.
func ValidateController(c *client.Client, containerImage string, replicas int, containername string, testname string, validator validatorFn, ns string) {
	// Template that lists the names of all pods matching the selector.
	getPodsTemplate := "--template={{range.items}}{{.metadata.name}} {{end}}"
	// NB: kubectl adds the "exists" function to the standard template functions.
	// This lets us check to see if the "running" entry exists for each of the containers
	// we care about. Exists will never return an error and it's safe to check a chain of
	// things, any one of which may not exist. In the below template, all of info,
	// containername, and running might be nil, so the normal index function isn't very
	// helpful.
	// This template is unit-tested in kubectl, so if you change it, update the unit test.
	// You can read about the syntax here: http://golang.org/pkg/text/template/.
	getContainerStateTemplate := fmt.Sprintf(`--template={{if (exists . "status" "containerStatuses")}}{{range .status.containerStatuses}}{{if (and (eq .name "%s") (exists . "state" "running"))}}true{{end}}{{end}}{{end}}`, containername)
	// Template that prints the image of the named container, if present.
	getImageTemplate := fmt.Sprintf(`--template={{if (exists . "status" "containerStatuses")}}{{range .status.containerStatuses}}{{if eq .name "%s"}}{{.image}}{{end}}{{end}}{{end}}`, containername)
	By(fmt.Sprintf("waiting for all containers in %s pods to come up.", testname)) //testname should be selector
waitLoop:
	for start := time.Now(); time.Since(start) < PodStartTimeout; time.Sleep(5 * time.Second) {
		getPodsOutput := RunKubectlOrDie("get", "pods", "-o", "template", getPodsTemplate, "-l", testname, fmt.Sprintf("--namespace=%v", ns))
		pods := strings.Fields(getPodsOutput)
		// Wrong replica count: log and retry the whole loop.
		if numPods := len(pods); numPods != replicas {
			By(fmt.Sprintf("Replicas for %s: expected=%d actual=%d", testname, replicas, numPods))
			continue
		}
		var runningPods []string
		for _, podID := range pods {
			// Any pod failing a check restarts the outer polling loop via
			// the waitLoop label — all pods are re-checked from scratch.
			running := RunKubectlOrDie("get", "pods", podID, "-o", "template", getContainerStateTemplate, fmt.Sprintf("--namespace=%v", ns))
			if running != "true" {
				Logf("%s is created but not running", podID)
				continue waitLoop
			}
			currentImage := RunKubectlOrDie("get", "pods", podID, "-o", "template", getImageTemplate, fmt.Sprintf("--namespace=%v", ns))
			if currentImage != containerImage {
				Logf("%s is created but running wrong image; expected: %s, actual: %s", podID, containerImage, currentImage)
				continue waitLoop
			}
			// Call the generic validator function here.
			// This might validate for example, that (1) getting a url works and (2) url is serving correct content.
			if err := validator(c, podID); err != nil {
				Logf("%s is running right image but validator function failed: %v", podID, err)
				continue waitLoop
			}
			Logf("%s is verified up and running", podID)
			runningPods = append(runningPods, podID)
		}
		// If we reach here, then all our checks passed.
		if len(runningPods) == replicas {
			return
		}
	}
	// Reaching here means that one of more checks failed multiple times.  Assuming its not a race condition, something is broken.
	Failf("Timed out after %v seconds waiting for %s pods to reach valid state", PodStartTimeout.Seconds(), testname)
}
  1849. // KubectlCmd runs the kubectl executable through the wrapper script.
  1850. func KubectlCmd(args ...string) *exec.Cmd {
  1851. defaultArgs := []string{}
  1852. // Reference a --server option so tests can run anywhere.
  1853. if TestContext.Host != "" {
  1854. defaultArgs = append(defaultArgs, "--"+clientcmd.FlagAPIServer+"="+TestContext.Host)
  1855. }
  1856. if TestContext.KubeConfig != "" {
  1857. defaultArgs = append(defaultArgs, "--"+clientcmd.RecommendedConfigPathFlag+"="+TestContext.KubeConfig)
  1858. // Reference the KubeContext
  1859. if TestContext.KubeContext != "" {
  1860. defaultArgs = append(defaultArgs, "--"+clientcmd.FlagContext+"="+TestContext.KubeContext)
  1861. }
  1862. } else {
  1863. if TestContext.CertDir != "" {
  1864. defaultArgs = append(defaultArgs,
  1865. fmt.Sprintf("--certificate-authority=%s", filepath.Join(TestContext.CertDir, "ca.crt")),
  1866. fmt.Sprintf("--client-certificate=%s", filepath.Join(TestContext.CertDir, "kubecfg.crt")),
  1867. fmt.Sprintf("--client-key=%s", filepath.Join(TestContext.CertDir, "kubecfg.key")))
  1868. }
  1869. }
  1870. kubectlArgs := append(defaultArgs, args...)
  1871. //We allow users to specify path to kubectl, so you can test either "kubectl" or "cluster/kubectl.sh"
  1872. //and so on.
  1873. cmd := exec.Command(TestContext.KubectlPath, kubectlArgs...)
  1874. //caller will invoke this and wait on it.
  1875. return cmd
  1876. }
// kubectlBuilder is used to build, customize and execute a kubectl Command.
// Add more functions to customize the builder as needed.
type kubectlBuilder struct {
	// cmd is the underlying kubectl command to run.
	cmd *exec.Cmd
	// timeout, when non-nil, aborts Exec once it fires.
	timeout <-chan time.Time
}
  1883. func NewKubectlCommand(args ...string) *kubectlBuilder {
  1884. b := new(kubectlBuilder)
  1885. b.cmd = KubectlCmd(args...)
  1886. return b
  1887. }
// WithEnv sets the environment for the kubectl process and returns the
// builder for chaining.
func (b *kubectlBuilder) WithEnv(env []string) *kubectlBuilder {
	b.cmd.Env = env
	return b
}
// WithTimeout sets a channel whose firing aborts Exec (the process is
// killed). Returns the builder for chaining.
func (b *kubectlBuilder) WithTimeout(t <-chan time.Time) *kubectlBuilder {
	b.timeout = t
	return b
}
// WithStdinData feeds data to the command's stdin. Note the value receiver:
// the returned builder is a copy, leaving the original unchanged.
func (b kubectlBuilder) WithStdinData(data string) *kubectlBuilder {
	b.cmd.Stdin = strings.NewReader(data)
	return &b
}
// WithStdinReader attaches reader as the command's stdin. Note the value
// receiver: the returned builder is a copy, leaving the original unchanged.
func (b kubectlBuilder) WithStdinReader(reader io.Reader) *kubectlBuilder {
	b.cmd.Stdin = reader
	return &b
}
  1904. func (b kubectlBuilder) ExecOrDie() string {
  1905. str, err := b.Exec()
  1906. Logf("stdout: %q", str)
  1907. // In case of i/o timeout error, try talking to the apiserver again after 2s before dying.
  1908. // Note that we're still dying after retrying so that we can get visibility to triage it further.
  1909. if isTimeout(err) {
  1910. Logf("Hit i/o timeout error, talking to the server 2s later to see if it's temporary.")
  1911. time.Sleep(2 * time.Second)
  1912. retryStr, retryErr := RunKubectl("version")
  1913. Logf("stdout: %q", retryStr)
  1914. Logf("err: %v", retryErr)
  1915. }
  1916. Expect(err).NotTo(HaveOccurred())
  1917. return str
  1918. }
  1919. func isTimeout(err error) bool {
  1920. switch err := err.(type) {
  1921. case net.Error:
  1922. if err.Timeout() {
  1923. return true
  1924. }
  1925. case *url.Error:
  1926. if err, ok := err.Err.(net.Error); ok && err.Timeout() {
  1927. return true
  1928. }
  1929. }
  1930. return false
  1931. }
  1932. func (b kubectlBuilder) Exec() (string, error) {
  1933. var stdout, stderr bytes.Buffer
  1934. cmd := b.cmd
  1935. cmd.Stdout, cmd.Stderr = &stdout, &stderr
  1936. Logf("Running '%s %s'", cmd.Path, strings.Join(cmd.Args[1:], " ")) // skip arg[0] as it is printed separately
  1937. if err := cmd.Start(); err != nil {
  1938. return "", fmt.Errorf("error starting %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err)
  1939. }
  1940. errCh := make(chan error, 1)
  1941. go func() {
  1942. errCh <- cmd.Wait()
  1943. }()
  1944. select {
  1945. case err := <-errCh:
  1946. if err != nil {
  1947. var rc int = 127
  1948. if ee, ok := err.(*exec.ExitError); ok {
  1949. Logf("rc: %d", rc)
  1950. rc = int(ee.Sys().(syscall.WaitStatus).ExitStatus())
  1951. }
  1952. return "", uexec.CodeExitError{
  1953. Err: fmt.Errorf("error running %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err),
  1954. Code: rc,
  1955. }
  1956. }
  1957. case <-b.timeout:
  1958. b.cmd.Process.Kill()
  1959. return "", fmt.Errorf("timed out waiting for command %v:\nCommand stdout:\n%v\nstderr:\n%v\n", cmd, cmd.Stdout, cmd.Stderr)
  1960. }
  1961. Logf("stderr: %q", stderr.String())
  1962. return stdout.String(), nil
  1963. }
// RunKubectlOrDie is a convenience wrapper over kubectlBuilder that fails
// the test if the command errors.
func RunKubectlOrDie(args ...string) string {
	return NewKubectlCommand(args...).ExecOrDie()
}
// RunKubectl is a convenience wrapper over kubectlBuilder that returns the
// command's stdout and error to the caller.
func RunKubectl(args ...string) (string, error) {
	return NewKubectlCommand(args...).Exec()
}
// RunKubectlOrDieInput is a convenience wrapper over kubectlBuilder that takes input to stdin
// and fails the test if the command errors.
func RunKubectlOrDieInput(data string, args ...string) string {
	return NewKubectlCommand(args...).WithStdinData(data).ExecOrDie()
}
  1976. func StartCmdAndStreamOutput(cmd *exec.Cmd) (stdout, stderr io.ReadCloser, err error) {
  1977. stdout, err = cmd.StdoutPipe()
  1978. if err != nil {
  1979. return
  1980. }
  1981. stderr, err = cmd.StderrPipe()
  1982. if err != nil {
  1983. return
  1984. }
  1985. Logf("Asynchronously running '%s %s'", cmd.Path, strings.Join(cmd.Args, " "))
  1986. err = cmd.Start()
  1987. return
  1988. }
  1989. // Rough equivalent of ctrl+c for cleaning up processes. Intended to be run in defer.
  1990. func TryKill(cmd *exec.Cmd) {
  1991. if err := cmd.Process.Kill(); err != nil {
  1992. Logf("ERROR failed to kill command %v! The process may leak", cmd)
  1993. }
  1994. }
// testContainerOutputMatcher runs the given pod in the given namespace and waits
// for all of the containers in the podSpec to move into the 'Success' status, and tests
// the specified container log against the given expected output using the given matcher.
func (f *Framework) testContainerOutputMatcher(scenarioName string,
	pod *api.Pod,
	containerIndex int,
	expectedOutput []string,
	matcher func(string, ...interface{}) gomegatypes.GomegaMatcher) {
	By(fmt.Sprintf("Creating a pod to test %v", scenarioName))
	// Guard against an out-of-range container index before dereferencing.
	if containerIndex < 0 || containerIndex >= len(pod.Spec.Containers) {
		Failf("Invalid container index: %d", containerIndex)
	}
	ExpectNoError(f.MatchContainerOutput(pod, pod.Spec.Containers[containerIndex].Name, expectedOutput, matcher))
}
  2009. // MatchContainerOutput creates a pod and waits for all it's containers to exit with success.
  2010. // It then tests that the matcher with each expectedOutput matches the output of the specified container.
  2011. func (f *Framework) MatchContainerOutput(
  2012. pod *api.Pod,
  2013. containerName string,
  2014. expectedOutput []string,
  2015. matcher func(string, ...interface{}) gomegatypes.GomegaMatcher) error {
  2016. podClient := f.PodClient()
  2017. ns := f.Namespace.Name
  2018. defer podClient.Delete(pod.Name, api.NewDeleteOptions(0))
  2019. podClient.Create(pod)
  2020. // Wait for client pod to complete. All containers should succeed.
  2021. for _, container := range pod.Spec.Containers {
  2022. if err := WaitForPodSuccessInNamespace(f.Client, pod.Name, container.Name, ns); err != nil {
  2023. return fmt.Errorf("expected container %s success: %v", container.Name, err)
  2024. }
  2025. }
  2026. // Grab its logs. Get host first.
  2027. podStatus, err := podClient.Get(pod.Name)
  2028. if err != nil {
  2029. return fmt.Errorf("failed to get pod status: %v", err)
  2030. }
  2031. Logf("Trying to get logs from node %s pod %s container %s: %v",
  2032. podStatus.Spec.NodeName, podStatus.Name, containerName, err)
  2033. // Sometimes the actual containers take a second to get started, try to get logs for 60s
  2034. logs, err := GetPodLogs(f.Client, ns, pod.Name, containerName)
  2035. if err != nil {
  2036. Logf("Failed to get logs from node %q pod %q container %q. %v",
  2037. podStatus.Spec.NodeName, podStatus.Name, containerName, err)
  2038. return fmt.Errorf("failed to get logs from %s for %s: %v", podStatus.Name, containerName, err)
  2039. }
  2040. for _, expected := range expectedOutput {
  2041. m := matcher(expected)
  2042. matches, err := m.Match(logs)
  2043. if err != nil {
  2044. return fmt.Errorf("expected %q in container output: %v", expected, err)
  2045. } else if !matches {
  2046. return fmt.Errorf("expected %q in container output: %s", expected, m.FailureMessage(logs))
  2047. }
  2048. }
  2049. return nil
  2050. }
// podInfo contains pod information useful for debugging e2e tests.
// "old" fields describe the pod in the earlier snapshot, the bare fields the
// later one; the nonExist sentinel marks a side where the pod was absent.
type podInfo struct {
	oldHostname string
	oldPhase    string
	hostname    string
	phase       string
}
// PodDiff is a map of pod name to podInfos
type PodDiff map[string]*podInfo
  2060. // Print formats and prints the give PodDiff.
  2061. func (p PodDiff) Print(ignorePhases sets.String) {
  2062. for name, info := range p {
  2063. if ignorePhases.Has(info.phase) {
  2064. continue
  2065. }
  2066. if info.phase == nonExist {
  2067. Logf("Pod %v was deleted, had phase %v and host %v", name, info.oldPhase, info.oldHostname)
  2068. continue
  2069. }
  2070. phaseChange, hostChange := false, false
  2071. msg := fmt.Sprintf("Pod %v ", name)
  2072. if info.oldPhase != info.phase {
  2073. phaseChange = true
  2074. if info.oldPhase == nonExist {
  2075. msg += fmt.Sprintf("in phase %v ", info.phase)
  2076. } else {
  2077. msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase)
  2078. }
  2079. }
  2080. if info.oldHostname != info.hostname {
  2081. hostChange = true
  2082. if info.oldHostname == nonExist || info.oldHostname == "" {
  2083. msg += fmt.Sprintf("assigned host %v ", info.hostname)
  2084. } else {
  2085. msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname)
  2086. }
  2087. }
  2088. if phaseChange || hostChange {
  2089. Logf(msg)
  2090. }
  2091. }
  2092. }
  2093. // Diff computes a PodDiff given 2 lists of pods.
  2094. func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff {
  2095. podInfoMap := PodDiff{}
  2096. // New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
  2097. for _, pod := range curPods {
  2098. podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
  2099. }
  2100. // Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
  2101. for _, pod := range oldPods {
  2102. if info, ok := podInfoMap[pod.Name]; ok {
  2103. info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase)
  2104. } else {
  2105. podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)}
  2106. }
  2107. }
  2108. return podInfoMap
  2109. }
  2110. // RunDeployment Launches (and verifies correctness) of a Deployment
  2111. // and will wait for all pods it spawns to become "Running".
  2112. // It's the caller's responsibility to clean up externally (i.e. use the
  2113. // namespace lifecycle for handling Cleanup).
  2114. func RunDeployment(config DeploymentConfig) error {
  2115. err := config.create()
  2116. if err != nil {
  2117. return err
  2118. }
  2119. return config.start()
  2120. }
// create builds the Deployment object from config (single container on port
// 80, selector/labels keyed by config.Name), applies the generic pod-template
// customizations, and submits it to the API server.
func (config *DeploymentConfig) create() error {
	By(fmt.Sprintf("creating deployment %s in namespace %s", config.Name, config.Namespace))
	deployment := &extensions.Deployment{
		ObjectMeta: api.ObjectMeta{
			Name: config.Name,
		},
		Spec: extensions.DeploymentSpec{
			Replicas: int32(config.Replicas),
			Selector: &unversioned.LabelSelector{
				MatchLabels: map[string]string{
					"name": config.Name,
				},
			},
			Template: api.PodTemplateSpec{
				ObjectMeta: api.ObjectMeta{
					Labels: map[string]string{"name": config.Name},
				},
				Spec: api.PodSpec{
					Containers: []api.Container{
						{
							Name:    config.Name,
							Image:   config.Image,
							Command: config.Command,
							Ports:   []api.ContainerPort{{ContainerPort: 80}},
						},
					},
				},
			},
		},
	}
	// Apply env/labels/ports/resources/volumes overrides from the config.
	config.applyTo(&deployment.Spec.Template)
	_, err := config.Client.Deployments(config.Namespace).Create(deployment)
	if err != nil {
		return fmt.Errorf("Error creating deployment: %v", err)
	}
	Logf("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, deployment.Spec.Replicas)
	return nil
}
  2159. // RunReplicaSet launches (and verifies correctness) of a ReplicaSet
  2160. // and waits until all the pods it launches to reach the "Running" state.
  2161. // It's the caller's responsibility to clean up externally (i.e. use the
  2162. // namespace lifecycle for handling Cleanup).
  2163. func RunReplicaSet(config ReplicaSetConfig) error {
  2164. err := config.create()
  2165. if err != nil {
  2166. return err
  2167. }
  2168. return config.start()
  2169. }
// create builds the ReplicaSet object from config (single container on port
// 80, selector/labels keyed by config.Name), applies the generic pod-template
// customizations, and submits it to the API server.
func (config *ReplicaSetConfig) create() error {
	By(fmt.Sprintf("creating replicaset %s in namespace %s", config.Name, config.Namespace))
	rs := &extensions.ReplicaSet{
		ObjectMeta: api.ObjectMeta{
			Name: config.Name,
		},
		Spec: extensions.ReplicaSetSpec{
			Replicas: int32(config.Replicas),
			Selector: &unversioned.LabelSelector{
				MatchLabels: map[string]string{
					"name": config.Name,
				},
			},
			Template: api.PodTemplateSpec{
				ObjectMeta: api.ObjectMeta{
					Labels: map[string]string{"name": config.Name},
				},
				Spec: api.PodSpec{
					Containers: []api.Container{
						{
							Name:    config.Name,
							Image:   config.Image,
							Command: config.Command,
							Ports:   []api.ContainerPort{{ContainerPort: 80}},
						},
					},
				},
			},
		},
	}
	// Apply env/labels/ports/resources/volumes overrides from the config.
	config.applyTo(&rs.Spec.Template)
	_, err := config.Client.ReplicaSets(config.Namespace).Create(rs)
	if err != nil {
		return fmt.Errorf("Error creating replica set: %v", err)
	}
	Logf("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, rs.Spec.Replicas)
	return nil
}
  2208. // RunRC Launches (and verifies correctness) of a Replication Controller
  2209. // and will wait for all pods it spawns to become "Running".
  2210. // It's the caller's responsibility to clean up externally (i.e. use the
  2211. // namespace lifecycle for handling Cleanup).
  2212. func RunRC(config RCConfig) error {
  2213. err := config.create()
  2214. if err != nil {
  2215. return err
  2216. }
  2217. return config.start()
  2218. }
// create builds the ReplicationController object from config (single
// container on port 80, selector/labels keyed by config.Name, DNSDefault
// policy unless overridden), applies the generic pod-template customizations,
// and submits it to the API server.
func (config *RCConfig) create() error {
	By(fmt.Sprintf("creating replication controller %s in namespace %s", config.Name, config.Namespace))
	// Default the DNS policy when the caller didn't set one.
	dnsDefault := api.DNSDefault
	if config.DNSPolicy == nil {
		config.DNSPolicy = &dnsDefault
	}
	rc := &api.ReplicationController{
		ObjectMeta: api.ObjectMeta{
			Name: config.Name,
		},
		Spec: api.ReplicationControllerSpec{
			Replicas: int32(config.Replicas),
			Selector: map[string]string{
				"name": config.Name,
			},
			Template: &api.PodTemplateSpec{
				ObjectMeta: api.ObjectMeta{
					Labels: map[string]string{"name": config.Name},
				},
				Spec: api.PodSpec{
					Containers: []api.Container{
						{
							Name:           config.Name,
							Image:          config.Image,
							Command:        config.Command,
							Ports:          []api.ContainerPort{{ContainerPort: 80}},
							ReadinessProbe: config.ReadinessProbe,
						},
					},
					DNSPolicy:    *config.DNSPolicy,
					NodeSelector: config.NodeSelector,
				},
			},
		},
	}
	// Apply env/labels/ports/resources/volumes overrides from the config.
	config.applyTo(rc.Spec.Template)
	_, err := config.Client.ReplicationControllers(config.Namespace).Create(rc)
	if err != nil {
		return fmt.Errorf("Error creating replication controller: %v", err)
	}
	Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, rc.Spec.Replicas)
	return nil
}
  2262. func (config *RCConfig) applyTo(template *api.PodTemplateSpec) {
  2263. if config.Env != nil {
  2264. for k, v := range config.Env {
  2265. c := &template.Spec.Containers[0]
  2266. c.Env = append(c.Env, api.EnvVar{Name: k, Value: v})
  2267. }
  2268. }
  2269. if config.Labels != nil {
  2270. for k, v := range config.Labels {
  2271. template.ObjectMeta.Labels[k] = v
  2272. }
  2273. }
  2274. if config.NodeSelector != nil {
  2275. template.Spec.NodeSelector = make(map[string]string)
  2276. for k, v := range config.NodeSelector {
  2277. template.Spec.NodeSelector[k] = v
  2278. }
  2279. }
  2280. if config.Ports != nil {
  2281. for k, v := range config.Ports {
  2282. c := &template.Spec.Containers[0]
  2283. c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: int32(v)})
  2284. }
  2285. }
  2286. if config.HostPorts != nil {
  2287. for k, v := range config.HostPorts {
  2288. c := &template.Spec.Containers[0]
  2289. c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)})
  2290. }
  2291. }
  2292. if config.CpuLimit > 0 || config.MemLimit > 0 {
  2293. template.Spec.Containers[0].Resources.Limits = api.ResourceList{}
  2294. }
  2295. if config.CpuLimit > 0 {
  2296. template.Spec.Containers[0].Resources.Limits[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
  2297. }
  2298. if config.MemLimit > 0 {
  2299. template.Spec.Containers[0].Resources.Limits[api.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
  2300. }
  2301. if config.CpuRequest > 0 || config.MemRequest > 0 {
  2302. template.Spec.Containers[0].Resources.Requests = api.ResourceList{}
  2303. }
  2304. if config.CpuRequest > 0 {
  2305. template.Spec.Containers[0].Resources.Requests[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
  2306. }
  2307. if config.MemRequest > 0 {
  2308. template.Spec.Containers[0].Resources.Requests[api.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
  2309. }
  2310. if len(config.Volumes) > 0 {
  2311. template.Spec.Volumes = config.Volumes
  2312. }
  2313. if len(config.VolumeMounts) > 0 {
  2314. template.Spec.Containers[0].VolumeMounts = config.VolumeMounts
  2315. }
  2316. }
// RCStartupStatus is a snapshot of how an RC's pods are progressing toward
// Running, bucketed by phase/readiness as computed by ComputeRCStartupStatus.
type RCStartupStatus struct {
	Expected           int // desired replica count
	Terminating        int // pods with a deletion timestamp
	Running            int // running AND ready
	RunningButNotReady int // running but readiness condition not true
	Waiting            int // pending, not yet scheduled to a node
	Pending            int // pending, scheduled but not running
	Unknown            int
	Inactive           int // succeeded or failed
	FailedContainers   int // total container restarts across pods
	// Created holds the non-terminating pods observed.
	Created []*api.Pod
	// ContainerRestartNodes names the nodes hosting pods with failed containers.
	ContainerRestartNodes sets.String
}
// Print logs a one-line summary of the startup status, prefixed with name.
func (s *RCStartupStatus) Print(name string) {
	Logf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
		name, len(s.Created), s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady)
}
  2334. func ComputeRCStartupStatus(pods []*api.Pod, expected int) RCStartupStatus {
  2335. startupStatus := RCStartupStatus{
  2336. Expected: expected,
  2337. Created: make([]*api.Pod, 0, expected),
  2338. ContainerRestartNodes: sets.NewString(),
  2339. }
  2340. for _, p := range pods {
  2341. if p.DeletionTimestamp != nil {
  2342. startupStatus.Terminating++
  2343. continue
  2344. }
  2345. startupStatus.Created = append(startupStatus.Created, p)
  2346. if p.Status.Phase == api.PodRunning {
  2347. ready := false
  2348. for _, c := range p.Status.Conditions {
  2349. if c.Type == api.PodReady && c.Status == api.ConditionTrue {
  2350. ready = true
  2351. break
  2352. }
  2353. }
  2354. if ready {
  2355. // Only count a pod is running when it is also ready.
  2356. startupStatus.Running++
  2357. } else {
  2358. startupStatus.RunningButNotReady++
  2359. }
  2360. for _, v := range FailedContainers(p) {
  2361. startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts
  2362. startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName)
  2363. }
  2364. } else if p.Status.Phase == api.PodPending {
  2365. if p.Spec.NodeName == "" {
  2366. startupStatus.Waiting++
  2367. } else {
  2368. startupStatus.Pending++
  2369. }
  2370. } else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
  2371. startupStatus.Inactive++
  2372. } else if p.Status.Phase == api.PodUnknown {
  2373. startupStatus.Unknown++
  2374. }
  2375. }
  2376. return startupStatus
  2377. }
// start polls the pods belonging to this RC (matched by the "name" label)
// until config.Replicas of them are running-and-ready, or until no progress
// has been made for the (per-iteration) timeout. It fails fast when
// container failures exceed the allowed budget or when the observed pod set
// shrinks/overshoots, which indicates pod churn.
func (config *RCConfig) start() error {
	// Don't force tests to fail if they don't care about containers restarting.
	var maxContainerFailures int
	if config.MaxContainerFailures == nil {
		// Default budget: 1% of replicas, minimum 1.
		maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
	} else {
		maxContainerFailures = *config.MaxContainerFailures
	}
	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
	PodStore := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
	defer PodStore.Stop()
	// Polling cadence and stall timeout, with defaults when unset.
	interval := config.PollInterval
	if interval <= 0 {
		interval = 10 * time.Second
	}
	timeout := config.Timeout
	if timeout <= 0 {
		timeout = 5 * time.Minute
	}
	oldPods := make([]*api.Pod, 0)
	oldRunning := 0
	// lastChange tracks the last time progress was observed; the loop bails
	// out when no progress happens for `timeout`.
	lastChange := time.Now()
	for oldRunning != config.Replicas {
		time.Sleep(interval)
		pods := PodStore.List()
		startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
		pods = startupStatus.Created
		if config.CreatedPods != nil {
			// Expose the current pod set to the caller.
			*config.CreatedPods = pods
		}
		if !config.Silent {
			startupStatus.Print(config.Name)
		}
		promPushRunningPending(startupStatus.Running, startupStatus.Pending)
		if config.PodStatusFile != nil {
			fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady)
		}
		if startupStatus.FailedContainers > maxContainerFailures {
			DumpNodeDebugInfo(config.Client, startupStatus.ContainerRestartNodes.List())
			// Get the logs from the failed containers to help diagnose what caused them to fail
			LogFailedContainers(config.Client, config.Namespace)
			return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures)
		}
		if len(pods) < len(oldPods) || len(pods) > config.Replicas {
			// This failure mode includes:
			// kubelet is dead, so node controller deleted pods and rc creates more
			//	- diagnose by noting the pod diff below.
			// pod is unhealthy, so replication controller creates another to take its place
			//	- diagnose by comparing the previous "2 Pod states" lines for inactive pods
			errorStr := fmt.Sprintf("Number of reported pods for %s changed: %d vs %d", config.Name, len(pods), len(oldPods))
			Logf("%v, pods that changed since the last iteration:", errorStr)
			Diff(oldPods, pods).Print(sets.NewString())
			return fmt.Errorf(errorStr)
		}
		// More pods or more running pods both count as progress.
		if len(pods) > len(oldPods) || startupStatus.Running > oldRunning {
			lastChange = time.Now()
		}
		oldPods = pods
		oldRunning = startupStatus.Running
		if time.Since(lastChange) > timeout {
			dumpPodDebugInfo(config.Client, pods)
			break
		}
	}
	if oldRunning != config.Replicas {
		// List only pods from a given replication controller.
		options := api.ListOptions{LabelSelector: label}
		if pods, err := config.Client.Pods(api.NamespaceAll).List(options); err == nil {
			for _, pod := range pods.Items {
				Logf("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp)
			}
		} else {
			Logf("Can't list pod debug info: %v", err)
		}
		return fmt.Errorf("Only %d pods started out of %d", oldRunning, config.Replicas)
	}
	return nil
}
  2456. // Simplified version of RunRC, that does not create RC, but creates plain Pods.
  2457. // Optionally waits for pods to start running (if waitForRunning == true).
  2458. // The number of replicas must be non-zero.
  2459. func StartPods(c *client.Client, replicas int, namespace string, podNamePrefix string, pod api.Pod, waitForRunning bool) {
  2460. // no pod to start
  2461. if replicas < 1 {
  2462. panic("StartPods: number of replicas must be non-zero")
  2463. }
  2464. startPodsID := string(uuid.NewUUID()) // So that we can label and find them
  2465. for i := 0; i < replicas; i++ {
  2466. podName := fmt.Sprintf("%v-%v", podNamePrefix, i)
  2467. pod.ObjectMeta.Name = podName
  2468. pod.ObjectMeta.Labels["name"] = podName
  2469. pod.ObjectMeta.Labels["startPodsID"] = startPodsID
  2470. pod.Spec.Containers[0].Name = podName
  2471. _, err := c.Pods(namespace).Create(&pod)
  2472. ExpectNoError(err)
  2473. }
  2474. Logf("Waiting for running...")
  2475. if waitForRunning {
  2476. label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID}))
  2477. err := WaitForPodsWithLabelRunning(c, namespace, label)
  2478. ExpectNoError(err, "Error waiting for %d pods to be running - probably a timeout", replicas)
  2479. }
  2480. }
  2481. func dumpPodDebugInfo(c *client.Client, pods []*api.Pod) {
  2482. badNodes := sets.NewString()
  2483. for _, p := range pods {
  2484. if p.Status.Phase != api.PodRunning {
  2485. if p.Spec.NodeName != "" {
  2486. Logf("Pod %v assigned to host %v (IP: %v) in %v", p.Name, p.Spec.NodeName, p.Status.HostIP, p.Status.Phase)
  2487. badNodes.Insert(p.Spec.NodeName)
  2488. } else {
  2489. Logf("Pod %v still unassigned", p.Name)
  2490. }
  2491. }
  2492. }
  2493. DumpNodeDebugInfo(c, badNodes.List())
  2494. }
  2495. func DumpAllNamespaceInfo(c *client.Client, namespace string) {
  2496. By(fmt.Sprintf("Collecting events from namespace %q.", namespace))
  2497. events, err := c.Events(namespace).List(api.ListOptions{})
  2498. Expect(err).NotTo(HaveOccurred())
  2499. // Sort events by their first timestamp
  2500. sortedEvents := events.Items
  2501. if len(sortedEvents) > 1 {
  2502. sort.Sort(byFirstTimestamp(sortedEvents))
  2503. }
  2504. for _, e := range sortedEvents {
  2505. Logf("At %v - event for %v: %v %v: %v", e.FirstTimestamp, e.InvolvedObject.Name, e.Source, e.Reason, e.Message)
  2506. }
  2507. // Note that we don't wait for any Cleanup to propagate, which means
  2508. // that if you delete a bunch of pods right before ending your test,
  2509. // you may or may not see the killing/deletion/Cleanup events.
  2510. // If cluster is large, then the following logs are basically useless, because:
  2511. // 1. it takes tens of minutes or hours to grab all of them
  2512. // 2. there are so many of them that working with them are mostly impossible
  2513. // So we dump them only if the cluster is relatively small.
  2514. maxNodesForDump := 20
  2515. if nodes, err := c.Nodes().List(api.ListOptions{}); err == nil {
  2516. if len(nodes.Items) <= maxNodesForDump {
  2517. dumpAllPodInfo(c)
  2518. dumpAllNodeInfo(c)
  2519. } else {
  2520. Logf("skipping dumping cluster info - cluster too large")
  2521. }
  2522. } else {
  2523. Logf("unable to fetch node list: %v", err)
  2524. }
  2525. }
  2526. // byFirstTimestamp sorts a slice of events by first timestamp, using their involvedObject's name as a tie breaker.
  2527. type byFirstTimestamp []api.Event
  2528. func (o byFirstTimestamp) Len() int { return len(o) }
  2529. func (o byFirstTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
  2530. func (o byFirstTimestamp) Less(i, j int) bool {
  2531. if o[i].FirstTimestamp.Equal(o[j].FirstTimestamp) {
  2532. return o[i].InvolvedObject.Name < o[j].InvolvedObject.Name
  2533. }
  2534. return o[i].FirstTimestamp.Before(o[j].FirstTimestamp)
  2535. }
  2536. func dumpAllPodInfo(c *client.Client) {
  2537. pods, err := c.Pods("").List(api.ListOptions{})
  2538. if err != nil {
  2539. Logf("unable to fetch pod debug info: %v", err)
  2540. }
  2541. logPodStates(pods.Items)
  2542. }
  2543. func dumpAllNodeInfo(c *client.Client) {
  2544. // It should be OK to list unschedulable Nodes here.
  2545. nodes, err := c.Nodes().List(api.ListOptions{})
  2546. if err != nil {
  2547. Logf("unable to fetch node list: %v", err)
  2548. return
  2549. }
  2550. names := make([]string, len(nodes.Items))
  2551. for ix := range nodes.Items {
  2552. names[ix] = nodes.Items[ix].Name
  2553. }
  2554. DumpNodeDebugInfo(c, names)
  2555. }
  2556. func DumpNodeDebugInfo(c *client.Client, nodeNames []string) {
  2557. for _, n := range nodeNames {
  2558. Logf("\nLogging node info for node %v", n)
  2559. node, err := c.Nodes().Get(n)
  2560. if err != nil {
  2561. Logf("Error getting node info %v", err)
  2562. }
  2563. Logf("Node Info: %v", node)
  2564. Logf("\nLogging kubelet events for node %v", n)
  2565. for _, e := range getNodeEvents(c, n) {
  2566. Logf("source %v type %v message %v reason %v first ts %v last ts %v, involved obj %+v",
  2567. e.Source, e.Type, e.Message, e.Reason, e.FirstTimestamp, e.LastTimestamp, e.InvolvedObject)
  2568. }
  2569. Logf("\nLogging pods the kubelet thinks is on node %v", n)
  2570. podList, err := GetKubeletPods(c, n)
  2571. if err != nil {
  2572. Logf("Unable to retrieve kubelet pods for node %v", n)
  2573. continue
  2574. }
  2575. for _, p := range podList.Items {
  2576. Logf("%v started at %v (%d+%d container statuses recorded)", p.Name, p.Status.StartTime, len(p.Status.InitContainerStatuses), len(p.Status.ContainerStatuses))
  2577. for _, c := range p.Status.InitContainerStatuses {
  2578. Logf("\tInit container %v ready: %v, restart count %v",
  2579. c.Name, c.Ready, c.RestartCount)
  2580. }
  2581. for _, c := range p.Status.ContainerStatuses {
  2582. Logf("\tContainer %v ready: %v, restart count %v",
  2583. c.Name, c.Ready, c.RestartCount)
  2584. }
  2585. }
  2586. HighLatencyKubeletOperations(c, 10*time.Second, n)
  2587. // TODO: Log node resource info
  2588. }
  2589. }
  2590. // logNodeEvents logs kubelet events from the given node. This includes kubelet
  2591. // restart and node unhealthy events. Note that listing events like this will mess
  2592. // with latency metrics, beware of calling it during a test.
  2593. func getNodeEvents(c *client.Client, nodeName string) []api.Event {
  2594. selector := fields.Set{
  2595. "involvedObject.kind": "Node",
  2596. "involvedObject.name": nodeName,
  2597. "involvedObject.namespace": api.NamespaceAll,
  2598. "source": "kubelet",
  2599. }.AsSelector()
  2600. options := api.ListOptions{FieldSelector: selector}
  2601. events, err := c.Events(api.NamespaceSystem).List(options)
  2602. if err != nil {
  2603. Logf("Unexpected error retrieving node events %v", err)
  2604. return []api.Event{}
  2605. }
  2606. return events.Items
  2607. }
  2608. // waitListSchedulableNodesOrDie is a wrapper around listing nodes supporting retries.
  2609. func waitListSchedulableNodesOrDie(c *client.Client) *api.NodeList {
  2610. var nodes *api.NodeList
  2611. var err error
  2612. if wait.PollImmediate(Poll, SingleCallTimeout, func() (bool, error) {
  2613. nodes, err = c.Nodes().List(api.ListOptions{FieldSelector: fields.Set{
  2614. "spec.unschedulable": "false",
  2615. }.AsSelector()})
  2616. return err == nil, nil
  2617. }) != nil {
  2618. ExpectNoError(err, "Timed out while listing nodes for e2e cluster.")
  2619. }
  2620. return nodes
  2621. }
  2622. // Node is schedulable if:
  2623. // 1) doesn't have "unschedulable" field set
  2624. // 2) it's Ready condition is set to true
  2625. // 3) doesn't have NetworkUnavailable condition set to true
  2626. func isNodeSchedulable(node *api.Node) bool {
  2627. nodeReady := IsNodeConditionSetAsExpected(node, api.NodeReady, true)
  2628. networkReady := IsNodeConditionUnset(node, api.NodeNetworkUnavailable) ||
  2629. IsNodeConditionSetAsExpectedSilent(node, api.NodeNetworkUnavailable, false)
  2630. return !node.Spec.Unschedulable && nodeReady && networkReady
  2631. }
  2632. // GetReadySchedulableNodesOrDie addresses the common use case of getting nodes you can do work on.
  2633. // 1) Needs to be schedulable.
  2634. // 2) Needs to be ready.
  2635. // If EITHER 1 or 2 is not true, most tests will want to ignore the node entirely.
  2636. func GetReadySchedulableNodesOrDie(c *client.Client) (nodes *api.NodeList) {
  2637. nodes = waitListSchedulableNodesOrDie(c)
  2638. // previous tests may have cause failures of some nodes. Let's skip
  2639. // 'Not Ready' nodes, just in case (there is no need to fail the test).
  2640. FilterNodes(nodes, func(node api.Node) bool {
  2641. return isNodeSchedulable(&node)
  2642. })
  2643. return nodes
  2644. }
  2645. func WaitForAllNodesSchedulable(c *client.Client) error {
  2646. return wait.PollImmediate(30*time.Second, 4*time.Hour, func() (bool, error) {
  2647. opts := api.ListOptions{
  2648. ResourceVersion: "0",
  2649. FieldSelector: fields.Set{"spec.unschedulable": "false"}.AsSelector(),
  2650. }
  2651. nodes, err := c.Nodes().List(opts)
  2652. if err != nil {
  2653. Logf("Unexpected error listing nodes: %v", err)
  2654. // Ignore the error here - it will be retried.
  2655. return false, nil
  2656. }
  2657. schedulable := 0
  2658. for _, node := range nodes.Items {
  2659. if isNodeSchedulable(&node) {
  2660. schedulable++
  2661. }
  2662. }
  2663. if schedulable != len(nodes.Items) {
  2664. Logf("%d/%d nodes schedulable (polling after 30s)", schedulable, len(nodes.Items))
  2665. return false, nil
  2666. }
  2667. return true, nil
  2668. })
  2669. }
  2670. func AddOrUpdateLabelOnNode(c *client.Client, nodeName string, labelKey string, labelValue string) {
  2671. patch := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, labelKey, labelValue)
  2672. var err error
  2673. for attempt := 0; attempt < UpdateRetries; attempt++ {
  2674. err = c.Patch(api.MergePatchType).Resource("nodes").Name(nodeName).Body([]byte(patch)).Do().Error()
  2675. if err != nil {
  2676. if !apierrs.IsConflict(err) {
  2677. ExpectNoError(err)
  2678. } else {
  2679. Logf("Conflict when trying to add a label %v:%v to %v", labelKey, labelValue, nodeName)
  2680. }
  2681. } else {
  2682. break
  2683. }
  2684. time.Sleep(100 * time.Millisecond)
  2685. }
  2686. ExpectNoError(err)
  2687. }
  2688. func ExpectNodeHasLabel(c *client.Client, nodeName string, labelKey string, labelValue string) {
  2689. By("verifying the node has the label " + labelKey + " " + labelValue)
  2690. node, err := c.Nodes().Get(nodeName)
  2691. ExpectNoError(err)
  2692. Expect(node.Labels[labelKey]).To(Equal(labelValue))
  2693. }
  2694. // RemoveLabelOffNode is for cleaning up labels temporarily added to node,
  2695. // won't fail if target label doesn't exist or has been removed.
  2696. func RemoveLabelOffNode(c *client.Client, nodeName string, labelKey string) {
  2697. By("removing the label " + labelKey + " off the node " + nodeName)
  2698. var nodeUpdated *api.Node
  2699. var node *api.Node
  2700. var err error
  2701. for attempt := 0; attempt < UpdateRetries; attempt++ {
  2702. node, err = c.Nodes().Get(nodeName)
  2703. ExpectNoError(err)
  2704. if node.Labels == nil || len(node.Labels[labelKey]) == 0 {
  2705. return
  2706. }
  2707. delete(node.Labels, labelKey)
  2708. nodeUpdated, err = c.Nodes().Update(node)
  2709. if err != nil {
  2710. if !apierrs.IsConflict(err) {
  2711. ExpectNoError(err)
  2712. } else {
  2713. Logf("Conflict when trying to remove a label %v from %v", labelKey, nodeName)
  2714. }
  2715. } else {
  2716. break
  2717. }
  2718. time.Sleep(100 * time.Millisecond)
  2719. }
  2720. ExpectNoError(err)
  2721. By("verifying the node doesn't have the label " + labelKey)
  2722. if nodeUpdated.Labels != nil && len(nodeUpdated.Labels[labelKey]) != 0 {
  2723. Failf("Failed removing label " + labelKey + " of the node " + nodeName)
  2724. }
  2725. }
  2726. func AddOrUpdateTaintOnNode(c *client.Client, nodeName string, taint api.Taint) {
  2727. for attempt := 0; attempt < UpdateRetries; attempt++ {
  2728. node, err := c.Nodes().Get(nodeName)
  2729. ExpectNoError(err)
  2730. nodeTaints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
  2731. ExpectNoError(err)
  2732. var newTaints []api.Taint
  2733. updated := false
  2734. for _, existingTaint := range nodeTaints {
  2735. if existingTaint.Key == taint.Key {
  2736. newTaints = append(newTaints, taint)
  2737. updated = true
  2738. continue
  2739. }
  2740. newTaints = append(newTaints, existingTaint)
  2741. }
  2742. if !updated {
  2743. newTaints = append(newTaints, taint)
  2744. }
  2745. taintsData, err := json.Marshal(newTaints)
  2746. ExpectNoError(err)
  2747. if node.Annotations == nil {
  2748. node.Annotations = make(map[string]string)
  2749. }
  2750. node.Annotations[api.TaintsAnnotationKey] = string(taintsData)
  2751. _, err = c.Nodes().Update(node)
  2752. if err != nil {
  2753. if !apierrs.IsConflict(err) {
  2754. ExpectNoError(err)
  2755. } else {
  2756. Logf("Conflict when trying to add/update taint %v to %v", taint, nodeName)
  2757. }
  2758. } else {
  2759. break
  2760. }
  2761. time.Sleep(100 * time.Millisecond)
  2762. }
  2763. }
  2764. func taintExists(taints []api.Taint, taintKey string) bool {
  2765. for _, taint := range taints {
  2766. if taint.Key == taintKey {
  2767. return true
  2768. }
  2769. }
  2770. return false
  2771. }
  2772. func ExpectNodeHasTaint(c *client.Client, nodeName string, taintKey string) {
  2773. By("verifying the node has the taint " + taintKey)
  2774. node, err := c.Nodes().Get(nodeName)
  2775. ExpectNoError(err)
  2776. nodeTaints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
  2777. ExpectNoError(err)
  2778. if len(nodeTaints) == 0 || !taintExists(nodeTaints, taintKey) {
  2779. Failf("Failed to find taint %s on node %s", taintKey, nodeName)
  2780. }
  2781. }
  2782. func deleteTaintByKey(taints []api.Taint, taintKey string) ([]api.Taint, error) {
  2783. newTaints := []api.Taint{}
  2784. found := false
  2785. for _, taint := range taints {
  2786. if taint.Key == taintKey {
  2787. found = true
  2788. continue
  2789. }
  2790. newTaints = append(newTaints, taint)
  2791. }
  2792. if !found {
  2793. return nil, fmt.Errorf("taint key=\"%s\" not found.", taintKey)
  2794. }
  2795. return newTaints, nil
  2796. }
  2797. // RemoveTaintOffNode is for cleaning up taints temporarily added to node,
  2798. // won't fail if target taint doesn't exist or has been removed.
  2799. func RemoveTaintOffNode(c *client.Client, nodeName string, taintKey string) {
  2800. By("removing the taint " + taintKey + " off the node " + nodeName)
  2801. for attempt := 0; attempt < UpdateRetries; attempt++ {
  2802. node, err := c.Nodes().Get(nodeName)
  2803. ExpectNoError(err)
  2804. nodeTaints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
  2805. ExpectNoError(err)
  2806. if len(nodeTaints) == 0 {
  2807. return
  2808. }
  2809. if !taintExists(nodeTaints, taintKey) {
  2810. return
  2811. }
  2812. newTaints, err := deleteTaintByKey(nodeTaints, taintKey)
  2813. ExpectNoError(err)
  2814. taintsData, err := json.Marshal(newTaints)
  2815. ExpectNoError(err)
  2816. node.Annotations[api.TaintsAnnotationKey] = string(taintsData)
  2817. _, err = c.Nodes().Update(node)
  2818. if err != nil {
  2819. if !apierrs.IsConflict(err) {
  2820. ExpectNoError(err)
  2821. } else {
  2822. Logf("Conflict when trying to add/update taint %v to %v", taintKey, nodeName)
  2823. }
  2824. } else {
  2825. break
  2826. }
  2827. time.Sleep(100 * time.Millisecond)
  2828. }
  2829. nodeUpdated, err := c.Nodes().Get(nodeName)
  2830. ExpectNoError(err)
  2831. By("verifying the node doesn't have the taint " + taintKey)
  2832. taintsGot, err := api.GetTaintsFromNodeAnnotations(nodeUpdated.Annotations)
  2833. ExpectNoError(err)
  2834. if taintExists(taintsGot, taintKey) {
  2835. Failf("Failed removing taint " + taintKey + " of the node " + nodeName)
  2836. }
  2837. }
  2838. func ScaleRC(c *client.Client, ns, name string, size uint, wait bool) error {
  2839. By(fmt.Sprintf("Scaling replication controller %s in namespace %s to %d", name, ns, size))
  2840. scaler, err := kubectl.ScalerFor(api.Kind("ReplicationController"), c)
  2841. if err != nil {
  2842. return err
  2843. }
  2844. waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
  2845. waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
  2846. if err = scaler.Scale(ns, name, size, nil, waitForScale, waitForReplicas); err != nil {
  2847. return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err)
  2848. }
  2849. if !wait {
  2850. return nil
  2851. }
  2852. return WaitForRCPodsRunning(c, ns, name)
  2853. }
  2854. // Wait up to 10 minutes for pods to become Running.
  2855. func WaitForRCPodsRunning(c *client.Client, ns, rcName string) error {
  2856. rc, err := c.ReplicationControllers(ns).Get(rcName)
  2857. if err != nil {
  2858. return err
  2859. }
  2860. selector := labels.SelectorFromSet(labels.Set(rc.Spec.Selector))
  2861. err = WaitForPodsWithLabelRunning(c, ns, selector)
  2862. if err != nil {
  2863. return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %v", rcName, err)
  2864. }
  2865. return nil
  2866. }
  2867. // Wait up to 10 minutes for all matching pods to become Running and at least one
  2868. // matching pod exists.
  2869. func WaitForPodsWithLabelRunning(c *client.Client, ns string, label labels.Selector) error {
  2870. running := false
  2871. PodStore := NewPodStore(c, ns, label, fields.Everything())
  2872. defer PodStore.Stop()
  2873. waitLoop:
  2874. for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
  2875. pods := PodStore.List()
  2876. if len(pods) == 0 {
  2877. continue waitLoop
  2878. }
  2879. for _, p := range pods {
  2880. if p.Status.Phase != api.PodRunning {
  2881. continue waitLoop
  2882. }
  2883. }
  2884. running = true
  2885. break
  2886. }
  2887. if !running {
  2888. return fmt.Errorf("Timeout while waiting for pods with labels %q to be running", label.String())
  2889. }
  2890. return nil
  2891. }
  2892. // Returns true if all the specified pods are scheduled, else returns false.
  2893. func podsWithLabelScheduled(c *client.Client, ns string, label labels.Selector) (bool, error) {
  2894. PodStore := NewPodStore(c, ns, label, fields.Everything())
  2895. defer PodStore.Stop()
  2896. pods := PodStore.List()
  2897. if len(pods) == 0 {
  2898. return false, nil
  2899. }
  2900. for _, pod := range pods {
  2901. if pod.Spec.NodeName == "" {
  2902. return false, nil
  2903. }
  2904. }
  2905. return true, nil
  2906. }
  2907. // Wait for all matching pods to become scheduled and at least one
  2908. // matching pod exists. Return the list of matching pods.
  2909. func WaitForPodsWithLabelScheduled(c *client.Client, ns string, label labels.Selector) (pods *api.PodList, err error) {
  2910. err = wait.PollImmediate(Poll, podScheduledBeforeTimeout,
  2911. func() (bool, error) {
  2912. pods, err = WaitForPodsWithLabel(c, ns, label)
  2913. if err != nil {
  2914. return false, err
  2915. }
  2916. for _, pod := range pods.Items {
  2917. if pod.Spec.NodeName == "" {
  2918. return false, nil
  2919. }
  2920. }
  2921. return true, nil
  2922. })
  2923. return pods, err
  2924. }
  2925. // Wait up to PodListTimeout for getting pods with certain label
  2926. func WaitForPodsWithLabel(c *client.Client, ns string, label labels.Selector) (pods *api.PodList, err error) {
  2927. for t := time.Now(); time.Since(t) < PodListTimeout; time.Sleep(Poll) {
  2928. options := api.ListOptions{LabelSelector: label}
  2929. pods, err = c.Pods(ns).List(options)
  2930. Expect(err).NotTo(HaveOccurred())
  2931. if len(pods.Items) > 0 {
  2932. break
  2933. }
  2934. }
  2935. if pods == nil || len(pods.Items) == 0 {
  2936. err = fmt.Errorf("Timeout while waiting for pods with label %v", label)
  2937. }
  2938. return
  2939. }
  2940. // DeleteRCAndPods a Replication Controller and all pods it spawned
  2941. func DeleteRCAndPods(c *client.Client, ns, name string) error {
  2942. By(fmt.Sprintf("deleting replication controller %s in namespace %s", name, ns))
  2943. rc, err := c.ReplicationControllers(ns).Get(name)
  2944. if err != nil {
  2945. if apierrs.IsNotFound(err) {
  2946. Logf("RC %s was already deleted: %v", name, err)
  2947. return nil
  2948. }
  2949. return err
  2950. }
  2951. reaper, err := kubectl.ReaperForReplicationController(c, 10*time.Minute)
  2952. if err != nil {
  2953. if apierrs.IsNotFound(err) {
  2954. Logf("RC %s was already deleted: %v", name, err)
  2955. return nil
  2956. }
  2957. return err
  2958. }
  2959. ps, err := podStoreForRC(c, rc)
  2960. if err != nil {
  2961. return err
  2962. }
  2963. defer ps.Stop()
  2964. startTime := time.Now()
  2965. err = reaper.Stop(ns, name, 0, nil)
  2966. if apierrs.IsNotFound(err) {
  2967. Logf("RC %s was already deleted: %v", name, err)
  2968. return nil
  2969. }
  2970. if err != nil {
  2971. return fmt.Errorf("error while stopping RC: %s: %v", name, err)
  2972. }
  2973. deleteRCTime := time.Now().Sub(startTime)
  2974. Logf("Deleting RC %s took: %v", name, deleteRCTime)
  2975. err = waitForPodsInactive(ps, 10*time.Millisecond, 10*time.Minute)
  2976. if err != nil {
  2977. return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
  2978. }
  2979. terminatePodTime := time.Now().Sub(startTime) - deleteRCTime
  2980. Logf("Terminating RC %s pods took: %v", name, terminatePodTime)
  2981. // this is to relieve namespace controller's pressure when deleting the
  2982. // namespace after a test.
  2983. err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute)
  2984. if err != nil {
  2985. return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
  2986. }
  2987. return nil
  2988. }
  2989. // DeleteRCAndWaitForGC deletes only the Replication Controller and waits for GC to delete the pods.
  2990. func DeleteRCAndWaitForGC(c *client.Client, ns, name string) error {
  2991. By(fmt.Sprintf("deleting replication controller %s in namespace %s, will wait for the garbage collector to delete the pods", name, ns))
  2992. rc, err := c.ReplicationControllers(ns).Get(name)
  2993. if err != nil {
  2994. if apierrs.IsNotFound(err) {
  2995. Logf("RC %s was already deleted: %v", name, err)
  2996. return nil
  2997. }
  2998. return err
  2999. }
  3000. ps, err := podStoreForRC(c, rc)
  3001. if err != nil {
  3002. return err
  3003. }
  3004. defer ps.Stop()
  3005. startTime := time.Now()
  3006. falseVar := false
  3007. deleteOption := &api.DeleteOptions{OrphanDependents: &falseVar}
  3008. err = c.ReplicationControllers(ns).Delete(name, deleteOption)
  3009. if err != nil && apierrs.IsNotFound(err) {
  3010. Logf("RC %s was already deleted: %v", name, err)
  3011. return nil
  3012. }
  3013. if err != nil {
  3014. return err
  3015. }
  3016. deleteRCTime := time.Now().Sub(startTime)
  3017. Logf("Deleting RC %s took: %v", name, deleteRCTime)
  3018. var interval, timeout time.Duration
  3019. switch {
  3020. case rc.Spec.Replicas < 100:
  3021. interval = 10 * time.Millisecond
  3022. timeout = 10 * time.Minute
  3023. case rc.Spec.Replicas < 1000:
  3024. interval = 1 * time.Second
  3025. timeout = 10 * time.Minute
  3026. case rc.Spec.Replicas < 10000:
  3027. interval = 10 * time.Second
  3028. timeout = 10 * time.Minute
  3029. default:
  3030. interval = 10 * time.Second
  3031. timeout = 40 * time.Minute
  3032. }
  3033. err = waitForPodsInactive(ps, interval, timeout)
  3034. if err != nil {
  3035. return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
  3036. }
  3037. terminatePodTime := time.Now().Sub(startTime) - deleteRCTime
  3038. Logf("Terminating RC %s pods took: %v", name, terminatePodTime)
  3039. err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute)
  3040. if err != nil {
  3041. return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
  3042. }
  3043. return nil
  3044. }
  3045. // podStoreForRC creates a PodStore that monitors pods belong to the rc. It
  3046. // waits until the reflector does a List() before returning.
  3047. func podStoreForRC(c *client.Client, rc *api.ReplicationController) (*PodStore, error) {
  3048. labels := labels.SelectorFromSet(rc.Spec.Selector)
  3049. ps := NewPodStore(c, rc.Namespace, labels, fields.Everything())
  3050. err := wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
  3051. if len(ps.reflector.LastSyncResourceVersion()) != 0 {
  3052. return true, nil
  3053. }
  3054. return false, nil
  3055. })
  3056. return ps, err
  3057. }
  3058. // waitForPodsInactive waits until there are no active pods left in the PodStore.
  3059. // This is to make a fair comparison of deletion time between DeleteRCAndPods
  3060. // and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas
  3061. // when the pod is inactvie.
  3062. func waitForPodsInactive(ps *PodStore, interval, timeout time.Duration) error {
  3063. return wait.PollImmediate(interval, timeout, func() (bool, error) {
  3064. pods := ps.List()
  3065. for _, pod := range pods {
  3066. if controller.IsPodActive(pod) {
  3067. return false, nil
  3068. }
  3069. }
  3070. return true, nil
  3071. })
  3072. }
  3073. // waitForPodsGone waits until there are no pods left in the PodStore.
  3074. func waitForPodsGone(ps *PodStore, interval, timeout time.Duration) error {
  3075. return wait.PollImmediate(interval, timeout, func() (bool, error) {
  3076. if pods := ps.List(); len(pods) == 0 {
  3077. return true, nil
  3078. }
  3079. return false, nil
  3080. })
  3081. }
  3082. // Delete a ReplicaSet and all pods it spawned
  3083. func DeleteReplicaSet(c *client.Client, ns, name string) error {
  3084. By(fmt.Sprintf("deleting ReplicaSet %s in namespace %s", name, ns))
  3085. rc, err := c.Extensions().ReplicaSets(ns).Get(name)
  3086. if err != nil {
  3087. if apierrs.IsNotFound(err) {
  3088. Logf("ReplicaSet %s was already deleted: %v", name, err)
  3089. return nil
  3090. }
  3091. return err
  3092. }
  3093. reaper, err := kubectl.ReaperFor(extensions.Kind("ReplicaSet"), c)
  3094. if err != nil {
  3095. if apierrs.IsNotFound(err) {
  3096. Logf("ReplicaSet %s was already deleted: %v", name, err)
  3097. return nil
  3098. }
  3099. return err
  3100. }
  3101. startTime := time.Now()
  3102. err = reaper.Stop(ns, name, 0, nil)
  3103. if apierrs.IsNotFound(err) {
  3104. Logf("ReplicaSet %s was already deleted: %v", name, err)
  3105. return nil
  3106. }
  3107. deleteRSTime := time.Now().Sub(startTime)
  3108. Logf("Deleting RS %s took: %v", name, deleteRSTime)
  3109. if err == nil {
  3110. err = waitForReplicaSetPodsGone(c, rc)
  3111. }
  3112. terminatePodTime := time.Now().Sub(startTime) - deleteRSTime
  3113. Logf("Terminating ReplicaSet %s pods took: %v", name, terminatePodTime)
  3114. return err
  3115. }
  3116. // waitForReplicaSetPodsGone waits until there are no pods reported under a
  3117. // ReplicaSet selector (because the pods have completed termination).
  3118. func waitForReplicaSetPodsGone(c *client.Client, rs *extensions.ReplicaSet) error {
  3119. return wait.PollImmediate(Poll, 2*time.Minute, func() (bool, error) {
  3120. selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
  3121. ExpectNoError(err)
  3122. options := api.ListOptions{LabelSelector: selector}
  3123. if pods, err := c.Pods(rs.Namespace).List(options); err == nil && len(pods.Items) == 0 {
  3124. return true, nil
  3125. }
  3126. return false, nil
  3127. })
  3128. }
  3129. // Waits for the deployment status to become valid (i.e. max unavailable and max surge aren't violated anymore).
  3130. // Note that the status should stay valid at all times unless shortly after a scaling event or the deployment is just created.
  3131. // To verify that the deployment status is valid and wait for the rollout to finish, use WaitForDeploymentStatus instead.
func WaitForDeploymentStatusValid(c clientset.Interface, d *extensions.Deployment) error {
	var (
		oldRSs, allOldRSs, allRSs []*extensions.ReplicaSet
		newRS                     *extensions.ReplicaSet
		deployment                *extensions.Deployment
		// reason records why the last poll iteration failed, so a timeout
		// can be reported with the most recent cause instead of a generic message.
		reason string
	)
	err := wait.Poll(Poll, 2*time.Minute, func() (bool, error) {
		var err error
		deployment, err = c.Extensions().Deployments(d.Namespace).Get(d.Name)
		if err != nil {
			return false, err
		}
		oldRSs, allOldRSs, newRS, err = deploymentutil.GetAllReplicaSets(deployment, c)
		if err != nil {
			return false, err
		}
		if newRS == nil {
			// New RC hasn't been created yet.
			reason = "new replica set hasn't been created yet"
			Logf(reason)
			return false, nil
		}
		allRSs = append(oldRSs, newRS)
		// The old/new ReplicaSets need to contain the pod-template-hash label
		for i := range allRSs {
			if !labelsutil.SelectorHasLabel(allRSs[i].Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
				reason = "all replica sets need to contain the pod-template-hash label"
				Logf(reason)
				return false, nil
			}
		}
		totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
		totalAvailable, err := deploymentutil.GetAvailablePodsForDeployment(c, deployment)
		if err != nil {
			return false, err
		}
		// Surge invariant: total created pods must not exceed spec replicas + maxSurge.
		maxCreated := deployment.Spec.Replicas + deploymentutil.MaxSurge(*deployment)
		if totalCreated > maxCreated {
			reason = fmt.Sprintf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
			Logf(reason)
			return false, nil
		}
		// Availability invariant: available pods must not drop below the required minimum.
		minAvailable := deploymentutil.MinAvailable(deployment)
		if totalAvailable < minAvailable {
			reason = fmt.Sprintf("total pods available: %d, less than the min required: %d", totalAvailable, minAvailable)
			Logf(reason)
			return false, nil
		}
		return true, nil
	})
	if err == wait.ErrWaitTimeout {
		// On timeout, dump ReplicaSet and pod state for debugging and surface
		// the last recorded failure reason as the error.
		logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
		logPodsOfDeployment(c, deployment)
		err = fmt.Errorf("%s", reason)
	}
	if err != nil {
		return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", d.Name, err)
	}
	return nil
}
// Waits for the deployment to reach desired state.
// Returns an error if the deployment's rolling update strategy (max unavailable or max surge) is broken at any times.
func WaitForDeploymentStatus(c clientset.Interface, d *extensions.Deployment) error {
	var (
		oldRSs, allOldRSs, allRSs []*extensions.ReplicaSet
		newRS                     *extensions.ReplicaSet
		deployment                *extensions.Deployment
	)
	err := wait.Poll(Poll, 5*time.Minute, func() (bool, error) {
		var err error
		deployment, err = c.Extensions().Deployments(d.Namespace).Get(d.Name)
		if err != nil {
			return false, err
		}
		oldRSs, allOldRSs, newRS, err = deploymentutil.GetAllReplicaSets(deployment, c)
		if err != nil {
			return false, err
		}
		if newRS == nil {
			// New RS hasn't been created yet.
			return false, nil
		}
		allRSs = append(oldRSs, newRS)
		// The old/new ReplicaSets need to contain the pod-template-hash label
		for i := range allRSs {
			if !labelsutil.SelectorHasLabel(allRSs[i].Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
				return false, nil
			}
		}
		totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
		totalAvailable, err := deploymentutil.GetAvailablePodsForDeployment(c, deployment)
		if err != nil {
			return false, err
		}
		// Surge invariant broken: fail immediately (non-nil error stops the poll).
		maxCreated := deployment.Spec.Replicas + deploymentutil.MaxSurge(*deployment)
		if totalCreated > maxCreated {
			logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
			logPodsOfDeployment(c, deployment)
			return false, fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
		}
		// Availability invariant broken: fail immediately as well.
		minAvailable := deploymentutil.MinAvailable(deployment)
		if totalAvailable < minAvailable {
			logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
			logPodsOfDeployment(c, deployment)
			return false, fmt.Errorf("total pods available: %d, less than the min required: %d", totalAvailable, minAvailable)
		}
		// When the deployment status and its underlying resources reach the desired state, we're done
		if deployment.Status.Replicas == deployment.Spec.Replicas &&
			deployment.Status.UpdatedReplicas == deployment.Spec.Replicas &&
			deploymentutil.GetReplicaCountForReplicaSets(oldRSs) == 0 &&
			deploymentutil.GetReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}) == deployment.Spec.Replicas {
			return true, nil
		}
		return false, nil
	})
	if err == wait.ErrWaitTimeout {
		// On timeout, dump ReplicaSet and pod state to aid debugging.
		logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
		logPodsOfDeployment(c, deployment)
	}
	if err != nil {
		return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", d.Name, err)
	}
	return nil
}
  3257. // WaitForDeploymentUpdatedReplicasLTE waits for given deployment to be observed by the controller and has at least a number of updatedReplicas
  3258. func WaitForDeploymentUpdatedReplicasLTE(c clientset.Interface, ns, deploymentName string, minUpdatedReplicas int, desiredGeneration int64) error {
  3259. err := wait.Poll(Poll, 5*time.Minute, func() (bool, error) {
  3260. deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
  3261. if err != nil {
  3262. return false, err
  3263. }
  3264. if deployment.Status.ObservedGeneration >= desiredGeneration && deployment.Status.UpdatedReplicas >= int32(minUpdatedReplicas) {
  3265. return true, nil
  3266. }
  3267. return false, nil
  3268. })
  3269. if err != nil {
  3270. return fmt.Errorf("error waiting for deployment %s to have at least %d updpatedReplicas: %v", deploymentName, minUpdatedReplicas, err)
  3271. }
  3272. return nil
  3273. }
  3274. // WaitForDeploymentRollbackCleared waits for given deployment either started rolling back or doesn't need to rollback.
  3275. // Note that rollback should be cleared shortly, so we only wait for 1 minute here to fail early.
  3276. func WaitForDeploymentRollbackCleared(c clientset.Interface, ns, deploymentName string) error {
  3277. err := wait.Poll(Poll, 1*time.Minute, func() (bool, error) {
  3278. deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
  3279. if err != nil {
  3280. return false, err
  3281. }
  3282. // Rollback not set or is kicked off
  3283. if deployment.Spec.RollbackTo == nil {
  3284. return true, nil
  3285. }
  3286. return false, nil
  3287. })
  3288. if err != nil {
  3289. return fmt.Errorf("error waiting for deployment %s rollbackTo to be cleared: %v", deploymentName, err)
  3290. }
  3291. return nil
  3292. }
// WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
// Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string) error {
	var deployment *extensions.Deployment
	var newRS *extensions.ReplicaSet
	err := wait.Poll(Poll, 1*time.Minute, func() (bool, error) {
		var err error
		deployment, err = c.Extensions().Deployments(ns).Get(deploymentName)
		if err != nil {
			return false, err
		}
		// The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
		newRS, err = deploymentutil.GetNewReplicaSet(deployment, c)
		if err != nil || newRS == nil || !labelsutil.SelectorHasLabel(newRS.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
			return false, err
		}
		// Check revision of this deployment, and of the new replica set of this deployment
		if deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != revision ||
			newRS.Annotations == nil || newRS.Annotations[deploymentutil.RevisionAnnotation] != revision ||
			deployment.Spec.Template.Spec.Containers[0].Image != image || newRS.Spec.Template.Spec.Containers[0].Image != image {
			return false, nil
		}
		return true, nil
	})
	if err == wait.ErrWaitTimeout {
		logReplicaSetsOfDeployment(deployment, nil, newRS)
	}
	// The newRS nil check must come before the generic err check: it covers the
	// case where the new RS never appeared (and also guards the newRS
	// dereferences in the error message below).
	if newRS == nil {
		return fmt.Errorf("deployment %s failed to create new RS: %v", deploymentName, err)
	}
	if err != nil {
		return fmt.Errorf("error waiting for deployment %s (got %s / %s) and new RS %s (got %s / %s) revision and image to match expectation (expected %s / %s): %v", deploymentName, deployment.Annotations[deploymentutil.RevisionAnnotation], deployment.Spec.Template.Spec.Containers[0].Image, newRS.Name, newRS.Annotations[deploymentutil.RevisionAnnotation], newRS.Spec.Template.Spec.Containers[0].Image, revision, image, err)
	}
	return nil
}
  3328. func WaitForOverlappingAnnotationMatch(c clientset.Interface, ns, deploymentName, expected string) error {
  3329. return wait.Poll(Poll, 1*time.Minute, func() (bool, error) {
  3330. deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
  3331. if err != nil {
  3332. return false, err
  3333. }
  3334. if deployment.Annotations[deploymentutil.OverlapAnnotation] == expected {
  3335. return true, nil
  3336. }
  3337. return false, nil
  3338. })
  3339. }
  3340. // CheckNewRSAnnotations check if the new RS's annotation is as expected
  3341. func CheckNewRSAnnotations(c clientset.Interface, ns, deploymentName string, expectedAnnotations map[string]string) error {
  3342. deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
  3343. if err != nil {
  3344. return err
  3345. }
  3346. newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
  3347. if err != nil {
  3348. return err
  3349. }
  3350. for k, v := range expectedAnnotations {
  3351. // Skip checking revision annotations
  3352. if k != deploymentutil.RevisionAnnotation && v != newRS.Annotations[k] {
  3353. return fmt.Errorf("Expected new RS annotations = %+v, got %+v", expectedAnnotations, newRS.Annotations)
  3354. }
  3355. }
  3356. return nil
  3357. }
  3358. func WaitForPodsReady(c *clientset.Clientset, ns, name string, minReadySeconds int) error {
  3359. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
  3360. options := api.ListOptions{LabelSelector: label}
  3361. return wait.Poll(Poll, 5*time.Minute, func() (bool, error) {
  3362. pods, err := c.Pods(ns).List(options)
  3363. if err != nil {
  3364. return false, nil
  3365. }
  3366. for _, pod := range pods.Items {
  3367. if !deploymentutil.IsPodAvailable(&pod, int32(minReadySeconds), time.Now()) {
  3368. return false, nil
  3369. }
  3370. }
  3371. return true, nil
  3372. })
  3373. }
  3374. // Waits for the deployment to clean up old rcs.
  3375. func WaitForDeploymentOldRSsNum(c *clientset.Clientset, ns, deploymentName string, desiredRSNum int) error {
  3376. return wait.Poll(Poll, 5*time.Minute, func() (bool, error) {
  3377. deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
  3378. if err != nil {
  3379. return false, err
  3380. }
  3381. _, oldRSs, err := deploymentutil.GetOldReplicaSets(deployment, c)
  3382. if err != nil {
  3383. return false, err
  3384. }
  3385. return len(oldRSs) == desiredRSNum, nil
  3386. })
  3387. }
  3388. func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) {
  3389. Logf("Deployment: %+v. Selector = %+v", *deployment, deployment.Spec.Selector)
  3390. for i := range allOldRSs {
  3391. Logf("All old ReplicaSets (%d/%d) of deployment %s: %+v. Selector = %+v", i+1, len(allOldRSs), deployment.Name, *allOldRSs[i], allOldRSs[i].Spec.Selector)
  3392. }
  3393. if newRS != nil {
  3394. Logf("New ReplicaSet of deployment %s: %+v. Selector = %+v", deployment.Name, *newRS, newRS.Spec.Selector)
  3395. } else {
  3396. Logf("New ReplicaSet of deployment %s is nil.", deployment.Name)
  3397. }
  3398. }
  3399. func WaitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string, desiredGeneration int64) error {
  3400. return deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) { return c.Extensions().Deployments(ns).Get(deploymentName) }, desiredGeneration, Poll, 1*time.Minute)
  3401. }
  3402. func logPodsOfDeployment(c clientset.Interface, deployment *extensions.Deployment) {
  3403. minReadySeconds := deployment.Spec.MinReadySeconds
  3404. podList, err := deploymentutil.ListPods(deployment,
  3405. func(namespace string, options api.ListOptions) (*api.PodList, error) {
  3406. return c.Core().Pods(namespace).List(options)
  3407. })
  3408. if err != nil {
  3409. Logf("Failed to list pods of deployment %s: %v", deployment.Name, err)
  3410. return
  3411. }
  3412. if err == nil {
  3413. for _, pod := range podList.Items {
  3414. availability := "not available"
  3415. if deploymentutil.IsPodAvailable(&pod, minReadySeconds, time.Now()) {
  3416. availability = "available"
  3417. }
  3418. Logf("Pod %s is %s: %+v", pod.Name, availability, pod)
  3419. }
  3420. }
  3421. }
  3422. // Waits for the number of events on the given object to reach a desired count.
  3423. func WaitForEvents(c *client.Client, ns string, objOrRef runtime.Object, desiredEventsCount int) error {
  3424. return wait.Poll(Poll, 5*time.Minute, func() (bool, error) {
  3425. events, err := c.Events(ns).Search(objOrRef)
  3426. if err != nil {
  3427. return false, fmt.Errorf("error in listing events: %s", err)
  3428. }
  3429. eventsCount := len(events.Items)
  3430. if eventsCount == desiredEventsCount {
  3431. return true, nil
  3432. }
  3433. if eventsCount < desiredEventsCount {
  3434. return false, nil
  3435. }
  3436. // Number of events has exceeded the desired count.
  3437. return false, fmt.Errorf("number of events has exceeded the desired count, eventsCount: %d, desiredCount: %d", eventsCount, desiredEventsCount)
  3438. })
  3439. }
  3440. // Waits for the number of events on the given object to be at least a desired count.
  3441. func WaitForPartialEvents(c *client.Client, ns string, objOrRef runtime.Object, atLeastEventsCount int) error {
  3442. return wait.Poll(Poll, 5*time.Minute, func() (bool, error) {
  3443. events, err := c.Events(ns).Search(objOrRef)
  3444. if err != nil {
  3445. return false, fmt.Errorf("error in listing events: %s", err)
  3446. }
  3447. eventsCount := len(events.Items)
  3448. if eventsCount >= atLeastEventsCount {
  3449. return true, nil
  3450. }
  3451. return false, nil
  3452. })
  3453. }
  3454. type updateDeploymentFunc func(d *extensions.Deployment)
  3455. func UpdateDeploymentWithRetries(c *clientset.Clientset, namespace, name string, applyUpdate updateDeploymentFunc) (deployment *extensions.Deployment, err error) {
  3456. deployments := c.Extensions().Deployments(namespace)
  3457. err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
  3458. if deployment, err = deployments.Get(name); err != nil {
  3459. return false, err
  3460. }
  3461. // Apply the update, then attempt to push it to the apiserver.
  3462. applyUpdate(deployment)
  3463. if deployment, err = deployments.Update(deployment); err == nil {
  3464. Logf("Updating deployment %s", name)
  3465. return true, nil
  3466. }
  3467. return false, nil
  3468. })
  3469. return deployment, err
  3470. }
  3471. // FailedContainers inspects all containers in a pod and returns failure
  3472. // information for containers that have failed or been restarted.
  3473. // A map is returned where the key is the containerID and the value is a
  3474. // struct containing the restart and failure information
  3475. func FailedContainers(pod *api.Pod) map[string]ContainerFailures {
  3476. var state ContainerFailures
  3477. states := make(map[string]ContainerFailures)
  3478. statuses := pod.Status.ContainerStatuses
  3479. if len(statuses) == 0 {
  3480. return nil
  3481. } else {
  3482. for _, status := range statuses {
  3483. if status.State.Terminated != nil {
  3484. states[status.ContainerID] = ContainerFailures{status: status.State.Terminated}
  3485. } else if status.LastTerminationState.Terminated != nil {
  3486. states[status.ContainerID] = ContainerFailures{status: status.LastTerminationState.Terminated}
  3487. }
  3488. if status.RestartCount > 0 {
  3489. var ok bool
  3490. if state, ok = states[status.ContainerID]; !ok {
  3491. state = ContainerFailures{}
  3492. }
  3493. state.Restarts = int(status.RestartCount)
  3494. states[status.ContainerID] = state
  3495. }
  3496. }
  3497. }
  3498. return states
  3499. }
  3500. // Prints the histogram of the events and returns the number of bad events.
  3501. func BadEvents(events []*api.Event) int {
  3502. type histogramKey struct {
  3503. reason string
  3504. source string
  3505. }
  3506. histogram := make(map[histogramKey]int)
  3507. for _, e := range events {
  3508. histogram[histogramKey{reason: e.Reason, source: e.Source.Component}]++
  3509. }
  3510. for key, number := range histogram {
  3511. Logf("- reason: %s, source: %s -> %d", key.reason, key.source, number)
  3512. }
  3513. badPatterns := []string{"kill", "fail"}
  3514. badEvents := 0
  3515. for key, number := range histogram {
  3516. for _, s := range badPatterns {
  3517. if strings.Contains(key.reason, s) {
  3518. Logf("WARNING %d events from %s with reason: %s", number, key.source, key.reason)
  3519. badEvents += number
  3520. break
  3521. }
  3522. }
  3523. }
  3524. return badEvents
  3525. }
  3526. // NodeAddresses returns the first address of the given type of each node.
  3527. func NodeAddresses(nodelist *api.NodeList, addrType api.NodeAddressType) []string {
  3528. hosts := []string{}
  3529. for _, n := range nodelist.Items {
  3530. for _, addr := range n.Status.Addresses {
  3531. // Use the first external IP address we find on the node, and
  3532. // use at most one per node.
  3533. // TODO(roberthbailey): Use the "preferred" address for the node, once
  3534. // such a thing is defined (#2462).
  3535. if addr.Type == addrType {
  3536. hosts = append(hosts, addr.Address)
  3537. break
  3538. }
  3539. }
  3540. }
  3541. return hosts
  3542. }
  3543. // NodeSSHHosts returns SSH-able host names for all schedulable nodes - this excludes master node.
  3544. // It returns an error if it can't find an external IP for every node, though it still returns all
  3545. // hosts that it found in that case.
  3546. func NodeSSHHosts(c *client.Client) ([]string, error) {
  3547. nodelist := waitListSchedulableNodesOrDie(c)
  3548. // TODO(roberthbailey): Use the "preferred" address for the node, once such a thing is defined (#2462).
  3549. hosts := NodeAddresses(nodelist, api.NodeExternalIP)
  3550. // Error if any node didn't have an external IP.
  3551. if len(hosts) != len(nodelist.Items) {
  3552. return hosts, fmt.Errorf(
  3553. "only found %d external IPs on nodes, but found %d nodes. Nodelist: %v",
  3554. len(hosts), len(nodelist.Items), nodelist)
  3555. }
  3556. sshHosts := make([]string, 0, len(hosts))
  3557. for _, h := range hosts {
  3558. sshHosts = append(sshHosts, net.JoinHostPort(h, "22"))
  3559. }
  3560. return sshHosts, nil
  3561. }
// SSHResult holds the outcome of one SSH command execution.
type SSHResult struct {
	User   string // user the command ran as (KUBE_SSH_USER or USER when set via SSH())
	Host   string // remote host the command was run against
	Cmd    string // the command that was executed
	Stdout string // captured standard output
	Stderr string // captured standard error
	Code   int    // remote exit code
}
  3570. // SSH synchronously SSHs to a node running on provider and runs cmd. If there
  3571. // is no error performing the SSH, the stdout, stderr, and exit code are
  3572. // returned.
  3573. func SSH(cmd, host, provider string) (SSHResult, error) {
  3574. result := SSHResult{Host: host, Cmd: cmd}
  3575. // Get a signer for the provider.
  3576. signer, err := GetSigner(provider)
  3577. if err != nil {
  3578. return result, fmt.Errorf("error getting signer for provider %s: '%v'", provider, err)
  3579. }
  3580. // RunSSHCommand will default to Getenv("USER") if user == "", but we're
  3581. // defaulting here as well for logging clarity.
  3582. result.User = os.Getenv("KUBE_SSH_USER")
  3583. if result.User == "" {
  3584. result.User = os.Getenv("USER")
  3585. }
  3586. stdout, stderr, code, err := sshutil.RunSSHCommand(cmd, result.User, host, signer)
  3587. result.Stdout = stdout
  3588. result.Stderr = stderr
  3589. result.Code = code
  3590. return result, err
  3591. }
  3592. func LogSSHResult(result SSHResult) {
  3593. remote := fmt.Sprintf("%s@%s", result.User, result.Host)
  3594. Logf("ssh %s: command: %s", remote, result.Cmd)
  3595. Logf("ssh %s: stdout: %q", remote, result.Stdout)
  3596. Logf("ssh %s: stderr: %q", remote, result.Stderr)
  3597. Logf("ssh %s: exit code: %d", remote, result.Code)
  3598. }
  3599. func IssueSSHCommand(cmd, provider string, node *api.Node) error {
  3600. Logf("Getting external IP address for %s", node.Name)
  3601. host := ""
  3602. for _, a := range node.Status.Addresses {
  3603. if a.Type == api.NodeExternalIP {
  3604. host = a.Address + ":22"
  3605. break
  3606. }
  3607. }
  3608. if host == "" {
  3609. return fmt.Errorf("couldn't find external IP address for node %s", node.Name)
  3610. }
  3611. Logf("Calling %s on %s(%s)", cmd, node.Name, host)
  3612. result, err := SSH(cmd, host, provider)
  3613. LogSSHResult(result)
  3614. if result.Code != 0 || err != nil {
  3615. return fmt.Errorf("failed running %q: %v (exit code %d)", cmd, err, result.Code)
  3616. }
  3617. return nil
  3618. }
  3619. // NewHostExecPodSpec returns the pod spec of hostexec pod
  3620. func NewHostExecPodSpec(ns, name string) *api.Pod {
  3621. pod := &api.Pod{
  3622. ObjectMeta: api.ObjectMeta{
  3623. Name: name,
  3624. Namespace: ns,
  3625. },
  3626. Spec: api.PodSpec{
  3627. Containers: []api.Container{
  3628. {
  3629. Name: "hostexec",
  3630. Image: "gcr.io/google_containers/hostexec:1.2",
  3631. ImagePullPolicy: api.PullIfNotPresent,
  3632. },
  3633. },
  3634. SecurityContext: &api.PodSecurityContext{
  3635. HostNetwork: true,
  3636. },
  3637. },
  3638. }
  3639. return pod
  3640. }
  3641. // RunHostCmd runs the given cmd in the context of the given pod using `kubectl exec`
  3642. // inside of a shell.
  3643. func RunHostCmd(ns, name, cmd string) (string, error) {
  3644. return RunKubectl("exec", fmt.Sprintf("--namespace=%v", ns), name, "--", "/bin/sh", "-c", cmd)
  3645. }
  3646. // RunHostCmdOrDie calls RunHostCmd and dies on error.
  3647. func RunHostCmdOrDie(ns, name, cmd string) string {
  3648. stdout, err := RunHostCmd(ns, name, cmd)
  3649. Logf("stdout: %v", stdout)
  3650. ExpectNoError(err)
  3651. return stdout
  3652. }
  3653. // LaunchHostExecPod launches a hostexec pod in the given namespace and waits
  3654. // until it's Running
  3655. func LaunchHostExecPod(client *client.Client, ns, name string) *api.Pod {
  3656. hostExecPod := NewHostExecPodSpec(ns, name)
  3657. pod, err := client.Pods(ns).Create(hostExecPod)
  3658. ExpectNoError(err)
  3659. err = WaitForPodRunningInNamespace(client, pod)
  3660. ExpectNoError(err)
  3661. return pod
  3662. }
  3663. // GetSigner returns an ssh.Signer for the provider ("gce", etc.) that can be
  3664. // used to SSH to their nodes.
  3665. func GetSigner(provider string) (ssh.Signer, error) {
  3666. // Get the directory in which SSH keys are located.
  3667. keydir := filepath.Join(os.Getenv("HOME"), ".ssh")
  3668. // Select the key itself to use. When implementing more providers here,
  3669. // please also add them to any SSH tests that are disabled because of signer
  3670. // support.
  3671. keyfile := ""
  3672. switch provider {
  3673. case "gce", "gke", "kubemark":
  3674. keyfile = "google_compute_engine"
  3675. case "aws":
  3676. // If there is an env. variable override, use that.
  3677. aws_keyfile := os.Getenv("AWS_SSH_KEY")
  3678. if len(aws_keyfile) != 0 {
  3679. return sshutil.MakePrivateKeySignerFromFile(aws_keyfile)
  3680. }
  3681. // Otherwise revert to home dir
  3682. keyfile = "kube_aws_rsa"
  3683. default:
  3684. return nil, fmt.Errorf("GetSigner(...) not implemented for %s", provider)
  3685. }
  3686. key := filepath.Join(keydir, keyfile)
  3687. return sshutil.MakePrivateKeySignerFromFile(key)
  3688. }
  3689. // CheckPodsRunningReady returns whether all pods whose names are listed in
  3690. // podNames in namespace ns are running and ready, using c and waiting at most
  3691. // timeout.
  3692. func CheckPodsRunningReady(c *client.Client, ns string, podNames []string, timeout time.Duration) bool {
  3693. return CheckPodsCondition(c, ns, podNames, timeout, PodRunningReady, "running and ready")
  3694. }
  3695. // CheckPodsRunningReadyOrSucceeded returns whether all pods whose names are
  3696. // listed in podNames in namespace ns are running and ready, or succeeded; use
  3697. // c and waiting at most timeout.
  3698. func CheckPodsRunningReadyOrSucceeded(c *client.Client, ns string, podNames []string, timeout time.Duration) bool {
  3699. return CheckPodsCondition(c, ns, podNames, timeout, PodRunningReadyOrSucceeded, "running and ready, or succeeded")
  3700. }
  3701. // CheckPodsCondition returns whether all pods whose names are listed in podNames
  3702. // in namespace ns are in the condition, using c and waiting at most timeout.
  3703. func CheckPodsCondition(c *client.Client, ns string, podNames []string, timeout time.Duration, condition podCondition, desc string) bool {
  3704. np := len(podNames)
  3705. Logf("Waiting up to %v for %d pods to be %s: %s", timeout, np, desc, podNames)
  3706. result := make(chan bool, len(podNames))
  3707. for ix := range podNames {
  3708. // Launch off pod readiness checkers.
  3709. go func(name string) {
  3710. err := waitForPodCondition(c, ns, name, desc, timeout, condition)
  3711. result <- err == nil
  3712. }(podNames[ix])
  3713. }
  3714. // Wait for them all to finish.
  3715. success := true
  3716. // TODO(a-robinson): Change to `for range` syntax and remove logging once we
  3717. // support only Go >= 1.4.
  3718. for _, podName := range podNames {
  3719. if !<-result {
  3720. Logf("Pod %[1]s failed to be %[2]s.", podName, desc)
  3721. success = false
  3722. }
  3723. }
  3724. Logf("Wanted all %d pods to be %s. Result: %t. Pods: %v", np, desc, success, podNames)
  3725. return success
  3726. }
  3727. // WaitForNodeToBeReady returns whether node name is ready within timeout.
  3728. func WaitForNodeToBeReady(c *client.Client, name string, timeout time.Duration) bool {
  3729. return WaitForNodeToBe(c, name, api.NodeReady, true, timeout)
  3730. }
  3731. // WaitForNodeToBeNotReady returns whether node name is not ready (i.e. the
  3732. // readiness condition is anything but ready, e.g false or unknown) within
  3733. // timeout.
  3734. func WaitForNodeToBeNotReady(c *client.Client, name string, timeout time.Duration) bool {
  3735. return WaitForNodeToBe(c, name, api.NodeReady, false, timeout)
  3736. }
  3737. func isNodeConditionSetAsExpected(node *api.Node, conditionType api.NodeConditionType, wantTrue, silent bool) bool {
  3738. // Check the node readiness condition (logging all).
  3739. for _, cond := range node.Status.Conditions {
  3740. // Ensure that the condition type and the status matches as desired.
  3741. if cond.Type == conditionType {
  3742. if (cond.Status == api.ConditionTrue) == wantTrue {
  3743. return true
  3744. } else {
  3745. if !silent {
  3746. Logf("Condition %s of node %s is %v instead of %t. Reason: %v, message: %v",
  3747. conditionType, node.Name, cond.Status == api.ConditionTrue, wantTrue, cond.Reason, cond.Message)
  3748. }
  3749. return false
  3750. }
  3751. }
  3752. }
  3753. if !silent {
  3754. Logf("Couldn't find condition %v on node %v", conditionType, node.Name)
  3755. }
  3756. return false
  3757. }
  3758. func IsNodeConditionSetAsExpected(node *api.Node, conditionType api.NodeConditionType, wantTrue bool) bool {
  3759. return isNodeConditionSetAsExpected(node, conditionType, wantTrue, false)
  3760. }
  3761. func IsNodeConditionSetAsExpectedSilent(node *api.Node, conditionType api.NodeConditionType, wantTrue bool) bool {
  3762. return isNodeConditionSetAsExpected(node, conditionType, wantTrue, true)
  3763. }
  3764. func IsNodeConditionUnset(node *api.Node, conditionType api.NodeConditionType) bool {
  3765. for _, cond := range node.Status.Conditions {
  3766. if cond.Type == conditionType {
  3767. return false
  3768. }
  3769. }
  3770. return true
  3771. }
  3772. // WaitForNodeToBe returns whether node "name's" condition state matches wantTrue
  3773. // within timeout. If wantTrue is true, it will ensure the node condition status
  3774. // is ConditionTrue; if it's false, it ensures the node condition is in any state
  3775. // other than ConditionTrue (e.g. not true or unknown).
  3776. func WaitForNodeToBe(c *client.Client, name string, conditionType api.NodeConditionType, wantTrue bool, timeout time.Duration) bool {
  3777. Logf("Waiting up to %v for node %s condition %s to be %t", timeout, name, conditionType, wantTrue)
  3778. for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
  3779. node, err := c.Nodes().Get(name)
  3780. if err != nil {
  3781. Logf("Couldn't get node %s", name)
  3782. continue
  3783. }
  3784. if IsNodeConditionSetAsExpected(node, conditionType, wantTrue) {
  3785. return true
  3786. }
  3787. }
  3788. Logf("Node %s didn't reach desired %s condition status (%t) within %v", name, conditionType, wantTrue, timeout)
  3789. return false
  3790. }
  3791. // Checks whether all registered nodes are ready.
  3792. // TODO: we should change the AllNodesReady call in AfterEach to WaitForAllNodesHealthy,
  3793. // and figure out how to do it in a configurable way, as we can't expect all setups to run
  3794. // default test add-ons.
  3795. func AllNodesReady(c *client.Client, timeout time.Duration) error {
  3796. Logf("Waiting up to %v for all nodes to be ready", timeout)
  3797. var notReady []api.Node
  3798. err := wait.PollImmediate(Poll, timeout, func() (bool, error) {
  3799. notReady = nil
  3800. // It should be OK to list unschedulable Nodes here.
  3801. nodes, err := c.Nodes().List(api.ListOptions{})
  3802. if err != nil {
  3803. return false, err
  3804. }
  3805. for _, node := range nodes.Items {
  3806. if !IsNodeConditionSetAsExpected(&node, api.NodeReady, true) {
  3807. notReady = append(notReady, node)
  3808. }
  3809. }
  3810. return len(notReady) == 0, nil
  3811. })
  3812. if err != nil && err != wait.ErrWaitTimeout {
  3813. return err
  3814. }
  3815. if len(notReady) > 0 {
  3816. return fmt.Errorf("Not ready nodes: %v", notReady)
  3817. }
  3818. return nil
  3819. }
// checks whether all registered nodes are ready and all required Pods are running on them.
func WaitForAllNodesHealthy(c *client.Client, timeout time.Duration) error {
	Logf("Waiting up to %v for all nodes to be ready", timeout)
	var notReady []api.Node
	var missingPodsPerNode map[string][]string
	err := wait.PollImmediate(Poll, timeout, func() (bool, error) {
		notReady = nil
		// It should be OK to list unschedulable Nodes here.
		nodes, err := c.Nodes().List(api.ListOptions{ResourceVersion: "0"})
		if err != nil {
			return false, err
		}
		// Phase 1: collect nodes whose NodeReady condition is not true.
		for _, node := range nodes.Items {
			if !IsNodeConditionSetAsExpected(&node, api.NodeReady, true) {
				notReady = append(notReady, node)
			}
		}
		pods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{ResourceVersion: "0"})
		if err != nil {
			return false, err
		}
		// Phase 2: index running kube-system pods by the node they run on.
		systemPodsPerNode := make(map[string][]string)
		for _, pod := range pods.Items {
			if pod.Namespace == api.NamespaceSystem && pod.Status.Phase == api.PodRunning {
				if pod.Spec.NodeName != "" {
					systemPodsPerNode[pod.Spec.NodeName] = append(systemPodsPerNode[pod.Spec.NodeName], pod.Name)
				}
			}
		}
		// Phase 3: for every non-master node, every required per-node pod
		// pattern must match at least one running system pod on that node.
		missingPodsPerNode = make(map[string][]string)
		for _, node := range nodes.Items {
			if !system.IsMasterNode(&node) {
				for _, requiredPod := range requiredPerNodePods {
					foundRequired := false
					for _, presentPod := range systemPodsPerNode[node.Name] {
						if requiredPod.MatchString(presentPod) {
							foundRequired = true
							break
						}
					}
					if !foundRequired {
						missingPodsPerNode[node.Name] = append(missingPodsPerNode[node.Name], requiredPod.String())
					}
				}
			}
		}
		// Healthy only when every node is ready and no required pods are missing.
		return len(notReady) == 0 && len(missingPodsPerNode) == 0, nil
	})
	// A timeout is reported through the notReady/missingPodsPerNode state
	// captured above; any other error is returned as-is.
	if err != nil && err != wait.ErrWaitTimeout {
		return err
	}
	if len(notReady) > 0 {
		return fmt.Errorf("Not ready nodes: %v", notReady)
	}
	if len(missingPodsPerNode) > 0 {
		return fmt.Errorf("Not running system Pods: %v", missingPodsPerNode)
	}
	return nil
}
  3879. // Filters nodes in NodeList in place, removing nodes that do not
  3880. // satisfy the given condition
  3881. // TODO: consider merging with pkg/client/cache.NodeLister
  3882. func FilterNodes(nodeList *api.NodeList, fn func(node api.Node) bool) {
  3883. var l []api.Node
  3884. for _, node := range nodeList.Items {
  3885. if fn(node) {
  3886. l = append(l, node)
  3887. }
  3888. }
  3889. nodeList.Items = l
  3890. }
  3891. // ParseKVLines parses output that looks like lines containing "<key>: <val>"
  3892. // and returns <val> if <key> is found. Otherwise, it returns the empty string.
  3893. func ParseKVLines(output, key string) string {
  3894. delim := ":"
  3895. key = key + delim
  3896. for _, line := range strings.Split(output, "\n") {
  3897. pieces := strings.SplitAfterN(line, delim, 2)
  3898. if len(pieces) != 2 {
  3899. continue
  3900. }
  3901. k, v := pieces[0], pieces[1]
  3902. if k == key {
  3903. return strings.TrimSpace(v)
  3904. }
  3905. }
  3906. return ""
  3907. }
// RestartKubeProxy kills kube-proxy on the given host over SSH and waits up
// to 60s for it to come back, relying on the kubelet to restart the static
// kube-proxy pod. Only supported on gce/gke/aws.
func RestartKubeProxy(host string) error {
	// TODO: Make it work for all providers.
	if !ProviderIs("gce", "gke", "aws") {
		return fmt.Errorf("unsupported provider: %s", TestContext.Provider)
	}
	// kubelet will restart the kube-proxy since it's running in a static pod
	result, err := SSH("sudo pkill kube-proxy", host, TestContext.Provider)
	if err != nil || result.Code != 0 {
		LogSSHResult(result)
		return fmt.Errorf("couldn't restart kube-proxy: %v", err)
	}
	// wait for kube-proxy to come back up
	err = wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
		result, err := SSH("sudo /bin/sh -c 'pgrep kube-proxy | wc -l'", host, TestContext.Provider)
		if err != nil {
			return false, err
		}
		if result.Code != 0 {
			LogSSHResult(result)
			return false, fmt.Errorf("failed to run command, exited %d", result.Code)
		}
		// pgrep counted zero processes: not restarted yet, keep polling.
		if result.Stdout == "0\n" {
			return false, nil
		}
		Logf("kube-proxy is back up.")
		return true, nil
	})
	if err != nil {
		return fmt.Errorf("kube-proxy didn't recover: %v", err)
	}
	return nil
}
  3940. func RestartApiserver(c *client.Client) error {
  3941. // TODO: Make it work for all providers.
  3942. if !ProviderIs("gce", "gke", "aws") {
  3943. return fmt.Errorf("unsupported provider: %s", TestContext.Provider)
  3944. }
  3945. if ProviderIs("gce", "aws") {
  3946. return sshRestartMaster()
  3947. }
  3948. // GKE doesn't allow ssh access, so use a same-version master
  3949. // upgrade to teardown/recreate master.
  3950. v, err := c.ServerVersion()
  3951. if err != nil {
  3952. return err
  3953. }
  3954. return masterUpgradeGKE(v.GitVersion[1:]) // strip leading 'v'
  3955. }
  3956. func sshRestartMaster() error {
  3957. if !ProviderIs("gce", "aws") {
  3958. return fmt.Errorf("unsupported provider: %s", TestContext.Provider)
  3959. }
  3960. var command string
  3961. if ProviderIs("gce") {
  3962. command = "sudo docker ps | grep /kube-apiserver | cut -d ' ' -f 1 | xargs sudo docker kill"
  3963. } else {
  3964. command = "sudo /etc/init.d/kube-apiserver restart"
  3965. }
  3966. result, err := SSH(command, GetMasterHost()+":22", TestContext.Provider)
  3967. if err != nil || result.Code != 0 {
  3968. LogSSHResult(result)
  3969. return fmt.Errorf("couldn't restart apiserver: %v", err)
  3970. }
  3971. return nil
  3972. }
  3973. func WaitForApiserverUp(c *client.Client) error {
  3974. for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
  3975. body, err := c.Get().AbsPath("/healthz").Do().Raw()
  3976. if err == nil && string(body) == "ok" {
  3977. return nil
  3978. }
  3979. }
  3980. return fmt.Errorf("waiting for apiserver timed out")
  3981. }
// WaitForClusterSize waits until the cluster has desired size and there is no not-ready nodes in it.
// By cluster size we mean number of Nodes excluding Master Node.
func WaitForClusterSize(c *client.Client, size int, timeout time.Duration) error {
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(20 * time.Second) {
		// Only schedulable nodes are listed (the master is unschedulable,
		// so it is excluded from the count).
		nodes, err := c.Nodes().List(api.ListOptions{FieldSelector: fields.Set{
			"spec.unschedulable": "false",
		}.AsSelector()})
		if err != nil {
			// Transient list failures are tolerated until the timeout.
			Logf("Failed to list nodes: %v", err)
			continue
		}
		numNodes := len(nodes.Items)
		// Filter out not-ready nodes.
		FilterNodes(nodes, func(node api.Node) bool {
			return IsNodeConditionSetAsExpected(&node, api.NodeReady, true)
		})
		numReady := len(nodes.Items)
		if numNodes == size && numReady == size {
			Logf("Cluster has reached the desired size %d", size)
			return nil
		}
		Logf("Waiting for cluster size %d, current size %d, not ready nodes %d", size, numNodes, numNodes-numReady)
	}
	return fmt.Errorf("timeout waiting %v for cluster size to be %d", timeout, size)
}
  4007. // GetHostExternalAddress gets the node for a pod and returns the first External
  4008. // address. Returns an error if the node the pod is on doesn't have an External
  4009. // address.
  4010. func GetHostExternalAddress(client *client.Client, p *api.Pod) (externalAddress string, err error) {
  4011. node, err := client.Nodes().Get(p.Spec.NodeName)
  4012. if err != nil {
  4013. return "", err
  4014. }
  4015. for _, address := range node.Status.Addresses {
  4016. if address.Type == api.NodeExternalIP {
  4017. if address.Address != "" {
  4018. externalAddress = address.Address
  4019. break
  4020. }
  4021. }
  4022. }
  4023. if externalAddress == "" {
  4024. err = fmt.Errorf("No external address for pod %v on node %v",
  4025. p.Name, p.Spec.NodeName)
  4026. }
  4027. return
  4028. }
// extractRT is a fake http.RoundTripper that only records the headers of the
// request it is handed; it performs no I/O. Used by headersForConfig to
// discover which headers a rest config's wrappers add.
type extractRT struct {
	http.Header
}

// RoundTrip captures req's headers and returns an empty response.
func (rt *extractRT) RoundTrip(req *http.Request) (*http.Response, error) {
	rt.Header = req.Header
	return &http.Response{}, nil
}
// headersForConfig extracts any http client logic necessary for the provided
// config.
func headersForConfig(c *restclient.Config) (http.Header, error) {
	extract := &extractRT{}
	rt, err := restclient.HTTPWrappersForConfig(c, extract)
	if err != nil {
		return nil, err
	}
	// Send a dummy request through the wrapped transport; the wrappers
	// decorate it with the config's headers and extractRT records them.
	if _, err := rt.RoundTrip(&http.Request{}); err != nil {
		return nil, err
	}
	return extract.Header, nil
}
// OpenWebSocketForURL constructs a websocket connection to the provided URL, using the client
// config, with the specified protocols.
//
// NOTE: url is modified in place — Scheme is rewritten to ws/wss and a
// default port (:80/:443) is appended to Host when none is present.
func OpenWebSocketForURL(url *url.URL, config *restclient.Config, protocols []string) (*websocket.Conn, error) {
	tlsConfig, err := restclient.TLSConfigFor(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create tls config: %v", err)
	}
	if tlsConfig != nil {
		url.Scheme = "wss"
		if !strings.Contains(url.Host, ":") {
			url.Host += ":443"
		}
	} else {
		url.Scheme = "ws"
		if !strings.Contains(url.Host, ":") {
			url.Host += ":80"
		}
	}
	// Carry over whatever headers the rest config's wrappers would add
	// (e.g. authorization) to the websocket handshake.
	headers, err := headersForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to load http headers: %v", err)
	}
	cfg, err := websocket.NewConfig(url.String(), "http://localhost")
	if err != nil {
		return nil, fmt.Errorf("failed to create websocket config: %v", err)
	}
	cfg.Header = headers
	cfg.TlsConfig = tlsConfig
	cfg.Protocol = protocols
	return websocket.DialConfig(cfg)
}
  4080. // getIngressAddress returns the ips/hostnames associated with the Ingress.
  4081. func getIngressAddress(client *client.Client, ns, name string) ([]string, error) {
  4082. ing, err := client.Extensions().Ingress(ns).Get(name)
  4083. if err != nil {
  4084. return nil, err
  4085. }
  4086. addresses := []string{}
  4087. for _, a := range ing.Status.LoadBalancer.Ingress {
  4088. if a.IP != "" {
  4089. addresses = append(addresses, a.IP)
  4090. }
  4091. if a.Hostname != "" {
  4092. addresses = append(addresses, a.Hostname)
  4093. }
  4094. }
  4095. return addresses, nil
  4096. }
// WaitForIngressAddress waits for the Ingress to acquire an address.
// It polls every 10s up to timeout and returns the first IP/hostname found,
// or the poll error on timeout.
func WaitForIngressAddress(c *client.Client, ns, ingName string, timeout time.Duration) (string, error) {
	var address string
	err := wait.PollImmediate(10*time.Second, timeout, func() (bool, error) {
		ipOrNameList, err := getIngressAddress(c, ns, ingName)
		if err != nil || len(ipOrNameList) == 0 {
			// Lookup errors are tolerated and retried until the timeout.
			Logf("Waiting for Ingress %v to acquire IP, error %v", ingName, err)
			return false, nil
		}
		address = ipOrNameList[0]
		return true, nil
	})
	return address, err
}
// LookForStringInLog looks for the given string in the log of a specific pod
// container, repeatedly fetching logs via kubectl until it appears or the
// timeout elapses. Returns the last output and an error if not found.
func LookForStringInLog(ns, podName, container, expectedString string, timeout time.Duration) (result string, err error) {
	return LookForString(expectedString, timeout, func() string {
		return RunKubectlOrDie("logs", podName, container, fmt.Sprintf("--namespace=%v", ns))
	})
}
// LookForStringInFile looks for the given string in a file in a specific pod
// container, repeatedly cat-ing the file via kubectl exec until it appears
// or the timeout elapses. Returns the last output and an error if not found.
func LookForStringInFile(ns, podName, container, file, expectedString string, timeout time.Duration) (result string, err error) {
	return LookForString(expectedString, timeout, func() string {
		return RunKubectlOrDie("exec", podName, "-c", container, fmt.Sprintf("--namespace=%v", ns), "--", "cat", file)
	})
}
// LookForStringInPodExec looks for the given string in the output of a
// command executed in a specific pod container (via kubectl exec), retrying
// until it appears or the timeout elapses.
func LookForStringInPodExec(ns, podName string, command []string, expectedString string, timeout time.Duration) (result string, err error) {
	return LookForString(expectedString, timeout, func() string {
		// use the first container
		args := []string{"exec", podName, fmt.Sprintf("--namespace=%v", ns), "--"}
		args = append(args, command...)
		return RunKubectlOrDie(args...)
	})
}
  4132. // Looks for the given string in the output of fn, repeatedly calling fn until
  4133. // the timeout is reached or the string is found. Returns last log and possibly
  4134. // error if the string was not found.
  4135. func LookForString(expectedString string, timeout time.Duration, fn func() string) (result string, err error) {
  4136. for t := time.Now(); time.Since(t) < timeout; time.Sleep(Poll) {
  4137. result = fn()
  4138. if strings.Contains(result, expectedString) {
  4139. return
  4140. }
  4141. }
  4142. err = fmt.Errorf("Failed to find \"%s\", last result: \"%s\"", expectedString, result)
  4143. return
  4144. }
  4145. // getSvcNodePort returns the node port for the given service:port.
  4146. func getSvcNodePort(client *client.Client, ns, name string, svcPort int) (int, error) {
  4147. svc, err := client.Services(ns).Get(name)
  4148. if err != nil {
  4149. return 0, err
  4150. }
  4151. for _, p := range svc.Spec.Ports {
  4152. if p.Port == int32(svcPort) {
  4153. if p.NodePort != 0 {
  4154. return int(p.NodePort), nil
  4155. }
  4156. }
  4157. }
  4158. return 0, fmt.Errorf(
  4159. "No node port found for service %v, port %v", name, svcPort)
  4160. }
// GetNodePortURL returns the url to a nodeport Service.
// It looks up the service's node port and returns
// "http://<first external node IP>:<nodePort>".
func GetNodePortURL(client *client.Client, ns, name string, svcPort int) (string, error) {
	nodePort, err := getSvcNodePort(client, ns, name, svcPort)
	if err != nil {
		return "", err
	}
	// This list of nodes must not include the master, which is marked
	// unschedulable, since the master doesn't run kube-proxy. Without
	// kube-proxy NodePorts won't work.
	var nodes *api.NodeList
	// If the poll times out, the List error captured by the closure (the
	// reason every attempt failed) is what gets returned.
	if wait.PollImmediate(Poll, SingleCallTimeout, func() (bool, error) {
		nodes, err = client.Nodes().List(api.ListOptions{FieldSelector: fields.Set{
			"spec.unschedulable": "false",
		}.AsSelector()})
		return err == nil, nil
	}) != nil {
		return "", err
	}
	if len(nodes.Items) == 0 {
		return "", fmt.Errorf("Unable to list nodes in cluster.")
	}
	for _, node := range nodes.Items {
		for _, address := range node.Status.Addresses {
			if address.Type == api.NodeExternalIP {
				if address.Address != "" {
					return fmt.Sprintf("http://%v:%v", address.Address, nodePort), nil
				}
			}
		}
	}
	return "", fmt.Errorf("Failed to find external address for service %v", name)
}
  4193. // ScaleRCByLabels scales an RC via ns/label lookup. If replicas == 0 it waits till
  4194. // none are running, otherwise it does what a synchronous scale operation would do.
  4195. func ScaleRCByLabels(client *client.Client, ns string, l map[string]string, replicas uint) error {
  4196. listOpts := api.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(l))}
  4197. rcs, err := client.ReplicationControllers(ns).List(listOpts)
  4198. if err != nil {
  4199. return err
  4200. }
  4201. if len(rcs.Items) == 0 {
  4202. return fmt.Errorf("RC with labels %v not found in ns %v", l, ns)
  4203. }
  4204. Logf("Scaling %v RCs with labels %v in ns %v to %v replicas.", len(rcs.Items), l, ns, replicas)
  4205. for _, labelRC := range rcs.Items {
  4206. name := labelRC.Name
  4207. if err := ScaleRC(client, ns, name, replicas, false); err != nil {
  4208. return err
  4209. }
  4210. rc, err := client.ReplicationControllers(ns).Get(name)
  4211. if err != nil {
  4212. return err
  4213. }
  4214. if replicas == 0 {
  4215. ps, err := podStoreForRC(client, rc)
  4216. if err != nil {
  4217. return err
  4218. }
  4219. defer ps.Stop()
  4220. if err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute); err != nil {
  4221. return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
  4222. }
  4223. } else {
  4224. if err := WaitForPodsWithLabelRunning(
  4225. client, ns, labels.SelectorFromSet(labels.Set(rc.Spec.Selector))); err != nil {
  4226. return err
  4227. }
  4228. }
  4229. }
  4230. return nil
  4231. }
// GetPodLogs returns the current logs of containerName in pod
// namespace/podName.
// TODO(random-liu): Change this to be a member function of the framework.
func GetPodLogs(c *client.Client, namespace, podName, containerName string) (string, error) {
	return getPodLogsInternal(c, namespace, podName, containerName, false)
}
// getPreviousPodLogs returns the logs of the previous (terminated) instance
// of containerName in pod namespace/podName.
func getPreviousPodLogs(c *client.Client, namespace, podName, containerName string) (string, error) {
	return getPodLogsInternal(c, namespace, podName, containerName, true)
}
  4239. // utility function for gomega Eventually
  4240. func getPodLogsInternal(c *client.Client, namespace, podName, containerName string, previous bool) (string, error) {
  4241. logs, err := c.Get().
  4242. Resource("pods").
  4243. Namespace(namespace).
  4244. Name(podName).SubResource("log").
  4245. Param("container", containerName).
  4246. Param("previous", strconv.FormatBool(previous)).
  4247. Do().
  4248. Raw()
  4249. if err != nil {
  4250. return "", err
  4251. }
  4252. if err == nil && strings.Contains(string(logs), "Internal Error") {
  4253. return "", fmt.Errorf("Fetched log contains \"Internal Error\": %q.", string(logs))
  4254. }
  4255. return string(logs), err
  4256. }
  4257. // EnsureLoadBalancerResourcesDeleted ensures that cloud load balancer resources that were created
  4258. // are actually cleaned up. Currently only implemented for GCE/GKE.
  4259. func EnsureLoadBalancerResourcesDeleted(ip, portRange string) error {
  4260. if TestContext.Provider == "gce" || TestContext.Provider == "gke" {
  4261. return ensureGCELoadBalancerResourcesDeleted(ip, portRange)
  4262. }
  4263. return nil
  4264. }
// ensureGCELoadBalancerResourcesDeleted polls the GCE API (every 10s, up to
// 5 minutes) until no forwarding rule matching ip and portRange remains in
// the test project/region.
func ensureGCELoadBalancerResourcesDeleted(ip, portRange string) error {
	gceCloud, ok := TestContext.CloudConfig.Provider.(*gcecloud.GCECloud)
	if !ok {
		return fmt.Errorf("failed to convert CloudConfig.Provider to GCECloud: %#v", TestContext.CloudConfig.Provider)
	}
	project := TestContext.CloudConfig.ProjectID
	region, err := gcecloud.GetGCERegion(TestContext.CloudConfig.Zone)
	if err != nil {
		return fmt.Errorf("could not get region for zone %q: %v", TestContext.CloudConfig.Zone, err)
	}
	return wait.Poll(10*time.Second, 5*time.Minute, func() (bool, error) {
		service := gceCloud.GetComputeService()
		list, err := service.ForwardingRules.List(project, region).Do()
		if err != nil {
			return false, err
		}
		for ix := range list.Items {
			item := list.Items[ix]
			// A matching rule still exists: keep polling.
			if item.PortRange == portRange && item.IPAddress == ip {
				Logf("found a load balancer: %v", item)
				return false, nil
			}
		}
		return true, nil
	})
}
// The following helper functions can block/unblock network from source
// host to destination host by manipulating iptable rules.
// This function assumes it can ssh to the source host.
//
// Caution:
// Recommend to input IP instead of hostnames. Using hostnames will cause iptables to
// do a DNS lookup to resolve the name to an IP address, which will
// slow down the test and cause it to fail if DNS is absent or broken.
//
// Suggested usage pattern:
// func foo() {
// ...
// defer UnblockNetwork(from, to)
// BlockNetwork(from, to)
// ...
// }
//
// BlockNetwork inserts an iptables REJECT rule on host `from` for traffic
// destined to `to`. It fails the running test if the SSH command errors.
func BlockNetwork(from string, to string) {
	Logf("block network traffic from %s to %s", from, to)
	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
	dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
	if result, err := SSH(dropCmd, from, TestContext.Provider); result.Code != 0 || err != nil {
		LogSSHResult(result)
		Failf("Unexpected error: %v", err)
	}
}
// UnblockNetwork removes the iptables REJECT rule that BlockNetwork added on
// host `from` for traffic destined to `to`, retrying for up to 30 seconds.
func UnblockNetwork(from string, to string) {
	Logf("Unblock network traffic from %s to %s", from, to)
	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
	undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
	// Undrop command may fail if the rule has never been created.
	// In such case we just lose 30 seconds, but the cluster is healthy.
	// But if the rule had been created and removing it failed, the node is broken and
	// not coming back. Subsequent tests will run or fewer nodes (some of the tests
	// may fail). Manual intervention is required in such case (recreating the
	// cluster solves the problem too).
	err := wait.Poll(time.Millisecond*100, time.Second*30, func() (bool, error) {
		result, err := SSH(undropCmd, from, TestContext.Provider)
		if result.Code == 0 && err == nil {
			return true, nil
		}
		LogSSHResult(result)
		if err != nil {
			Logf("Unexpected error: %v", err)
		}
		// Keep retrying: a failure here may only mean the rule is absent.
		return false, nil
	})
	if err != nil {
		Failf("Failed to remove the iptable REJECT rule. Manual intervention is "+
			"required on host %s: remove rule %s, if exists", from, iptablesRule)
	}
}
  4343. func isElementOf(podUID types.UID, pods *api.PodList) bool {
  4344. for _, pod := range pods.Items {
  4345. if pod.UID == podUID {
  4346. return true
  4347. }
  4348. }
  4349. return false
  4350. }
  4351. func CheckRSHashLabel(rs *extensions.ReplicaSet) error {
  4352. if len(rs.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 ||
  4353. len(rs.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 ||
  4354. len(rs.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 {
  4355. return fmt.Errorf("unexpected RS missing required pod-hash-template: %+v, selector = %+v, template = %+v", rs, rs.Spec.Selector, rs.Spec.Template)
  4356. }
  4357. return nil
  4358. }
  4359. func CheckPodHashLabel(pods *api.PodList) error {
  4360. invalidPod := ""
  4361. for _, pod := range pods.Items {
  4362. if len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 {
  4363. if len(invalidPod) == 0 {
  4364. invalidPod = "unexpected pods missing required pod-hash-template:"
  4365. }
  4366. invalidPod = fmt.Sprintf("%s %+v;", invalidPod, pod)
  4367. }
  4368. }
  4369. if len(invalidPod) > 0 {
  4370. return fmt.Errorf("%s", invalidPod)
  4371. }
  4372. return nil
  4373. }
  4374. // timeout for proxy requests.
  4375. const proxyTimeout = 2 * time.Minute
  4376. // NodeProxyRequest performs a get on a node proxy endpoint given the nodename and rest client.
  4377. func NodeProxyRequest(c *client.Client, node, endpoint string) (restclient.Result, error) {
  4378. // proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call.
  4379. // This will leak a goroutine if proxy hangs. #22165
  4380. subResourceProxyAvailable, err := ServerVersionGTE(subResourceServiceAndNodeProxyVersion, c)
  4381. if err != nil {
  4382. return restclient.Result{}, err
  4383. }
  4384. var result restclient.Result
  4385. finished := make(chan struct{})
  4386. go func() {
  4387. if subResourceProxyAvailable {
  4388. result = c.Get().
  4389. Resource("nodes").
  4390. SubResource("proxy").
  4391. Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
  4392. Suffix(endpoint).
  4393. Do()
  4394. } else {
  4395. result = c.Get().
  4396. Prefix("proxy").
  4397. Resource("nodes").
  4398. Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
  4399. Suffix(endpoint).
  4400. Do()
  4401. }
  4402. finished <- struct{}{}
  4403. }()
  4404. select {
  4405. case <-finished:
  4406. return result, nil
  4407. case <-time.After(proxyTimeout):
  4408. return restclient.Result{}, nil
  4409. }
  4410. }
// GetKubeletPods retrieves the list of pods on the kubelet
// by querying the kubelet's "pods" endpoint through the API server proxy.
func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
	return getKubeletPods(c, node, "pods")
}
// GetKubeletRunningPods retrieves the list of running pods on the kubelet. The pods
// includes necessary information (e.g., UID, name, namespace for
// pods/containers), but do not contain the full spec.
func GetKubeletRunningPods(c *client.Client, node string) (*api.PodList, error) {
	return getKubeletPods(c, node, "runningpods")
}
  4421. func getKubeletPods(c *client.Client, node, resource string) (*api.PodList, error) {
  4422. result := &api.PodList{}
  4423. client, err := NodeProxyRequest(c, node, resource)
  4424. if err != nil {
  4425. return &api.PodList{}, err
  4426. }
  4427. if err = client.Into(result); err != nil {
  4428. return &api.PodList{}, err
  4429. }
  4430. return result, nil
  4431. }
// LaunchWebserverPod launches a pod serving http on port 8080 to act
// as the target for networking connectivity checks. The ip address
// of the created pod will be returned if the pod is launched
// successfully.
func LaunchWebserverPod(f *Framework, podName, nodeName string) (ip string) {
	containerName := fmt.Sprintf("%s-container", podName)
	port := 8080
	pod := &api.Pod{
		ObjectMeta: api.ObjectMeta{
			Name: podName,
		},
		Spec: api.PodSpec{
			Containers: []api.Container{
				{
					Name:  containerName,
					Image: "gcr.io/google_containers/porter:cd5cb5791ebaa8641955f0e8c2a9bed669b1eaab",
					// The porter image serves "foo" on the port named by
					// the SERVE_PORT_<n> env var.
					Env:   []api.EnvVar{{Name: fmt.Sprintf("SERVE_PORT_%d", port), Value: "foo"}},
					Ports: []api.ContainerPort{{ContainerPort: int32(port)}},
				},
			},
			NodeName:      nodeName,
			RestartPolicy: api.RestartPolicyNever,
		},
	}
	podClient := f.Client.Pods(f.Namespace.Name)
	_, err := podClient.Create(pod)
	ExpectNoError(err)
	ExpectNoError(f.WaitForPodRunning(podName))
	// Re-read the pod to learn the IP assigned by the scheduler/network.
	createdPod, err := podClient.Get(podName)
	ExpectNoError(err)
	ip = fmt.Sprintf("%s:%d", createdPod.Status.PodIP, port)
	Logf("Target pod IP:port is %s", ip)
	return
}
// CheckConnectivityToHost launches a pod running wget on the
// specified node to test connectivity to the specified host. An
// error will be returned if the host is not reachable from the pod.
func CheckConnectivityToHost(f *Framework, nodeName, podName, host string, timeout int) error {
	contName := fmt.Sprintf("%s-container", podName)
	pod := &api.Pod{
		ObjectMeta: api.ObjectMeta{
			Name: podName,
		},
		Spec: api.PodSpec{
			Containers: []api.Container{
				{
					Name:    contName,
					Image:   "gcr.io/google_containers/busybox:1.24",
					Command: []string{"wget", fmt.Sprintf("--timeout=%d", timeout), "-s", host},
				},
			},
			NodeName:      nodeName,
			RestartPolicy: api.RestartPolicyNever,
		},
	}
	podClient := f.Client.Pods(f.Namespace.Name)
	_, err := podClient.Create(pod)
	if err != nil {
		return err
	}
	// Best-effort cleanup of the probe pod.
	defer podClient.Delete(podName, nil)
	err = WaitForPodSuccessInNamespace(f.Client, podName, contName, f.Namespace.Name)
	if err != nil {
		// On failure, dump the wget output to aid debugging; log fetching
		// problems are reported but do not mask the original error.
		logs, logErr := GetPodLogs(f.Client, f.Namespace.Name, pod.Name, contName)
		if logErr != nil {
			Logf("Warning: Failed to get logs from pod %q: %v", pod.Name, logErr)
		} else {
			Logf("pod %s/%s \"wget\" logs:\n%s", f.Namespace.Name, pod.Name, logs)
		}
	}
	return err
}
  4504. // CoreDump SSHs to the master and all nodes and dumps their logs into dir.
  4505. // It shells out to cluster/log-dump.sh to accomplish this.
  4506. func CoreDump(dir string) {
  4507. cmd := exec.Command(path.Join(TestContext.RepoRoot, "cluster", "log-dump.sh"), dir)
  4508. cmd.Stdout = os.Stdout
  4509. cmd.Stderr = os.Stderr
  4510. if err := cmd.Run(); err != nil {
  4511. Logf("Error running cluster/log-dump.sh: %v", err)
  4512. }
  4513. }
// UpdatePodWithRetries gets pod ns/name, applies the update function to it,
// and writes it back, retrying up to 3 times when the write fails with a
// Conflict or ServerTimeout error. Returns the updated pod on success.
func UpdatePodWithRetries(client *client.Client, ns, name string, update func(*api.Pod)) (*api.Pod, error) {
	for i := 0; i < 3; i++ {
		pod, err := client.Pods(ns).Get(name)
		if err != nil {
			return nil, fmt.Errorf("Failed to get pod %q: %v", name, err)
		}
		update(pod)
		pod, err = client.Pods(ns).Update(pod)
		if err == nil {
			return pod, nil
		}
		// Only optimistic-concurrency style failures are worth retrying.
		if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
			return nil, fmt.Errorf("Failed to update pod %q: %v", name, err)
		}
	}
	return nil, fmt.Errorf("Too many retries updating Pod %q", name)
}
  4531. func GetPodsInNamespace(c *client.Client, ns string, ignoreLabels map[string]string) ([]*api.Pod, error) {
  4532. pods, err := c.Pods(ns).List(api.ListOptions{})
  4533. if err != nil {
  4534. return []*api.Pod{}, err
  4535. }
  4536. ignoreSelector := labels.SelectorFromSet(ignoreLabels)
  4537. filtered := []*api.Pod{}
  4538. for _, p := range pods.Items {
  4539. if len(ignoreLabels) != 0 && ignoreSelector.Matches(labels.Set(p.Labels)) {
  4540. continue
  4541. }
  4542. filtered = append(filtered, &p)
  4543. }
  4544. return filtered, nil
  4545. }
// RunCmd runs cmd using args and returns its stdout and stderr. It also outputs
// cmd's stdout and stderr to their respective OS streams.
// On failure, the returned error includes the captured stdout/stderr.
func RunCmd(command string, args ...string) (string, string, error) {
	Logf("Running %s %v", command, args)
	var bout, berr bytes.Buffer
	cmd := exec.Command(command, args...)
	// We also output to the OS stdout/stderr to aid in debugging in case cmd
	// hangs and never returns before the test gets killed.
	//
	// This creates some ugly output because gcloud doesn't always provide
	// newlines.
	cmd.Stdout = io.MultiWriter(os.Stdout, &bout)
	cmd.Stderr = io.MultiWriter(os.Stderr, &berr)
	err := cmd.Run()
	stdout, stderr := bout.String(), berr.String()
	if err != nil {
		return "", "", fmt.Errorf("error running %s %v; got error %v, stdout %q, stderr %q",
			command, args, err, stdout, stderr)
	}
	return stdout, stderr, nil
}
// retryCmd runs cmd using args and retries it for up to SingleCallTimeout if
// it returns an error. It returns stdout and stderr.
func retryCmd(command string, args ...string) (string, string, error) {
	var err error
	stdout, stderr := "", ""
	// The poll's own (timeout) error is deliberately discarded: the last
	// RunCmd error captured in err is what the caller gets.
	wait.Poll(Poll, SingleCallTimeout, func() (bool, error) {
		stdout, stderr, err = RunCmd(command, args...)
		if err != nil {
			Logf("Got %v", err)
			return false, nil
		}
		return true, nil
	})
	return stdout, stderr, err
}
// GetPodsScheduled returns a number of currently scheduled and not scheduled Pods.
// Pods on master nodes are skipped. Uses gomega Expect, so a pod with a
// missing or inconsistent PodScheduled condition fails the running test.
func GetPodsScheduled(masterNodes sets.String, pods *api.PodList) (scheduledPods, notScheduledPods []api.Pod) {
	for _, pod := range pods.Items {
		if !masterNodes.Has(pod.Spec.NodeName) {
			if pod.Spec.NodeName != "" {
				// Assigned to a node: PodScheduled must be present and True.
				_, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled)
				Expect(scheduledCondition != nil).To(Equal(true))
				Expect(scheduledCondition.Status).To(Equal(api.ConditionTrue))
				scheduledPods = append(scheduledPods, pod)
			} else {
				// Unassigned: PodScheduled must be present and False; only
				// count it when the scheduler marked it Unschedulable.
				_, scheduledCondition := api.GetPodCondition(&pod.Status, api.PodScheduled)
				Expect(scheduledCondition != nil).To(Equal(true))
				Expect(scheduledCondition.Status).To(Equal(api.ConditionFalse))
				if scheduledCondition.Reason == "Unschedulable" {
					notScheduledPods = append(notScheduledPods, pod)
				}
			}
		}
	}
	return
}
// WaitForStableCluster waits until all existing pods are scheduled and returns their amount.
// Fails the test after a 10 minute timeout.
func WaitForStableCluster(c *client.Client, masterNodes sets.String) int {
	timeout := 10 * time.Minute
	startTime := time.Now()
	allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
	ExpectNoError(err)
	// API server returns also Pods that succeeded. We need to filter them out.
	currentPods := make([]api.Pod, 0, len(allPods.Items))
	for _, pod := range allPods.Items {
		if pod.Status.Phase != api.PodSucceeded && pod.Status.Phase != api.PodFailed {
			currentPods = append(currentPods, pod)
		}
	}
	allPods.Items = currentPods
	scheduledPods, currentlyNotScheduledPods := GetPodsScheduled(masterNodes, allPods)
	for len(currentlyNotScheduledPods) != 0 {
		time.Sleep(2 * time.Second)
		// NOTE(review): unlike the initial list above, this re-list is NOT
		// filtered for Succeeded/Failed pods before being classified —
		// confirm whether completed pods can skew the counts here.
		allPods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{})
		ExpectNoError(err)
		scheduledPods, currentlyNotScheduledPods = GetPodsScheduled(masterNodes, allPods)
		if startTime.Add(timeout).Before(time.Now()) {
			Failf("Timed out after %v waiting for stable cluster.", timeout)
			break
		}
	}
	return len(scheduledPods)
}
  4630. // GetMasterAndWorkerNodesOrDie will return a list masters and schedulable worker nodes
  4631. func GetMasterAndWorkerNodesOrDie(c *client.Client) (sets.String, *api.NodeList) {
  4632. nodes := &api.NodeList{}
  4633. masters := sets.NewString()
  4634. all, _ := c.Nodes().List(api.ListOptions{})
  4635. for _, n := range all.Items {
  4636. if system.IsMasterNode(&n) {
  4637. masters.Insert(n.Name)
  4638. } else if isNodeSchedulable(&n) {
  4639. nodes.Items = append(nodes.Items, n)
  4640. }
  4641. }
  4642. return masters, nodes
  4643. }
  4644. func CreateFileForGoBinData(gobindataPath, outputFilename string) error {
  4645. data := ReadOrDie(gobindataPath)
  4646. if len(data) == 0 {
  4647. return fmt.Errorf("Failed to read gobindata from %v", gobindataPath)
  4648. }
  4649. fullPath := filepath.Join(TestContext.OutputDir, outputFilename)
  4650. err := os.MkdirAll(filepath.Dir(fullPath), 0777)
  4651. if err != nil {
  4652. return fmt.Errorf("Error while creating directory %v: %v", filepath.Dir(fullPath), err)
  4653. }
  4654. err = ioutil.WriteFile(fullPath, data, 0644)
  4655. if err != nil {
  4656. return fmt.Errorf("Error while trying to write to file %v: %v", fullPath, err)
  4657. }
  4658. return nil
  4659. }