mutation_detector.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "os"
  17. "reflect"
  18. "strconv"
  19. "sync"
  20. "time"
  21. "k8s.io/klog/v2"
  22. "k8s.io/apimachinery/pkg/runtime"
  23. "k8s.io/apimachinery/pkg/util/diff"
  24. )
  25. var mutationDetectionEnabled = false
  26. func init() {
  27. mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
  28. }
  29. // MutationDetector is able to monitor objects for mutation within a limited window of time
  30. type MutationDetector interface {
  31. // AddObject adds the given object to the set being monitored for a while from now
  32. AddObject(obj interface{})
  33. // Run starts the monitoring and does not return until the monitoring is stopped.
  34. Run(stopCh <-chan struct{})
  35. }
  36. // NewCacheMutationDetector creates a new instance for the defaultCacheMutationDetector.
  37. func NewCacheMutationDetector(name string) MutationDetector {
  38. if !mutationDetectionEnabled {
  39. return dummyMutationDetector{}
  40. }
  41. klog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
  42. return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute}
  43. }
  44. type dummyMutationDetector struct{}
  45. func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
  46. }
  47. func (dummyMutationDetector) AddObject(obj interface{}) {
  48. }
  49. // defaultCacheMutationDetector gives a way to detect if a cached object has been mutated
  50. // It has a list of cached objects and their copies. I haven't thought of a way
  51. // to see WHO is mutating it, just that it's getting mutated.
  52. type defaultCacheMutationDetector struct {
  53. name string
  54. period time.Duration
  55. // compareLock ensures only a single call to CompareObjects runs at a time
  56. compareObjectsLock sync.Mutex
  57. // addLock guards addedObjs between AddObject and CompareObjects
  58. addedObjsLock sync.Mutex
  59. addedObjs []cacheObj
  60. cachedObjs []cacheObj
  61. retainDuration time.Duration
  62. lastRotated time.Time
  63. retainedCachedObjs []cacheObj
  64. // failureFunc is injectable for unit testing. If you don't have it, the process will panic.
  65. // This panic is intentional, since turning on this detection indicates you want a strong
  66. // failure signal. This failure is effectively a p0 bug and you can't trust process results
  67. // after a mutation anyway.
  68. failureFunc func(message string)
  69. }
  70. // cacheObj holds the actual object and a copy
  71. type cacheObj struct {
  72. cached interface{}
  73. copied interface{}
  74. }
  75. func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
  76. // we DON'T want protection from panics. If we're running this code, we want to die
  77. for {
  78. if d.lastRotated.IsZero() {
  79. d.lastRotated = time.Now()
  80. } else if time.Now().Sub(d.lastRotated) > d.retainDuration {
  81. d.retainedCachedObjs = d.cachedObjs
  82. d.cachedObjs = nil
  83. d.lastRotated = time.Now()
  84. }
  85. d.CompareObjects()
  86. select {
  87. case <-stopCh:
  88. return
  89. case <-time.After(d.period):
  90. }
  91. }
  92. }
  93. // AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object
  94. // but that covers the vast majority of our cached objects
  95. func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
  96. if _, ok := obj.(DeletedFinalStateUnknown); ok {
  97. return
  98. }
  99. if obj, ok := obj.(runtime.Object); ok {
  100. copiedObj := obj.DeepCopyObject()
  101. d.addedObjsLock.Lock()
  102. defer d.addedObjsLock.Unlock()
  103. d.addedObjs = append(d.addedObjs, cacheObj{cached: obj, copied: copiedObj})
  104. }
  105. }
  106. func (d *defaultCacheMutationDetector) CompareObjects() {
  107. d.compareObjectsLock.Lock()
  108. defer d.compareObjectsLock.Unlock()
  109. // move addedObjs into cachedObjs under lock
  110. // this keeps the critical section small to avoid blocking AddObject while we compare cachedObjs
  111. d.addedObjsLock.Lock()
  112. d.cachedObjs = append(d.cachedObjs, d.addedObjs...)
  113. d.addedObjs = nil
  114. d.addedObjsLock.Unlock()
  115. altered := false
  116. for i, obj := range d.cachedObjs {
  117. if !reflect.DeepEqual(obj.cached, obj.copied) {
  118. fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
  119. altered = true
  120. }
  121. }
  122. for i, obj := range d.retainedCachedObjs {
  123. if !reflect.DeepEqual(obj.cached, obj.copied) {
  124. fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
  125. altered = true
  126. }
  127. }
  128. if altered {
  129. msg := fmt.Sprintf("cache %s modified", d.name)
  130. if d.failureFunc != nil {
  131. d.failureFunc(msg)
  132. return
  133. }
  134. panic(msg)
  135. }
  136. }