pickfirst.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "errors"
  21. "google.golang.org/grpc/balancer"
  22. "google.golang.org/grpc/codes"
  23. "google.golang.org/grpc/connectivity"
  24. "google.golang.org/grpc/grpclog"
  25. "google.golang.org/grpc/resolver"
  26. "google.golang.org/grpc/status"
  27. )
  28. // PickFirstBalancerName is the name of the pick_first balancer.
  29. const PickFirstBalancerName = "pick_first"
  30. func newPickfirstBuilder() balancer.Builder {
  31. return &pickfirstBuilder{}
  32. }
  33. type pickfirstBuilder struct{}
  34. func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  35. return &pickfirstBalancer{cc: cc}
  36. }
  37. func (*pickfirstBuilder) Name() string {
  38. return PickFirstBalancerName
  39. }
  40. type pickfirstBalancer struct {
  41. state connectivity.State
  42. cc balancer.ClientConn
  43. sc balancer.SubConn
  44. }
  45. var _ balancer.V2Balancer = &pickfirstBalancer{} // Assert we implement v2
  46. func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
  47. if err != nil {
  48. b.ResolverError(err)
  49. return
  50. }
  51. b.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}}) // Ignore error
  52. }
  53. func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  54. b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s})
  55. }
  56. func (b *pickfirstBalancer) ResolverError(err error) {
  57. switch b.state {
  58. case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
  59. // Set a failing picker if we don't have a good picker.
  60. b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
  61. Picker: &picker{err: status.Errorf(codes.Unavailable, "name resolver error: %v", err)}},
  62. )
  63. }
  64. if grpclog.V(2) {
  65. grpclog.Infof("pickfirstBalancer: ResolverError called with error %v", err)
  66. }
  67. }
  68. func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
  69. if len(cs.ResolverState.Addresses) == 0 {
  70. b.ResolverError(errors.New("produced zero addresses"))
  71. return balancer.ErrBadResolverState
  72. }
  73. if b.sc == nil {
  74. var err error
  75. b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
  76. if err != nil {
  77. if grpclog.V(2) {
  78. grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
  79. }
  80. b.state = connectivity.TransientFailure
  81. b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
  82. Picker: &picker{err: status.Errorf(codes.Unavailable, "error creating connection: %v", err)}},
  83. )
  84. return balancer.ErrBadResolverState
  85. }
  86. b.state = connectivity.Idle
  87. b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
  88. b.sc.Connect()
  89. } else {
  90. b.sc.UpdateAddresses(cs.ResolverState.Addresses)
  91. b.sc.Connect()
  92. }
  93. return nil
  94. }
  95. func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
  96. if grpclog.V(2) {
  97. grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s)
  98. }
  99. if b.sc != sc {
  100. if grpclog.V(2) {
  101. grpclog.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
  102. }
  103. return
  104. }
  105. b.state = s.ConnectivityState
  106. if s.ConnectivityState == connectivity.Shutdown {
  107. b.sc = nil
  108. return
  109. }
  110. switch s.ConnectivityState {
  111. case connectivity.Ready, connectivity.Idle:
  112. b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}})
  113. case connectivity.Connecting:
  114. b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})
  115. case connectivity.TransientFailure:
  116. err := balancer.ErrTransientFailure
  117. // TODO: this can be unconditional after the V1 API is removed, as
  118. // SubConnState will always contain a connection error.
  119. if s.ConnectionError != nil {
  120. err = balancer.TransientFailureError(s.ConnectionError)
  121. }
  122. b.cc.UpdateState(balancer.State{
  123. ConnectivityState: s.ConnectivityState,
  124. Picker: &picker{err: err},
  125. })
  126. }
  127. }
  128. func (b *pickfirstBalancer) Close() {
  129. }
  130. type picker struct {
  131. result balancer.PickResult
  132. err error
  133. }
  134. func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
  135. return p.result, p.err
  136. }
  137. func init() {
  138. balancer.Register(newPickfirstBuilder())
  139. }