utils.go 8.2 KB


  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package testing
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "net"
  18. "net/http"
  19. "net/http/httptest"
  20. "os"
  21. "path"
  22. "testing"
  23. "time"
  24. "k8s.io/kubernetes/pkg/storage/etcd/testing/testingcert"
  25. "k8s.io/kubernetes/pkg/util/wait"
  26. etcd "github.com/coreos/etcd/client"
  27. "github.com/coreos/etcd/etcdserver"
  28. "github.com/coreos/etcd/etcdserver/api/v2http"
  29. "github.com/coreos/etcd/pkg/testutil"
  30. "github.com/coreos/etcd/pkg/transport"
  31. "github.com/coreos/etcd/pkg/types"
  32. "github.com/golang/glog"
  33. "golang.org/x/net/context"
  34. )
  35. // EtcdTestServer encapsulates the datastructures needed to start local instance for testing
  36. type EtcdTestServer struct {
  37. etcdserver.ServerConfig
  38. PeerListeners, ClientListeners []net.Listener
  39. Client etcd.Client
  40. CertificatesDir string
  41. CertFile string
  42. KeyFile string
  43. CAFile string
  44. raftHandler http.Handler
  45. s *etcdserver.EtcdServer
  46. hss []*httptest.Server
  47. }
  48. // newLocalListener opens a port localhost using any port
  49. func newLocalListener(t *testing.T) net.Listener {
  50. l, err := net.Listen("tcp", "127.0.0.1:0")
  51. if err != nil {
  52. t.Fatal(err)
  53. }
  54. return l
  55. }
  56. // newSecuredLocalListener opens a port localhost using any port
  57. // with SSL enable
  58. func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
  59. var l net.Listener
  60. l, err := net.Listen("tcp", "127.0.0.1:0")
  61. if err != nil {
  62. t.Fatal(err)
  63. }
  64. tlsInfo := transport.TLSInfo{
  65. CertFile: certFile,
  66. KeyFile: keyFile,
  67. CAFile: caFile,
  68. }
  69. tlscfg, err := tlsInfo.ServerConfig()
  70. if err != nil {
  71. t.Fatalf("unexpected serverConfig error: %v", err)
  72. }
  73. l, err = transport.NewKeepAliveListener(l, "https", tlscfg)
  74. if err != nil {
  75. t.Fatal(err)
  76. }
  77. return l
  78. }
  79. func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
  80. tlsInfo := transport.TLSInfo{
  81. CertFile: certFile,
  82. KeyFile: keyFile,
  83. CAFile: caFile,
  84. }
  85. tr, err := transport.NewTransport(tlsInfo, time.Second)
  86. if err != nil {
  87. t.Fatal(err)
  88. }
  89. return tr
  90. }
  91. // configureTestCluster will set the params to start an etcd server
  92. func configureTestCluster(t *testing.T, name string, https bool) *EtcdTestServer {
  93. var err error
  94. m := &EtcdTestServer{}
  95. pln := newLocalListener(t)
  96. m.PeerListeners = []net.Listener{pln}
  97. m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()})
  98. if err != nil {
  99. t.Fatal(err)
  100. }
  101. // Allow test launches to control where etcd data goes, for space or performance reasons
  102. baseDir := os.Getenv("TEST_ETCD_DIR")
  103. if len(baseDir) == 0 {
  104. baseDir = os.TempDir()
  105. }
  106. if https {
  107. m.CertificatesDir, err = ioutil.TempDir(baseDir, "etcd_certificates")
  108. if err != nil {
  109. t.Fatal(err)
  110. }
  111. m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
  112. if err = ioutil.WriteFile(m.CertFile, []byte(testingcert.CertFileContent), 0644); err != nil {
  113. t.Fatal(err)
  114. }
  115. m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
  116. if err = ioutil.WriteFile(m.KeyFile, []byte(testingcert.KeyFileContent), 0644); err != nil {
  117. t.Fatal(err)
  118. }
  119. m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
  120. if err = ioutil.WriteFile(m.CAFile, []byte(testingcert.CAFileContent), 0644); err != nil {
  121. t.Fatal(err)
  122. }
  123. cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
  124. m.ClientListeners = []net.Listener{cln}
  125. m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
  126. if err != nil {
  127. t.Fatal(err)
  128. }
  129. } else {
  130. cln := newLocalListener(t)
  131. m.ClientListeners = []net.Listener{cln}
  132. m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. }
  137. m.Name = name
  138. m.DataDir, err = ioutil.TempDir(baseDir, "etcd")
  139. if err != nil {
  140. t.Fatal(err)
  141. }
  142. clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
  143. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  144. if err != nil {
  145. t.Fatal(err)
  146. }
  147. m.InitialClusterToken = "TestEtcd"
  148. m.NewCluster = true
  149. m.ForceNewCluster = false
  150. m.ElectionTicks = 10
  151. m.TickMs = uint(10)
  152. return m
  153. }
  154. // launch will attempt to start the etcd server
  155. func (m *EtcdTestServer) launch(t *testing.T) error {
  156. var err error
  157. if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
  158. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  159. }
  160. m.s.SyncTicker = time.Tick(500 * time.Millisecond)
  161. m.s.Start()
  162. m.raftHandler = &testutil.PauseableHandler{Next: v2http.NewPeerHandler(m.s)}
  163. for _, ln := range m.PeerListeners {
  164. hs := &httptest.Server{
  165. Listener: ln,
  166. Config: &http.Server{Handler: m.raftHandler},
  167. }
  168. hs.Start()
  169. m.hss = append(m.hss, hs)
  170. }
  171. for _, ln := range m.ClientListeners {
  172. hs := &httptest.Server{
  173. Listener: ln,
  174. Config: &http.Server{Handler: v2http.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
  175. }
  176. hs.Start()
  177. m.hss = append(m.hss, hs)
  178. }
  179. return nil
  180. }
  181. // waitForEtcd wait until etcd is propagated correctly
  182. func (m *EtcdTestServer) waitUntilUp() error {
  183. membersAPI := etcd.NewMembersAPI(m.Client)
  184. for start := time.Now(); time.Since(start) < wait.ForeverTestTimeout; time.Sleep(10 * time.Millisecond) {
  185. members, err := membersAPI.List(context.TODO())
  186. if err != nil {
  187. glog.Errorf("Error when getting etcd cluster members")
  188. continue
  189. }
  190. if len(members) == 1 && len(members[0].ClientURLs) > 0 {
  191. return nil
  192. }
  193. }
  194. return fmt.Errorf("timeout on waiting for etcd cluster")
  195. }
  196. // Terminate will shutdown the running etcd server
  197. func (m *EtcdTestServer) Terminate(t *testing.T) {
  198. m.Client = nil
  199. m.s.Stop()
  200. // TODO: This is a pretty ugly hack to workaround races during closing
  201. // in-memory etcd server in unit tests - see #18928 for more details.
  202. // We should get rid of it as soon as we have a proper fix - etcd clients
  203. // have overwritten transport counting opened connections (probably by
  204. // overwriting Dial function) and termination function waiting for all
  205. // connections to be closed and stopping accepting new ones.
  206. time.Sleep(250 * time.Millisecond)
  207. for _, hs := range m.hss {
  208. hs.CloseClientConnections()
  209. hs.Close()
  210. }
  211. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  212. t.Fatal(err)
  213. }
  214. if len(m.CertificatesDir) > 0 {
  215. if err := os.RemoveAll(m.CertificatesDir); err != nil {
  216. t.Fatal(err)
  217. }
  218. }
  219. }
  220. // NewEtcdTestClientServer creates a new client and server for testing
  221. func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
  222. server := configureTestCluster(t, "foo", true)
  223. err := server.launch(t)
  224. if err != nil {
  225. t.Fatalf("Failed to start etcd server error=%v", err)
  226. return nil
  227. }
  228. cfg := etcd.Config{
  229. Endpoints: server.ClientURLs.StringSlice(),
  230. Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
  231. }
  232. server.Client, err = etcd.New(cfg)
  233. if err != nil {
  234. server.Terminate(t)
  235. t.Fatalf("Unexpected error in NewEtcdTestClientServer (%v)", err)
  236. return nil
  237. }
  238. if err := server.waitUntilUp(); err != nil {
  239. server.Terminate(t)
  240. t.Fatalf("Unexpected error in waitUntilUp (%v)", err)
  241. return nil
  242. }
  243. return server
  244. }
  245. // NewUnsecuredEtcdTestClientServer creates a new client and server for testing
  246. func NewUnsecuredEtcdTestClientServer(t *testing.T) *EtcdTestServer {
  247. server := configureTestCluster(t, "foo", false)
  248. err := server.launch(t)
  249. if err != nil {
  250. t.Fatalf("Failed to start etcd server error=%v", err)
  251. return nil
  252. }
  253. cfg := etcd.Config{
  254. Endpoints: server.ClientURLs.StringSlice(),
  255. Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
  256. }
  257. server.Client, err = etcd.New(cfg)
  258. if err != nil {
  259. t.Errorf("Unexpected error in NewUnsecuredEtcdTestClientServer (%v)", err)
  260. server.Terminate(t)
  261. return nil
  262. }
  263. if err := server.waitUntilUp(); err != nil {
  264. t.Errorf("Unexpected error in waitUntilUp (%v)", err)
  265. server.Terminate(t)
  266. return nil
  267. }
  268. return server
  269. }