roundrobin_test.go 31 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package userspace
  14. import (
  15. "net"
  16. "testing"
  17. "k8s.io/kubernetes/pkg/api"
  18. "k8s.io/kubernetes/pkg/proxy"
  19. "k8s.io/kubernetes/pkg/types"
  20. )
  21. func TestValidateWorks(t *testing.T) {
  22. if isValidEndpoint(&hostPortPair{}) {
  23. t.Errorf("Didn't fail for empty set")
  24. }
  25. if isValidEndpoint(&hostPortPair{host: "foobar"}) {
  26. t.Errorf("Didn't fail with invalid port")
  27. }
  28. if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
  29. t.Errorf("Didn't fail with a negative port")
  30. }
  31. if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
  32. t.Errorf("Failed a valid config.")
  33. }
  34. }
  35. func TestFilterWorks(t *testing.T) {
  36. endpoints := []hostPortPair{
  37. {host: "foobar", port: 1},
  38. {host: "foobar", port: 2},
  39. {host: "foobar", port: -1},
  40. {host: "foobar", port: 3},
  41. {host: "foobar", port: -2},
  42. }
  43. filtered := flattenValidEndpoints(endpoints)
  44. if len(filtered) != 3 {
  45. t.Errorf("Failed to filter to the correct size")
  46. }
  47. if filtered[0] != "foobar:1" {
  48. t.Errorf("Index zero is not foobar:1")
  49. }
  50. if filtered[1] != "foobar:2" {
  51. t.Errorf("Index one is not foobar:2")
  52. }
  53. if filtered[2] != "foobar:3" {
  54. t.Errorf("Index two is not foobar:3")
  55. }
  56. }
  57. func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
  58. loadBalancer := NewLoadBalancerRR()
  59. var endpoints []api.Endpoints
  60. loadBalancer.OnEndpointsUpdate(endpoints)
  61. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"}
  62. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  63. if err == nil {
  64. t.Errorf("Didn't fail with non-existent service")
  65. }
  66. if len(endpoint) != 0 {
  67. t.Errorf("Got an endpoint")
  68. }
  69. }
  70. func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
  71. endpoint, err := loadBalancer.NextEndpoint(service, netaddr, false)
  72. if err != nil {
  73. t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
  74. }
  75. if endpoint != expected {
  76. t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
  77. }
  78. }
  79. func expectEndpointWithSessionAffinityReset(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) {
  80. endpoint, err := loadBalancer.NextEndpoint(service, netaddr, true)
  81. if err != nil {
  82. t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
  83. }
  84. if endpoint != expected {
  85. t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
  86. }
  87. }
  88. func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
  89. loadBalancer := NewLoadBalancerRR()
  90. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  91. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  92. if err == nil || len(endpoint) != 0 {
  93. t.Errorf("Didn't fail with non-existent service")
  94. }
  95. endpoints := make([]api.Endpoints, 1)
  96. endpoints[0] = api.Endpoints{
  97. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  98. Subsets: []api.EndpointSubset{{
  99. Addresses: []api.EndpointAddress{{IP: "endpoint1"}},
  100. Ports: []api.EndpointPort{{Name: "p", Port: 40}},
  101. }},
  102. }
  103. loadBalancer.OnEndpointsUpdate(endpoints)
  104. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  105. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  106. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  107. expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
  108. }
  109. func stringsInSlice(haystack []string, needles ...string) bool {
  110. for _, needle := range needles {
  111. found := false
  112. for i := range haystack {
  113. if haystack[i] == needle {
  114. found = true
  115. break
  116. }
  117. }
  118. if found == false {
  119. return false
  120. }
  121. }
  122. return true
  123. }
  124. func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
  125. loadBalancer := NewLoadBalancerRR()
  126. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  127. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  128. if err == nil || len(endpoint) != 0 {
  129. t.Errorf("Didn't fail with non-existent service")
  130. }
  131. endpoints := make([]api.Endpoints, 1)
  132. endpoints[0] = api.Endpoints{
  133. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  134. Subsets: []api.EndpointSubset{{
  135. Addresses: []api.EndpointAddress{{IP: "endpoint"}},
  136. Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}},
  137. }},
  138. }
  139. loadBalancer.OnEndpointsUpdate(endpoints)
  140. shuffledEndpoints := loadBalancer.services[service].endpoints
  141. if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
  142. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  143. }
  144. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
  145. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
  146. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
  147. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
  148. }
  149. func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
  150. loadBalancer := NewLoadBalancerRR()
  151. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  152. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
  153. endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
  154. if err == nil || len(endpoint) != 0 {
  155. t.Errorf("Didn't fail with non-existent service")
  156. }
  157. endpoints := make([]api.Endpoints, 1)
  158. endpoints[0] = api.Endpoints{
  159. ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  160. Subsets: []api.EndpointSubset{
  161. {
  162. Addresses: []api.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}},
  163. Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 2}},
  164. },
  165. {
  166. Addresses: []api.EndpointAddress{{IP: "endpoint3"}},
  167. Ports: []api.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 4}},
  168. },
  169. },
  170. }
  171. loadBalancer.OnEndpointsUpdate(endpoints)
  172. shuffledEndpoints := loadBalancer.services[serviceP].endpoints
  173. if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") {
  174. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  175. }
  176. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  177. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  178. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
  179. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  180. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  181. if !stringsInSlice(shuffledEndpoints, "endpoint1:2", "endpoint2:2", "endpoint3:4") {
  182. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  183. }
  184. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  185. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  186. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
  187. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  188. }
  189. func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
  190. loadBalancer := NewLoadBalancerRR()
  191. serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  192. serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"}
  193. endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false)
  194. if err == nil || len(endpoint) != 0 {
  195. t.Errorf("Didn't fail with non-existent service")
  196. }
  197. endpoints := make([]api.Endpoints, 1)
  198. endpoints[0] = api.Endpoints{
  199. ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  200. Subsets: []api.EndpointSubset{
  201. {
  202. Addresses: []api.EndpointAddress{{IP: "endpoint1"}},
  203. Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 10}},
  204. },
  205. {
  206. Addresses: []api.EndpointAddress{{IP: "endpoint2"}},
  207. Ports: []api.EndpointPort{{Name: "p", Port: 2}, {Name: "q", Port: 20}},
  208. },
  209. {
  210. Addresses: []api.EndpointAddress{{IP: "endpoint3"}},
  211. Ports: []api.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 30}},
  212. },
  213. },
  214. }
  215. loadBalancer.OnEndpointsUpdate(endpoints)
  216. shuffledEndpoints := loadBalancer.services[serviceP].endpoints
  217. if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") {
  218. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  219. }
  220. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  221. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  222. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil)
  223. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  224. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  225. if !stringsInSlice(shuffledEndpoints, "endpoint1:10", "endpoint2:20", "endpoint3:30") {
  226. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  227. }
  228. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  229. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  230. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil)
  231. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  232. // Then update the configuration with one fewer endpoints, make sure
  233. // we start in the beginning again
  234. endpoints[0] = api.Endpoints{
  235. ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
  236. Subsets: []api.EndpointSubset{
  237. {
  238. Addresses: []api.EndpointAddress{{IP: "endpoint4"}},
  239. Ports: []api.EndpointPort{{Name: "p", Port: 4}, {Name: "q", Port: 40}},
  240. },
  241. {
  242. Addresses: []api.EndpointAddress{{IP: "endpoint5"}},
  243. Ports: []api.EndpointPort{{Name: "p", Port: 5}, {Name: "q", Port: 50}},
  244. },
  245. },
  246. }
  247. loadBalancer.OnEndpointsUpdate(endpoints)
  248. shuffledEndpoints = loadBalancer.services[serviceP].endpoints
  249. if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") {
  250. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  251. }
  252. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  253. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  254. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil)
  255. expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil)
  256. shuffledEndpoints = loadBalancer.services[serviceQ].endpoints
  257. if !stringsInSlice(shuffledEndpoints, "endpoint4:40", "endpoint5:50") {
  258. t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
  259. }
  260. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  261. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  262. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil)
  263. expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil)
  264. // Clear endpoints
  265. endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil}
  266. loadBalancer.OnEndpointsUpdate(endpoints)
  267. endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false)
  268. if err == nil || len(endpoint) != 0 {
  269. t.Errorf("Didn't fail with non-existent service")
  270. }
  271. }
  272. func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
  273. loadBalancer := NewLoadBalancerRR()
  274. fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"}
  275. barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"}
  276. endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil, false)
  277. if err == nil || len(endpoint) != 0 {
  278. t.Errorf("Didn't fail with non-existent service")
  279. }
  280. endpoints := make([]api.Endpoints, 2)
  281. endpoints[0] = api.Endpoints{
  282. ObjectMeta: api.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace},
  283. Subsets: []api.EndpointSubset{
  284. {
  285. Addresses: []api.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}, {IP: "endpoint3"}},
  286. Ports: []api.EndpointPort{{Name: "p", Port: 123}},
  287. },
  288. },
  289. }
  290. endpoints[1] = api.Endpoints{
  291. ObjectMeta: api.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace},
  292. Subsets: []api.EndpointSubset{
  293. {
  294. Addresses: []api.EndpointAddress{{IP: "endpoint4"}, {IP: "endpoint5"}, {IP: "endpoint6"}},
  295. Ports: []api.EndpointPort{{Name: "p", Port: 456}},
  296. },
  297. },
  298. }
  299. loadBalancer.OnEndpointsUpdate(endpoints)
  300. shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints
  301. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
  302. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
  303. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[2], nil)
  304. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil)
  305. expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil)
  306. shuffledBarEndpoints := loadBalancer.services[barServiceP].endpoints
  307. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  308. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  309. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  310. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  311. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  312. // Then update the configuration by removing foo
  313. loadBalancer.OnEndpointsUpdate(endpoints[1:])
  314. endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false)
  315. if err == nil || len(endpoint) != 0 {
  316. t.Errorf("Didn't fail with non-existent service")
  317. }
  318. // but bar is still there, and we continue RR from where we left off.
  319. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  320. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil)
  321. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil)
  322. expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
  323. }
  324. func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
  325. loadBalancer := NewLoadBalancerRR()
  326. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  327. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  328. if err == nil || len(endpoint) != 0 {
  329. t.Errorf("Didn't fail with non-existent service")
  330. }
  331. // Call NewService() before OnEndpointsUpdate()
  332. loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
  333. endpoints := make([]api.Endpoints, 1)
  334. endpoints[0] = api.Endpoints{
  335. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  336. Subsets: []api.EndpointSubset{
  337. {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
  338. {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
  339. {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
  340. },
  341. }
  342. loadBalancer.OnEndpointsUpdate(endpoints)
  343. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  344. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  345. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  346. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  347. if err != nil {
  348. t.Errorf("Didn't find a service for %s: %v", service, err)
  349. }
  350. expectEndpoint(t, loadBalancer, service, ep1, client1)
  351. expectEndpoint(t, loadBalancer, service, ep1, client1)
  352. expectEndpoint(t, loadBalancer, service, ep1, client1)
  353. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  354. if err != nil {
  355. t.Errorf("Didn't find a service for %s: %v", service, err)
  356. }
  357. expectEndpoint(t, loadBalancer, service, ep2, client2)
  358. expectEndpoint(t, loadBalancer, service, ep2, client2)
  359. expectEndpoint(t, loadBalancer, service, ep2, client2)
  360. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  361. if err != nil {
  362. t.Errorf("Didn't find a service for %s: %v", service, err)
  363. }
  364. expectEndpoint(t, loadBalancer, service, ep3, client3)
  365. expectEndpoint(t, loadBalancer, service, ep3, client3)
  366. expectEndpoint(t, loadBalancer, service, ep3, client3)
  367. expectEndpoint(t, loadBalancer, service, ep1, client1)
  368. expectEndpoint(t, loadBalancer, service, ep2, client2)
  369. expectEndpoint(t, loadBalancer, service, ep3, client3)
  370. expectEndpoint(t, loadBalancer, service, ep1, client1)
  371. expectEndpoint(t, loadBalancer, service, ep2, client2)
  372. expectEndpoint(t, loadBalancer, service, ep3, client3)
  373. }
  374. func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
  375. loadBalancer := NewLoadBalancerRR()
  376. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  377. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  378. if err == nil || len(endpoint) != 0 {
  379. t.Errorf("Didn't fail with non-existent service")
  380. }
  381. // Call OnEndpointsUpdate() before NewService()
  382. endpoints := make([]api.Endpoints, 1)
  383. endpoints[0] = api.Endpoints{
  384. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  385. Subsets: []api.EndpointSubset{
  386. {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
  387. {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
  388. },
  389. }
  390. loadBalancer.OnEndpointsUpdate(endpoints)
  391. loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
  392. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  393. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  394. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  395. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  396. if err != nil {
  397. t.Errorf("Didn't find a service for %s: %v", service, err)
  398. }
  399. expectEndpoint(t, loadBalancer, service, ep1, client1)
  400. expectEndpoint(t, loadBalancer, service, ep1, client1)
  401. expectEndpoint(t, loadBalancer, service, ep1, client1)
  402. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  403. if err != nil {
  404. t.Errorf("Didn't find a service for %s: %v", service, err)
  405. }
  406. expectEndpoint(t, loadBalancer, service, ep2, client2)
  407. expectEndpoint(t, loadBalancer, service, ep2, client2)
  408. expectEndpoint(t, loadBalancer, service, ep2, client2)
  409. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  410. if err != nil {
  411. t.Errorf("Didn't find a service for %s: %v", service, err)
  412. }
  413. expectEndpoint(t, loadBalancer, service, ep3, client3)
  414. expectEndpoint(t, loadBalancer, service, ep3, client3)
  415. expectEndpoint(t, loadBalancer, service, ep3, client3)
  416. expectEndpoint(t, loadBalancer, service, ep1, client1)
  417. expectEndpoint(t, loadBalancer, service, ep2, client2)
  418. expectEndpoint(t, loadBalancer, service, ep3, client3)
  419. expectEndpoint(t, loadBalancer, service, ep1, client1)
  420. expectEndpoint(t, loadBalancer, service, ep2, client2)
  421. expectEndpoint(t, loadBalancer, service, ep3, client3)
  422. }
  423. func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
  424. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  425. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  426. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  427. client4 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 4), Port: 0}
  428. client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0}
  429. client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
  430. loadBalancer := NewLoadBalancerRR()
  431. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  432. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  433. if err == nil || len(endpoint) != 0 {
  434. t.Errorf("Didn't fail with non-existent service")
  435. }
  436. loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
  437. endpoints := make([]api.Endpoints, 1)
  438. endpoints[0] = api.Endpoints{
  439. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  440. Subsets: []api.EndpointSubset{
  441. {
  442. Addresses: []api.EndpointAddress{{IP: "endpoint"}},
  443. Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  444. },
  445. },
  446. }
  447. loadBalancer.OnEndpointsUpdate(endpoints)
  448. shuffledEndpoints := loadBalancer.services[service].endpoints
  449. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  450. client1Endpoint := shuffledEndpoints[0]
  451. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  452. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  453. client2Endpoint := shuffledEndpoints[1]
  454. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  455. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
  456. client3Endpoint := shuffledEndpoints[2]
  457. endpoints[0] = api.Endpoints{
  458. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  459. Subsets: []api.EndpointSubset{
  460. {
  461. Addresses: []api.EndpointAddress{{IP: "endpoint"}},
  462. Ports: []api.EndpointPort{{Port: 1}, {Port: 2}},
  463. },
  464. },
  465. }
  466. loadBalancer.OnEndpointsUpdate(endpoints)
  467. shuffledEndpoints = loadBalancer.services[service].endpoints
  468. if client1Endpoint == "endpoint:3" {
  469. client1Endpoint = shuffledEndpoints[0]
  470. } else if client2Endpoint == "endpoint:3" {
  471. client2Endpoint = shuffledEndpoints[0]
  472. } else if client3Endpoint == "endpoint:3" {
  473. client3Endpoint = shuffledEndpoints[0]
  474. }
  475. expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
  476. expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
  477. expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
  478. endpoints[0] = api.Endpoints{
  479. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  480. Subsets: []api.EndpointSubset{
  481. {
  482. Addresses: []api.EndpointAddress{{IP: "endpoint"}},
  483. Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 4}},
  484. },
  485. },
  486. }
  487. loadBalancer.OnEndpointsUpdate(endpoints)
  488. shuffledEndpoints = loadBalancer.services[service].endpoints
  489. expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
  490. expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
  491. expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
  492. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4)
  493. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5)
  494. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6)
  495. }
  496. func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
  497. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  498. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  499. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  500. loadBalancer := NewLoadBalancerRR()
  501. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  502. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  503. if err == nil || len(endpoint) != 0 {
  504. t.Errorf("Didn't fail with non-existent service")
  505. }
  506. loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
  507. endpoints := make([]api.Endpoints, 1)
  508. endpoints[0] = api.Endpoints{
  509. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  510. Subsets: []api.EndpointSubset{
  511. {
  512. Addresses: []api.EndpointAddress{{IP: "endpoint"}},
  513. Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  514. },
  515. },
  516. }
  517. loadBalancer.OnEndpointsUpdate(endpoints)
  518. shuffledEndpoints := loadBalancer.services[service].endpoints
  519. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  520. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  521. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  522. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  523. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
  524. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  525. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  526. // Then update the configuration with one fewer endpoints, make sure
  527. // we start in the beginning again
  528. endpoints[0] = api.Endpoints{
  529. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  530. Subsets: []api.EndpointSubset{
  531. {
  532. Addresses: []api.EndpointAddress{{IP: "endpoint"}},
  533. Ports: []api.EndpointPort{{Port: 4}, {Port: 5}},
  534. },
  535. },
  536. }
  537. loadBalancer.OnEndpointsUpdate(endpoints)
  538. shuffledEndpoints = loadBalancer.services[service].endpoints
  539. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  540. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  541. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  542. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
  543. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  544. expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
  545. // Clear endpoints
  546. endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
  547. loadBalancer.OnEndpointsUpdate(endpoints)
  548. endpoint, err = loadBalancer.NextEndpoint(service, nil, false)
  549. if err == nil || len(endpoint) != 0 {
  550. t.Errorf("Didn't fail with non-existent service")
  551. }
  552. }
  553. func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
  554. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  555. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  556. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  557. loadBalancer := NewLoadBalancerRR()
  558. fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  559. endpoint, err := loadBalancer.NextEndpoint(fooService, nil, false)
  560. if err == nil || len(endpoint) != 0 {
  561. t.Errorf("Didn't fail with non-existent service")
  562. }
  563. loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0)
  564. endpoints := make([]api.Endpoints, 2)
  565. endpoints[0] = api.Endpoints{
  566. ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
  567. Subsets: []api.EndpointSubset{
  568. {
  569. Addresses: []api.EndpointAddress{{IP: "endpoint"}},
  570. Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
  571. },
  572. },
  573. }
  574. barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""}
  575. loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0)
  576. endpoints[1] = api.Endpoints{
  577. ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
  578. Subsets: []api.EndpointSubset{
  579. {
  580. Addresses: []api.EndpointAddress{{IP: "endpoint"}},
  581. Ports: []api.EndpointPort{{Port: 4}, {Port: 5}},
  582. },
  583. },
  584. }
  585. loadBalancer.OnEndpointsUpdate(endpoints)
  586. shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
  587. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  588. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  589. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  590. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  591. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
  592. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  593. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
  594. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  595. expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
  596. shuffledBarEndpoints := loadBalancer.services[barService].endpoints
  597. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  598. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  599. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  600. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  601. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  602. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  603. // Then update the configuration by removing foo
  604. loadBalancer.OnEndpointsUpdate(endpoints[1:])
  605. endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false)
  606. if err == nil || len(endpoint) != 0 {
  607. t.Errorf("Didn't fail with non-existent service")
  608. }
  609. // but bar is still there, and we continue RR from where we left off.
  610. shuffledBarEndpoints = loadBalancer.services[barService].endpoints
  611. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  612. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  613. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  614. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
  615. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  616. expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
  617. }
  618. func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) {
  619. loadBalancer := NewLoadBalancerRR()
  620. service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""}
  621. endpoint, err := loadBalancer.NextEndpoint(service, nil, false)
  622. if err == nil || len(endpoint) != 0 {
  623. t.Errorf("Didn't fail with non-existent service")
  624. }
  625. // Call NewService() before OnEndpointsUpdate()
  626. loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
  627. endpoints := make([]api.Endpoints, 1)
  628. endpoints[0] = api.Endpoints{
  629. ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
  630. Subsets: []api.EndpointSubset{
  631. {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
  632. {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
  633. {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
  634. },
  635. }
  636. loadBalancer.OnEndpointsUpdate(endpoints)
  637. client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
  638. client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
  639. client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
  640. ep1, err := loadBalancer.NextEndpoint(service, client1, false)
  641. if err != nil {
  642. t.Errorf("Didn't find a service for %s: %v", service, err)
  643. }
  644. ep2, err := loadBalancer.NextEndpoint(service, client2, false)
  645. if err != nil {
  646. t.Errorf("Didn't find a service for %s: %v", service, err)
  647. }
  648. ep3, err := loadBalancer.NextEndpoint(service, client3, false)
  649. if err != nil {
  650. t.Errorf("Didn't find a service for %s: %v", service, err)
  651. }
  652. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client1)
  653. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client1)
  654. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
  655. expectEndpoint(t, loadBalancer, service, ep2, client2)
  656. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
  657. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
  658. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1)
  659. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2)
  660. expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3)
  661. }