123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package kubelet
- import (
- "reflect"
- "sync"
- "testing"
- "time"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/client/record"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
- kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
- "k8s.io/kubernetes/pkg/kubelet/util/queue"
- "k8s.io/kubernetes/pkg/types"
- "k8s.io/kubernetes/pkg/util/clock"
- )
- // fakePodWorkers runs sync pod function in serial, so we can have
- // deterministic behaviour in testing.
- type fakePodWorkers struct {
- syncPodFn syncPodFnType
- cache kubecontainer.Cache
- t TestingInterface
- }
- func (f *fakePodWorkers) UpdatePod(options *UpdatePodOptions) {
- status, err := f.cache.Get(options.Pod.UID)
- if err != nil {
- f.t.Errorf("Unexpected error: %v", err)
- }
- if err := f.syncPodFn(syncPodOptions{
- mirrorPod: options.MirrorPod,
- pod: options.Pod,
- podStatus: status,
- updateType: options.UpdateType,
- killPodOptions: options.KillPodOptions,
- }); err != nil {
- f.t.Errorf("Unexpected error: %v", err)
- }
- }
- func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}
- func (f *fakePodWorkers) ForgetWorker(uid types.UID) {}
// TestingInterface is the minimal error-reporting surface that
// fakePodWorkers needs; *testing.T satisfies it.
type TestingInterface interface {
	// Errorf records a formatted failure message.
	Errorf(format string, args ...interface{})
}
- func newPod(uid, name string) *api.Pod {
- return &api.Pod{
- ObjectMeta: api.ObjectMeta{
- UID: types.UID(uid),
- Name: name,
- },
- }
- }
- // syncPodRecord is a record of a sync pod call
- type syncPodRecord struct {
- name string
- updateType kubetypes.SyncPodType
- }
- func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
- lock := sync.Mutex{}
- processed := make(map[types.UID][]syncPodRecord)
- fakeRecorder := &record.FakeRecorder{}
- fakeRuntime := &containertest.FakeRuntime{}
- fakeCache := containertest.NewFakeCache(fakeRuntime)
- podWorkers := newPodWorkers(
- func(options syncPodOptions) error {
- func() {
- lock.Lock()
- defer lock.Unlock()
- pod := options.pod
- processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
- name: pod.Name,
- updateType: options.updateType,
- })
- }()
- return nil
- },
- fakeRecorder,
- queue.NewBasicWorkQueue(&clock.RealClock{}),
- time.Second,
- time.Second,
- fakeCache,
- )
- return podWorkers, processed
- }
- func drainWorkers(podWorkers *podWorkers, numPods int) {
- for {
- stillWorking := false
- podWorkers.podLock.Lock()
- for i := 0; i < numPods; i++ {
- if podWorkers.isWorking[types.UID(string(i))] {
- stillWorking = true
- }
- }
- podWorkers.podLock.Unlock()
- if !stillWorking {
- break
- }
- time.Sleep(50 * time.Millisecond)
- }
- }
- func TestUpdatePod(t *testing.T) {
- podWorkers, processed := createPodWorkers()
- // Check whether all pod updates will be processed.
- numPods := 20
- for i := 0; i < numPods; i++ {
- for j := i; j < numPods; j++ {
- podWorkers.UpdatePod(&UpdatePodOptions{
- Pod: newPod(string(j), string(i)),
- UpdateType: kubetypes.SyncPodCreate,
- })
- }
- }
- drainWorkers(podWorkers, numPods)
- if len(processed) != numPods {
- t.Errorf("Not all pods processed: %v", len(processed))
- return
- }
- for i := 0; i < numPods; i++ {
- uid := types.UID(i)
- if len(processed[uid]) < 1 || len(processed[uid]) > i+1 {
- t.Errorf("Pod %v processed %v times", i, len(processed[uid]))
- continue
- }
- first := 0
- last := len(processed[uid]) - 1
- if processed[uid][first].name != string(0) {
- t.Errorf("Pod %v: incorrect order %v, %v", i, first, processed[uid][first])
- }
- if processed[uid][last].name != string(i) {
- t.Errorf("Pod %v: incorrect order %v, %v", i, last, processed[uid][last])
- }
- }
- }
- func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
- podWorkers, processed := createPodWorkers()
- numPods := 20
- for i := 0; i < numPods; i++ {
- pod := newPod(string(i), string(i))
- podWorkers.UpdatePod(&UpdatePodOptions{
- Pod: pod,
- UpdateType: kubetypes.SyncPodCreate,
- })
- podWorkers.UpdatePod(&UpdatePodOptions{
- Pod: pod,
- UpdateType: kubetypes.SyncPodKill,
- })
- podWorkers.UpdatePod(&UpdatePodOptions{
- Pod: pod,
- UpdateType: kubetypes.SyncPodUpdate,
- })
- }
- drainWorkers(podWorkers, numPods)
- if len(processed) != numPods {
- t.Errorf("Not all pods processed: %v", len(processed))
- return
- }
- for i := 0; i < numPods; i++ {
- uid := types.UID(i)
- // each pod should be processed two times (create, kill, but not update)
- syncPodRecords := processed[uid]
- if len(syncPodRecords) < 2 {
- t.Errorf("Pod %v processed %v times, but expected at least 2", i, len(syncPodRecords))
- continue
- }
- if syncPodRecords[0].updateType != kubetypes.SyncPodCreate {
- t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[0].updateType, kubetypes.SyncPodCreate)
- }
- if syncPodRecords[1].updateType != kubetypes.SyncPodKill {
- t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[1].updateType, kubetypes.SyncPodKill)
- }
- }
- }
- func TestForgetNonExistingPodWorkers(t *testing.T) {
- podWorkers, _ := createPodWorkers()
- numPods := 20
- for i := 0; i < numPods; i++ {
- podWorkers.UpdatePod(&UpdatePodOptions{
- Pod: newPod(string(i), "name"),
- UpdateType: kubetypes.SyncPodUpdate,
- })
- }
- drainWorkers(podWorkers, numPods)
- if len(podWorkers.podUpdates) != numPods {
- t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
- }
- desiredPods := map[types.UID]empty{}
- desiredPods[types.UID(2)] = empty{}
- desiredPods[types.UID(14)] = empty{}
- podWorkers.ForgetNonExistingPodWorkers(desiredPods)
- if len(podWorkers.podUpdates) != 2 {
- t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
- }
- if _, exists := podWorkers.podUpdates[types.UID(2)]; !exists {
- t.Errorf("No updates channel for pod 2")
- }
- if _, exists := podWorkers.podUpdates[types.UID(14)]; !exists {
- t.Errorf("No updates channel for pod 14")
- }
- podWorkers.ForgetNonExistingPodWorkers(map[types.UID]empty{})
- if len(podWorkers.podUpdates) != 0 {
- t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
- }
- }
- type simpleFakeKubelet struct {
- pod *api.Pod
- mirrorPod *api.Pod
- podStatus *kubecontainer.PodStatus
- wg sync.WaitGroup
- }
- func (kl *simpleFakeKubelet) syncPod(options syncPodOptions) error {
- kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
- return nil
- }
- func (kl *simpleFakeKubelet) syncPodWithWaitGroup(options syncPodOptions) error {
- kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
- kl.wg.Done()
- return nil
- }
- // byContainerName sort the containers in a running pod by their names.
- type byContainerName kubecontainer.Pod
- func (b byContainerName) Len() int { return len(b.Containers) }
- func (b byContainerName) Swap(i, j int) {
- b.Containers[i], b.Containers[j] = b.Containers[j], b.Containers[i]
- }
- func (b byContainerName) Less(i, j int) bool {
- return b.Containers[i].Name < b.Containers[j].Name
- }
- // TestFakePodWorkers verifies that the fakePodWorkers behaves the same way as the real podWorkers
- // for their invocation of the syncPodFn.
- func TestFakePodWorkers(t *testing.T) {
- fakeRecorder := &record.FakeRecorder{}
- fakeRuntime := &containertest.FakeRuntime{}
- fakeCache := containertest.NewFakeCache(fakeRuntime)
- kubeletForRealWorkers := &simpleFakeKubelet{}
- kubeletForFakeWorkers := &simpleFakeKubelet{}
- realPodWorkers := newPodWorkers(kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(&clock.RealClock{}), time.Second, time.Second, fakeCache)
- fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeCache, t}
- tests := []struct {
- pod *api.Pod
- mirrorPod *api.Pod
- }{
- {
- &api.Pod{},
- &api.Pod{},
- },
- {
- podWithUidNameNs("12345678", "foo", "new"),
- podWithUidNameNs("12345678", "fooMirror", "new"),
- },
- {
- podWithUidNameNs("98765", "bar", "new"),
- podWithUidNameNs("98765", "barMirror", "new"),
- },
- }
- for i, tt := range tests {
- kubeletForRealWorkers.wg.Add(1)
- realPodWorkers.UpdatePod(&UpdatePodOptions{
- Pod: tt.pod,
- MirrorPod: tt.mirrorPod,
- UpdateType: kubetypes.SyncPodUpdate,
- })
- fakePodWorkers.UpdatePod(&UpdatePodOptions{
- Pod: tt.pod,
- MirrorPod: tt.mirrorPod,
- UpdateType: kubetypes.SyncPodUpdate,
- })
- kubeletForRealWorkers.wg.Wait()
- if !reflect.DeepEqual(kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod) {
- t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod)
- }
- if !reflect.DeepEqual(kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) {
- t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod)
- }
- if !reflect.DeepEqual(kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) {
- t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus)
- }
- }
- }
- // TestKillPodNowFunc tests the blocking kill pod function works with pod workers as expected.
- func TestKillPodNowFunc(t *testing.T) {
- podWorkers, processed := createPodWorkers()
- killPodFunc := killPodNow(podWorkers)
- pod := newPod("test", "test")
- gracePeriodOverride := int64(0)
- err := killPodFunc(pod, api.PodStatus{Phase: api.PodFailed, Reason: "reason", Message: "message"}, &gracePeriodOverride)
- if err != nil {
- t.Errorf("Unexpected error: %v", err)
- }
- if len(processed) != 1 {
- t.Errorf("len(processed) expected: %v, actual: %v", 1, len(processed))
- return
- }
- syncPodRecords := processed[pod.UID]
- if len(syncPodRecords) != 1 {
- t.Errorf("Pod processed %v times, but expected %v", len(syncPodRecords), 1)
- }
- if syncPodRecords[0].updateType != kubetypes.SyncPodKill {
- t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
- }
- }
|