pod_workers_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kubelet
  14. import (
  15. "reflect"
  16. "sync"
  17. "testing"
  18. "time"
  19. "k8s.io/kubernetes/pkg/api"
  20. "k8s.io/kubernetes/pkg/client/record"
  21. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  22. containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
  23. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  24. "k8s.io/kubernetes/pkg/kubelet/util/queue"
  25. "k8s.io/kubernetes/pkg/types"
  26. "k8s.io/kubernetes/pkg/util/clock"
  27. )
  28. // fakePodWorkers runs sync pod function in serial, so we can have
  29. // deterministic behaviour in testing.
  30. type fakePodWorkers struct {
  31. syncPodFn syncPodFnType
  32. cache kubecontainer.Cache
  33. t TestingInterface
  34. }
  35. func (f *fakePodWorkers) UpdatePod(options *UpdatePodOptions) {
  36. status, err := f.cache.Get(options.Pod.UID)
  37. if err != nil {
  38. f.t.Errorf("Unexpected error: %v", err)
  39. }
  40. if err := f.syncPodFn(syncPodOptions{
  41. mirrorPod: options.MirrorPod,
  42. pod: options.Pod,
  43. podStatus: status,
  44. updateType: options.UpdateType,
  45. killPodOptions: options.KillPodOptions,
  46. }); err != nil {
  47. f.t.Errorf("Unexpected error: %v", err)
  48. }
  49. }
  50. func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}
  51. func (f *fakePodWorkers) ForgetWorker(uid types.UID) {}
  52. type TestingInterface interface {
  53. Errorf(format string, args ...interface{})
  54. }
  55. func newPod(uid, name string) *api.Pod {
  56. return &api.Pod{
  57. ObjectMeta: api.ObjectMeta{
  58. UID: types.UID(uid),
  59. Name: name,
  60. },
  61. }
  62. }
  63. // syncPodRecord is a record of a sync pod call
  64. type syncPodRecord struct {
  65. name string
  66. updateType kubetypes.SyncPodType
  67. }
  68. func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
  69. lock := sync.Mutex{}
  70. processed := make(map[types.UID][]syncPodRecord)
  71. fakeRecorder := &record.FakeRecorder{}
  72. fakeRuntime := &containertest.FakeRuntime{}
  73. fakeCache := containertest.NewFakeCache(fakeRuntime)
  74. podWorkers := newPodWorkers(
  75. func(options syncPodOptions) error {
  76. func() {
  77. lock.Lock()
  78. defer lock.Unlock()
  79. pod := options.pod
  80. processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
  81. name: pod.Name,
  82. updateType: options.updateType,
  83. })
  84. }()
  85. return nil
  86. },
  87. fakeRecorder,
  88. queue.NewBasicWorkQueue(&clock.RealClock{}),
  89. time.Second,
  90. time.Second,
  91. fakeCache,
  92. )
  93. return podWorkers, processed
  94. }
  95. func drainWorkers(podWorkers *podWorkers, numPods int) {
  96. for {
  97. stillWorking := false
  98. podWorkers.podLock.Lock()
  99. for i := 0; i < numPods; i++ {
  100. if podWorkers.isWorking[types.UID(string(i))] {
  101. stillWorking = true
  102. }
  103. }
  104. podWorkers.podLock.Unlock()
  105. if !stillWorking {
  106. break
  107. }
  108. time.Sleep(50 * time.Millisecond)
  109. }
  110. }
  111. func TestUpdatePod(t *testing.T) {
  112. podWorkers, processed := createPodWorkers()
  113. // Check whether all pod updates will be processed.
  114. numPods := 20
  115. for i := 0; i < numPods; i++ {
  116. for j := i; j < numPods; j++ {
  117. podWorkers.UpdatePod(&UpdatePodOptions{
  118. Pod: newPod(string(j), string(i)),
  119. UpdateType: kubetypes.SyncPodCreate,
  120. })
  121. }
  122. }
  123. drainWorkers(podWorkers, numPods)
  124. if len(processed) != numPods {
  125. t.Errorf("Not all pods processed: %v", len(processed))
  126. return
  127. }
  128. for i := 0; i < numPods; i++ {
  129. uid := types.UID(i)
  130. if len(processed[uid]) < 1 || len(processed[uid]) > i+1 {
  131. t.Errorf("Pod %v processed %v times", i, len(processed[uid]))
  132. continue
  133. }
  134. first := 0
  135. last := len(processed[uid]) - 1
  136. if processed[uid][first].name != string(0) {
  137. t.Errorf("Pod %v: incorrect order %v, %v", i, first, processed[uid][first])
  138. }
  139. if processed[uid][last].name != string(i) {
  140. t.Errorf("Pod %v: incorrect order %v, %v", i, last, processed[uid][last])
  141. }
  142. }
  143. }
  144. func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
  145. podWorkers, processed := createPodWorkers()
  146. numPods := 20
  147. for i := 0; i < numPods; i++ {
  148. pod := newPod(string(i), string(i))
  149. podWorkers.UpdatePod(&UpdatePodOptions{
  150. Pod: pod,
  151. UpdateType: kubetypes.SyncPodCreate,
  152. })
  153. podWorkers.UpdatePod(&UpdatePodOptions{
  154. Pod: pod,
  155. UpdateType: kubetypes.SyncPodKill,
  156. })
  157. podWorkers.UpdatePod(&UpdatePodOptions{
  158. Pod: pod,
  159. UpdateType: kubetypes.SyncPodUpdate,
  160. })
  161. }
  162. drainWorkers(podWorkers, numPods)
  163. if len(processed) != numPods {
  164. t.Errorf("Not all pods processed: %v", len(processed))
  165. return
  166. }
  167. for i := 0; i < numPods; i++ {
  168. uid := types.UID(i)
  169. // each pod should be processed two times (create, kill, but not update)
  170. syncPodRecords := processed[uid]
  171. if len(syncPodRecords) < 2 {
  172. t.Errorf("Pod %v processed %v times, but expected at least 2", i, len(syncPodRecords))
  173. continue
  174. }
  175. if syncPodRecords[0].updateType != kubetypes.SyncPodCreate {
  176. t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[0].updateType, kubetypes.SyncPodCreate)
  177. }
  178. if syncPodRecords[1].updateType != kubetypes.SyncPodKill {
  179. t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[1].updateType, kubetypes.SyncPodKill)
  180. }
  181. }
  182. }
  183. func TestForgetNonExistingPodWorkers(t *testing.T) {
  184. podWorkers, _ := createPodWorkers()
  185. numPods := 20
  186. for i := 0; i < numPods; i++ {
  187. podWorkers.UpdatePod(&UpdatePodOptions{
  188. Pod: newPod(string(i), "name"),
  189. UpdateType: kubetypes.SyncPodUpdate,
  190. })
  191. }
  192. drainWorkers(podWorkers, numPods)
  193. if len(podWorkers.podUpdates) != numPods {
  194. t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
  195. }
  196. desiredPods := map[types.UID]empty{}
  197. desiredPods[types.UID(2)] = empty{}
  198. desiredPods[types.UID(14)] = empty{}
  199. podWorkers.ForgetNonExistingPodWorkers(desiredPods)
  200. if len(podWorkers.podUpdates) != 2 {
  201. t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
  202. }
  203. if _, exists := podWorkers.podUpdates[types.UID(2)]; !exists {
  204. t.Errorf("No updates channel for pod 2")
  205. }
  206. if _, exists := podWorkers.podUpdates[types.UID(14)]; !exists {
  207. t.Errorf("No updates channel for pod 14")
  208. }
  209. podWorkers.ForgetNonExistingPodWorkers(map[types.UID]empty{})
  210. if len(podWorkers.podUpdates) != 0 {
  211. t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
  212. }
  213. }
  214. type simpleFakeKubelet struct {
  215. pod *api.Pod
  216. mirrorPod *api.Pod
  217. podStatus *kubecontainer.PodStatus
  218. wg sync.WaitGroup
  219. }
  220. func (kl *simpleFakeKubelet) syncPod(options syncPodOptions) error {
  221. kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
  222. return nil
  223. }
  224. func (kl *simpleFakeKubelet) syncPodWithWaitGroup(options syncPodOptions) error {
  225. kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
  226. kl.wg.Done()
  227. return nil
  228. }
  229. // byContainerName sort the containers in a running pod by their names.
  230. type byContainerName kubecontainer.Pod
  231. func (b byContainerName) Len() int { return len(b.Containers) }
  232. func (b byContainerName) Swap(i, j int) {
  233. b.Containers[i], b.Containers[j] = b.Containers[j], b.Containers[i]
  234. }
  235. func (b byContainerName) Less(i, j int) bool {
  236. return b.Containers[i].Name < b.Containers[j].Name
  237. }
  238. // TestFakePodWorkers verifies that the fakePodWorkers behaves the same way as the real podWorkers
  239. // for their invocation of the syncPodFn.
  240. func TestFakePodWorkers(t *testing.T) {
  241. fakeRecorder := &record.FakeRecorder{}
  242. fakeRuntime := &containertest.FakeRuntime{}
  243. fakeCache := containertest.NewFakeCache(fakeRuntime)
  244. kubeletForRealWorkers := &simpleFakeKubelet{}
  245. kubeletForFakeWorkers := &simpleFakeKubelet{}
  246. realPodWorkers := newPodWorkers(kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(&clock.RealClock{}), time.Second, time.Second, fakeCache)
  247. fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeCache, t}
  248. tests := []struct {
  249. pod *api.Pod
  250. mirrorPod *api.Pod
  251. }{
  252. {
  253. &api.Pod{},
  254. &api.Pod{},
  255. },
  256. {
  257. podWithUidNameNs("12345678", "foo", "new"),
  258. podWithUidNameNs("12345678", "fooMirror", "new"),
  259. },
  260. {
  261. podWithUidNameNs("98765", "bar", "new"),
  262. podWithUidNameNs("98765", "barMirror", "new"),
  263. },
  264. }
  265. for i, tt := range tests {
  266. kubeletForRealWorkers.wg.Add(1)
  267. realPodWorkers.UpdatePod(&UpdatePodOptions{
  268. Pod: tt.pod,
  269. MirrorPod: tt.mirrorPod,
  270. UpdateType: kubetypes.SyncPodUpdate,
  271. })
  272. fakePodWorkers.UpdatePod(&UpdatePodOptions{
  273. Pod: tt.pod,
  274. MirrorPod: tt.mirrorPod,
  275. UpdateType: kubetypes.SyncPodUpdate,
  276. })
  277. kubeletForRealWorkers.wg.Wait()
  278. if !reflect.DeepEqual(kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod) {
  279. t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod)
  280. }
  281. if !reflect.DeepEqual(kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) {
  282. t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod)
  283. }
  284. if !reflect.DeepEqual(kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) {
  285. t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus)
  286. }
  287. }
  288. }
  289. // TestKillPodNowFunc tests the blocking kill pod function works with pod workers as expected.
  290. func TestKillPodNowFunc(t *testing.T) {
  291. podWorkers, processed := createPodWorkers()
  292. killPodFunc := killPodNow(podWorkers)
  293. pod := newPod("test", "test")
  294. gracePeriodOverride := int64(0)
  295. err := killPodFunc(pod, api.PodStatus{Phase: api.PodFailed, Reason: "reason", Message: "message"}, &gracePeriodOverride)
  296. if err != nil {
  297. t.Errorf("Unexpected error: %v", err)
  298. }
  299. if len(processed) != 1 {
  300. t.Errorf("len(processed) expected: %v, actual: %v", 1, len(processed))
  301. return
  302. }
  303. syncPodRecords := processed[pod.UID]
  304. if len(syncPodRecords) != 1 {
  305. t.Errorf("Pod processed %v times, but expected %v", len(syncPodRecords), 1)
  306. }
  307. if syncPodRecords[0].updateType != kubetypes.SyncPodKill {
  308. t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
  309. }
  310. }