extender.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "net/http"
  20. "time"
  21. "k8s.io/kubernetes/pkg/api"
  22. "k8s.io/kubernetes/pkg/client/restclient"
  23. utilnet "k8s.io/kubernetes/pkg/util/net"
  24. "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
  25. schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
  26. )
  27. const (
  28. DefaultExtenderTimeout = 5 * time.Second
  29. )
  30. // HTTPExtender implements the algorithm.SchedulerExtender interface.
  31. type HTTPExtender struct {
  32. extenderURL string
  33. filterVerb string
  34. prioritizeVerb string
  35. weight int
  36. apiVersion string
  37. client *http.Client
  38. }
  39. func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, error) {
  40. var cfg restclient.Config
  41. if config.TLSConfig != nil {
  42. cfg.TLSClientConfig = *config.TLSConfig
  43. }
  44. if config.EnableHttps {
  45. hasCA := len(cfg.CAFile) > 0 || len(cfg.CAData) > 0
  46. if !hasCA {
  47. cfg.Insecure = true
  48. }
  49. }
  50. tlsConfig, err := restclient.TLSConfigFor(&cfg)
  51. if err != nil {
  52. return nil, err
  53. }
  54. if tlsConfig != nil {
  55. return utilnet.SetTransportDefaults(&http.Transport{
  56. TLSClientConfig: tlsConfig,
  57. }), nil
  58. }
  59. return utilnet.SetTransportDefaults(&http.Transport{}), nil
  60. }
  61. func NewHTTPExtender(config *schedulerapi.ExtenderConfig, apiVersion string) (algorithm.SchedulerExtender, error) {
  62. if config.HTTPTimeout.Nanoseconds() == 0 {
  63. config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
  64. }
  65. transport, err := makeTransport(config)
  66. if err != nil {
  67. return nil, err
  68. }
  69. client := &http.Client{
  70. Transport: transport,
  71. Timeout: config.HTTPTimeout,
  72. }
  73. return &HTTPExtender{
  74. extenderURL: config.URLPrefix,
  75. apiVersion: apiVersion,
  76. filterVerb: config.FilterVerb,
  77. prioritizeVerb: config.PrioritizeVerb,
  78. weight: config.Weight,
  79. client: client,
  80. }, nil
  81. }
  82. // Filter based on extender implemented predicate functions. The filtered list is
  83. // expected to be a subset of the supplied list. failedNodesMap optionally contains
  84. // the list of failed nodes and failure reasons.
  85. func (h *HTTPExtender) Filter(pod *api.Pod, nodes []*api.Node) ([]*api.Node, schedulerapi.FailedNodesMap, error) {
  86. var result schedulerapi.ExtenderFilterResult
  87. if h.filterVerb == "" {
  88. return nodes, schedulerapi.FailedNodesMap{}, nil
  89. }
  90. nodeItems := make([]api.Node, 0, len(nodes))
  91. for _, node := range nodes {
  92. nodeItems = append(nodeItems, *node)
  93. }
  94. args := schedulerapi.ExtenderArgs{
  95. Pod: *pod,
  96. Nodes: api.NodeList{Items: nodeItems},
  97. }
  98. if err := h.send(h.filterVerb, &args, &result); err != nil {
  99. return nil, nil, err
  100. }
  101. if result.Error != "" {
  102. return nil, nil, fmt.Errorf(result.Error)
  103. }
  104. nodeResult := make([]*api.Node, 0, len(result.Nodes.Items))
  105. for i := range result.Nodes.Items {
  106. nodeResult = append(nodeResult, &result.Nodes.Items[i])
  107. }
  108. return nodeResult, result.FailedNodes, nil
  109. }
  110. // Prioritize based on extender implemented priority functions. Weight*priority is added
  111. // up for each such priority function. The returned score is added to the score computed
  112. // by Kubernetes scheduler. The total score is used to do the host selection.
  113. func (h *HTTPExtender) Prioritize(pod *api.Pod, nodes []*api.Node) (*schedulerapi.HostPriorityList, int, error) {
  114. var result schedulerapi.HostPriorityList
  115. if h.prioritizeVerb == "" {
  116. result := schedulerapi.HostPriorityList{}
  117. for _, node := range nodes {
  118. result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0})
  119. }
  120. return &result, 0, nil
  121. }
  122. nodeItems := make([]api.Node, 0, len(nodes))
  123. for _, node := range nodes {
  124. nodeItems = append(nodeItems, *node)
  125. }
  126. args := schedulerapi.ExtenderArgs{
  127. Pod: *pod,
  128. Nodes: api.NodeList{Items: nodeItems},
  129. }
  130. if err := h.send(h.prioritizeVerb, &args, &result); err != nil {
  131. return nil, 0, err
  132. }
  133. return &result, h.weight, nil
  134. }
  135. // Helper function to send messages to the extender
  136. func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
  137. out, err := json.Marshal(args)
  138. if err != nil {
  139. return err
  140. }
  141. url := h.extenderURL + "/" + h.apiVersion + "/" + action
  142. req, err := http.NewRequest("POST", url, bytes.NewReader(out))
  143. if err != nil {
  144. return err
  145. }
  146. req.Header.Set("Content-Type", "application/json")
  147. resp, err := h.client.Do(req)
  148. if err != nil {
  149. return err
  150. }
  151. defer resp.Body.Close()
  152. body, err := ioutil.ReadAll(resp.Body)
  153. if err != nil {
  154. return err
  155. }
  156. if err := json.Unmarshal(body, result); err != nil {
  157. return err
  158. }
  159. return nil
  160. }