resolver_conn_wrapper.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "fmt"
  21. "strings"
  22. "sync"
  23. "time"
  24. "google.golang.org/grpc/balancer"
  25. "google.golang.org/grpc/credentials"
  26. "google.golang.org/grpc/grpclog"
  27. "google.golang.org/grpc/internal/channelz"
  28. "google.golang.org/grpc/internal/grpcsync"
  29. "google.golang.org/grpc/resolver"
  30. "google.golang.org/grpc/serviceconfig"
  31. )
  32. // ccResolverWrapper is a wrapper on top of cc for resolvers.
  33. // It implements resolver.ClientConn interface.
  34. type ccResolverWrapper struct {
  35. cc *ClientConn
  36. resolverMu sync.Mutex
  37. resolver resolver.Resolver
  38. done *grpcsync.Event
  39. curState resolver.State
  40. pollingMu sync.Mutex
  41. polling chan struct{}
  42. }
  43. // split2 returns the values from strings.SplitN(s, sep, 2).
  44. // If sep is not found, it returns ("", "", false) instead.
  45. func split2(s, sep string) (string, string, bool) {
  46. spl := strings.SplitN(s, sep, 2)
  47. if len(spl) < 2 {
  48. return "", "", false
  49. }
  50. return spl[0], spl[1], true
  51. }
  52. // parseTarget splits target into a struct containing scheme, authority and
  53. // endpoint.
  54. //
  55. // If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
  56. // target}.
  57. func parseTarget(target string) (ret resolver.Target) {
  58. var ok bool
  59. ret.Scheme, ret.Endpoint, ok = split2(target, "://")
  60. if !ok {
  61. return resolver.Target{Endpoint: target}
  62. }
  63. ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
  64. if !ok {
  65. return resolver.Target{Endpoint: target}
  66. }
  67. return ret
  68. }
  69. // newCCResolverWrapper uses the resolver.Builder to build a Resolver and
  70. // returns a ccResolverWrapper object which wraps the newly built resolver.
  71. func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
  72. ccr := &ccResolverWrapper{
  73. cc: cc,
  74. done: grpcsync.NewEvent(),
  75. }
  76. var credsClone credentials.TransportCredentials
  77. if creds := cc.dopts.copts.TransportCredentials; creds != nil {
  78. credsClone = creds.Clone()
  79. }
  80. rbo := resolver.BuildOptions{
  81. DisableServiceConfig: cc.dopts.disableServiceConfig,
  82. DialCreds: credsClone,
  83. CredsBundle: cc.dopts.copts.CredsBundle,
  84. Dialer: cc.dopts.copts.Dialer,
  85. }
  86. var err error
  87. // We need to hold the lock here while we assign to the ccr.resolver field
  88. // to guard against a data race caused by the following code path,
  89. // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
  90. // accessing ccr.resolver which is being assigned here.
  91. ccr.resolverMu.Lock()
  92. defer ccr.resolverMu.Unlock()
  93. ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
  94. if err != nil {
  95. return nil, err
  96. }
  97. return ccr, nil
  98. }
  99. func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
  100. ccr.resolverMu.Lock()
  101. if !ccr.done.HasFired() {
  102. ccr.resolver.ResolveNow(o)
  103. }
  104. ccr.resolverMu.Unlock()
  105. }
  106. func (ccr *ccResolverWrapper) close() {
  107. ccr.resolverMu.Lock()
  108. ccr.resolver.Close()
  109. ccr.done.Fire()
  110. ccr.resolverMu.Unlock()
  111. }
  112. // poll begins or ends asynchronous polling of the resolver based on whether
  113. // err is ErrBadResolverState.
  114. func (ccr *ccResolverWrapper) poll(err error) {
  115. ccr.pollingMu.Lock()
  116. defer ccr.pollingMu.Unlock()
  117. if err != balancer.ErrBadResolverState {
  118. // stop polling
  119. if ccr.polling != nil {
  120. close(ccr.polling)
  121. ccr.polling = nil
  122. }
  123. return
  124. }
  125. if ccr.polling != nil {
  126. // already polling
  127. return
  128. }
  129. p := make(chan struct{})
  130. ccr.polling = p
  131. go func() {
  132. for i := 0; ; i++ {
  133. ccr.resolveNow(resolver.ResolveNowOptions{})
  134. t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
  135. select {
  136. case <-p:
  137. t.Stop()
  138. return
  139. case <-ccr.done.Done():
  140. // Resolver has been closed.
  141. t.Stop()
  142. return
  143. case <-t.C:
  144. select {
  145. case <-p:
  146. return
  147. default:
  148. }
  149. // Timer expired; re-resolve.
  150. }
  151. }
  152. }()
  153. }
  154. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
  155. if ccr.done.HasFired() {
  156. return
  157. }
  158. grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
  159. if channelz.IsOn() {
  160. ccr.addChannelzTraceEvent(s)
  161. }
  162. ccr.curState = s
  163. ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
  164. }
  165. func (ccr *ccResolverWrapper) ReportError(err error) {
  166. if ccr.done.HasFired() {
  167. return
  168. }
  169. grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err)
  170. if channelz.IsOn() {
  171. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  172. Desc: fmt.Sprintf("Resolver reported error: %v", err),
  173. Severity: channelz.CtWarning,
  174. })
  175. }
  176. ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
  177. }
  178. // NewAddress is called by the resolver implementation to send addresses to gRPC.
  179. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
  180. if ccr.done.HasFired() {
  181. return
  182. }
  183. grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
  184. if channelz.IsOn() {
  185. ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
  186. }
  187. ccr.curState.Addresses = addrs
  188. ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
  189. }
  190. // NewServiceConfig is called by the resolver implementation to send service
  191. // configs to gRPC.
  192. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
  193. if ccr.done.HasFired() {
  194. return
  195. }
  196. grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
  197. if ccr.cc.dopts.disableServiceConfig {
  198. grpclog.Infof("Service config lookups disabled; ignoring config")
  199. return
  200. }
  201. scpr := parseServiceConfig(sc)
  202. if scpr.Err != nil {
  203. grpclog.Warningf("ccResolverWrapper: error parsing service config: %v", scpr.Err)
  204. if channelz.IsOn() {
  205. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  206. Desc: fmt.Sprintf("Error parsing service config: %v", scpr.Err),
  207. Severity: channelz.CtWarning,
  208. })
  209. }
  210. ccr.poll(balancer.ErrBadResolverState)
  211. return
  212. }
  213. if channelz.IsOn() {
  214. ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
  215. }
  216. ccr.curState.ServiceConfig = scpr
  217. ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
  218. }
  219. func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
  220. return parseServiceConfig(scJSON)
  221. }
  222. func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
  223. var updates []string
  224. var oldSC, newSC *ServiceConfig
  225. var oldOK, newOK bool
  226. if ccr.curState.ServiceConfig != nil {
  227. oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
  228. }
  229. if s.ServiceConfig != nil {
  230. newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
  231. }
  232. if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
  233. updates = append(updates, "service config updated")
  234. }
  235. if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
  236. updates = append(updates, "resolver returned an empty address list")
  237. } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
  238. updates = append(updates, "resolver returned new addresses")
  239. }
  240. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  241. Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
  242. Severity: channelz.CtINFO,
  243. })
  244. }