discoverysummarizer.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package discoverysummarizer
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "io/ioutil"
  18. "net/http"
  19. config "k8s.io/kubernetes/cmd/kubernetes-discovery/discoverysummarizer/apis/config/v1alpha1"
  20. "k8s.io/kubernetes/pkg/api/unversioned"
  21. )
  22. type DiscoverySummarizer interface {
  23. Run(port string) error
  24. }
  25. type discoverySummarizerServer struct {
  26. // The list of servers as read from the config file.
  27. serverList config.FederatedServerList
  28. groupVersionPaths map[string][]string
  29. legacyVersionPaths map[string][]string
  30. }
  31. // Ensure that discoverySummarizerServer implements DiscoverySummarizer interface.
  32. var _ DiscoverySummarizer = &discoverySummarizerServer{}
  33. // Creates a server to summarize all group versions
  34. // supported by the servers mentioned in the given config file.
  35. // Call Run() to bring up the server.
  36. func NewDiscoverySummarizer(configFilePath string) (DiscoverySummarizer, error) {
  37. file, err := ioutil.ReadFile(configFilePath)
  38. if err != nil {
  39. return nil, fmt.Errorf("Error in reading config file: %v\n", err)
  40. }
  41. ds := discoverySummarizerServer{
  42. groupVersionPaths: map[string][]string{},
  43. legacyVersionPaths: map[string][]string{},
  44. }
  45. err = json.Unmarshal(file, &ds.serverList)
  46. if err != nil {
  47. return nil, fmt.Errorf("Error in marshalling config file to json: %v\n", err)
  48. }
  49. for _, server := range ds.serverList.Servers {
  50. for _, groupVersionPath := range server.GroupVersionDiscoveryPaths {
  51. if groupVersionPath.IsLegacy {
  52. ds.legacyVersionPaths[groupVersionPath.Path] = append(ds.legacyVersionPaths[groupVersionPath.Path], server.ServerAddress)
  53. } else {
  54. ds.groupVersionPaths[groupVersionPath.Path] = append(ds.groupVersionPaths[groupVersionPath.Path], server.ServerAddress)
  55. }
  56. }
  57. }
  58. return &ds, nil
  59. }
  60. // Brings up the server at the given port.
  61. // TODO: Add HTTPS support.
  62. func (ds *discoverySummarizerServer) Run(port string) error {
  63. http.HandleFunc("/", ds.indexHandler)
  64. // Register a handler for all paths.
  65. for path := range ds.groupVersionPaths {
  66. p := path
  67. fmt.Printf("setting up a handler for %s\n", p)
  68. http.HandleFunc(p, ds.summarizeGroupVersionsHandler(p))
  69. }
  70. for path := range ds.legacyVersionPaths {
  71. p := path
  72. fmt.Printf("setting up a handler for %s\n", p)
  73. http.HandleFunc(p, ds.summarizeLegacyVersionsHandler(p))
  74. }
  75. fmt.Printf("Server running on port %s\n", port)
  76. return http.ListenAndServe(":"+port, nil)
  77. }
  78. // Handler for "/"
  79. func (ds *discoverySummarizerServer) indexHandler(w http.ResponseWriter, r *http.Request) {
  80. if r.URL.Path != "/" {
  81. w.WriteHeader(http.StatusNotFound)
  82. return
  83. }
  84. w.WriteHeader(http.StatusOK)
  85. w.Write([]byte("Success"))
  86. }
  87. // Handler for group versions summarizer.
  88. func (ds *discoverySummarizerServer) summarizeGroupVersionsHandler(path string) func(http.ResponseWriter, *http.Request) {
  89. return func(w http.ResponseWriter, r *http.Request) {
  90. var apiGroupList *unversioned.APIGroupList
  91. // TODO: We can cache calls to all servers.
  92. groups := make(chan *unversioned.APIGroupList)
  93. errorChannel := make(chan error)
  94. for _, serverAddress := range ds.groupVersionPaths[path] {
  95. addr := serverAddress
  96. go func(groups chan *unversioned.APIGroupList, error_channel chan error) {
  97. groupList, err := ds.getAPIGroupList(addr + path)
  98. if err != nil {
  99. errorChannel <- err
  100. return
  101. }
  102. groups <- groupList
  103. return
  104. }(groups, errorChannel)
  105. }
  106. var groupList *unversioned.APIGroupList
  107. var err error
  108. for range ds.groupVersionPaths[path] {
  109. select {
  110. case groupList = <-groups:
  111. if apiGroupList == nil {
  112. apiGroupList = &unversioned.APIGroupList{}
  113. *apiGroupList = *groupList
  114. } else {
  115. apiGroupList.Groups = append(apiGroupList.Groups, groupList.Groups...)
  116. }
  117. case err = <-errorChannel:
  118. ds.writeErr(http.StatusBadGateway, err, w)
  119. return
  120. }
  121. }
  122. ds.writeRawJSON(http.StatusOK, *apiGroupList, w)
  123. return
  124. }
  125. }
  126. // Handler for legacy versions summarizer.
  127. func (ds *discoverySummarizerServer) summarizeLegacyVersionsHandler(path string) func(http.ResponseWriter, *http.Request) {
  128. return func(w http.ResponseWriter, r *http.Request) {
  129. if len(ds.legacyVersionPaths[path]) > 1 {
  130. err := fmt.Errorf("invalid multiple servers serving legacy group %v", ds.legacyVersionPaths[path])
  131. ds.writeErr(http.StatusInternalServerError, err, w)
  132. }
  133. serverAddress := ds.legacyVersionPaths[path][0]
  134. apiVersions, err := ds.getAPIVersions(serverAddress + path)
  135. if err != nil {
  136. ds.writeErr(http.StatusBadGateway, err, w)
  137. return
  138. }
  139. ds.writeRawJSON(http.StatusOK, apiVersions, w)
  140. return
  141. }
  142. }
  143. func (ds *discoverySummarizerServer) getAPIGroupList(serverAddress string) (*unversioned.APIGroupList, error) {
  144. response, err := http.Get(serverAddress)
  145. if err != nil {
  146. return nil, fmt.Errorf("Error in fetching %s: %v", serverAddress, err)
  147. }
  148. defer response.Body.Close()
  149. contents, err := ioutil.ReadAll(response.Body)
  150. if err != nil {
  151. return nil, fmt.Errorf("Error reading response from %s: %v", serverAddress, err)
  152. }
  153. var apiGroupList unversioned.APIGroupList
  154. err = json.Unmarshal(contents, &apiGroupList)
  155. if err != nil {
  156. return nil, fmt.Errorf("Error in unmarshalling response from server %s: %v", serverAddress, err)
  157. }
  158. return &apiGroupList, nil
  159. }
  160. func (ds *discoverySummarizerServer) getAPIVersions(serverAddress string) (*unversioned.APIVersions, error) {
  161. response, err := http.Get(serverAddress)
  162. if err != nil {
  163. return nil, fmt.Errorf("Error in fetching %s: %v", serverAddress, err)
  164. }
  165. defer response.Body.Close()
  166. contents, err := ioutil.ReadAll(response.Body)
  167. if err != nil {
  168. return nil, fmt.Errorf("Error reading response from %s: %v", serverAddress, err)
  169. }
  170. var apiVersions unversioned.APIVersions
  171. err = json.Unmarshal(contents, &apiVersions)
  172. if err != nil {
  173. return nil, fmt.Errorf("Error in unmarshalling response from server %s: %v", serverAddress, err)
  174. }
  175. return &apiVersions, nil
  176. }
  177. // TODO: Pass a runtime.Object here instead of interface{} and use the encoding/decoding stack from kubernetes apiserver.
  178. func (ds *discoverySummarizerServer) writeRawJSON(statusCode int, object interface{}, w http.ResponseWriter) {
  179. output, err := json.MarshalIndent(object, "", " ")
  180. if err != nil {
  181. http.Error(w, err.Error(), http.StatusInternalServerError)
  182. return
  183. }
  184. w.Header().Set("Content-Type", "application/json")
  185. w.WriteHeader(statusCode)
  186. w.Write(output)
  187. }
  188. func (ds *discoverySummarizerServer) writeErr(statusCode int, err error, w http.ResponseWriter) {
  189. http.Error(w, err.Error(), statusCode)
  190. }