http2_client.go 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "math"
  24. "net"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "golang.org/x/net/http2"
  31. "golang.org/x/net/http2/hpack"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/credentials"
  34. "google.golang.org/grpc/internal"
  35. "google.golang.org/grpc/internal/channelz"
  36. "google.golang.org/grpc/internal/syscall"
  37. "google.golang.org/grpc/keepalive"
  38. "google.golang.org/grpc/metadata"
  39. "google.golang.org/grpc/peer"
  40. "google.golang.org/grpc/stats"
  41. "google.golang.org/grpc/status"
  42. )
// clientConnectionCounter counts the number of connections a client has
// initiated (equal to the number of http2Clients created). Must be accessed
// atomically: newHTTP2Client increments it with atomic.AddUint64 and stores
// the result as the new transport's connectionID.
var clientConnectionCounter uint64
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
	lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
	// ctx is the transport's own context; newHTTP2Client derives it with
	// context.WithCancel and stores the matching cancel function in cancel.
	ctx        context.Context
	cancel     context.CancelFunc
	ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
	userAgent  string
	md         interface{} // populated from the target's addr.Metadata
	conn       net.Conn    // underlying communication channel
	loopy      *loopyWriter
	remoteAddr net.Addr
	localAddr  net.Addr
	authInfo   credentials.AuthInfo // auth info about the connection

	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}

	framer *framer
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	// The scheme used: https if TLS is on, http otherwise.
	scheme string

	isSecure bool

	perRPCCreds []credentials.PerRPCCredentials

	kp               keepalive.ClientParameters
	keepaliveEnabled bool

	statsHandler stats.Handler

	initialWindowSize int32

	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
	maxSendHeaderListSize *uint32

	// bdpEst is only set when the window size is left dynamic (see
	// newHTTP2Client); it is nil otherwise.
	bdpEst *bdpEstimator
	// onPrefaceReceipt is a callback that client transport calls upon
	// receiving server preface to signal that a successful HTTP2
	// connection was established.
	onPrefaceReceipt func()

	maxConcurrentStreams uint32
	streamQuota          int64
	// streamsQuotaAvailable is a 1-buffered channel that receives a token
	// when stream quota is positive while streams are waiting for it.
	streamsQuotaAvailable chan struct{}
	waitingStreams        uint32
	nextID                uint32

	mu            sync.Mutex // guard the following variables
	state         transportState
	activeStreams map[uint32]*Stream
	// prevGoAwayID records the Last-Stream-ID in the previous GoAway frame.
	prevGoAwayID uint32
	// goAwayReason records the http2.ErrCode and debug data received with the
	// GoAway frame.
	goAwayReason GoAwayReason
	// A condition variable used to signal when the keepalive goroutine should
	// go dormant. The condition for dormancy is based on the number of active
	// streams and the `PermitWithoutStream` keepalive client parameter. And
	// since the number of active streams is guarded by the above mutex, we use
	// the same for this condition variable as well.
	kpDormancyCond *sync.Cond
	// A boolean to track whether the keepalive goroutine is dormant or not.
	// This is checked before attempting to signal the above condition
	// variable.
	kpDormant bool

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData

	// onGoAway and onClose are callbacks supplied to newHTTP2Client; Close
	// invokes onClose.
	onGoAway func(GoAwayReason)
	onClose  func()

	bufferPool *bufferPool

	// connectionID is the value of clientConnectionCounter after it was
	// incremented for this transport.
	connectionID uint64
}
  116. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
  117. if fn != nil {
  118. return fn(ctx, addr)
  119. }
  120. return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
  121. }
  122. func isTemporary(err error) bool {
  123. switch err := err.(type) {
  124. case interface {
  125. Temporary() bool
  126. }:
  127. return err.Temporary()
  128. case interface {
  129. Timeout() bool
  130. }:
  131. // Timeouts may be resolved upon retry, and are thus treated as
  132. // temporary.
  133. return err.Timeout()
  134. }
  135. return true
  136. }
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
//
// connectCtx is used for dialing and for the credentials handshake; ctx is the
// parent of the transport's own cancellable context. onPrefaceReceipt,
// onGoAway and onClose are stored on the returned transport as callbacks.
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	// err is the named result, so this runs on every error return below.
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
	if err != nil {
		// Only classify the error when the caller asked to fail on
		// non-temporary dial errors; otherwise it is always reported as
		// temporary.
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
	}
	// Any further errors will close the underlying connection. Note that the
	// conn value is bound here, at defer time, i.e. before any credentials
	// handshake reassigns the local conn variable.
	defer func(conn net.Conn) {
		if err != nil {
			conn.Close()
		}
	}(conn)
	kp := opts.KeepaliveParams
	// Validate keepalive parameters: zero values fall back to the defaults.
	if kp.Time == 0 {
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	// Keepalive is enabled unless the keepalive time is infinity. When it is
	// enabled, the keepalive timeout is also applied as the TCP user timeout.
	keepaliveEnabled := false
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials

	// A credentials bundle, when present, overrides the transport credentials
	// and contributes one extra per-RPC credential.
	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	if transportCreds != nil {
		scheme = "https"
		// The handshake returns the connection to use from here on.
		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
		if err != nil {
			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
		}
		isSecure = true
	}
	// The window stays dynamic (BDP-estimated) unless the caller configured a
	// connection-level or stream-level window of at least defaultWindowSize.
	dynamicWindow := true
	icwz := int32(initialWindowSize)
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}
	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		md:                    addr.Metadata,
		conn:                  conn,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc:                    &trInFlow{limit: uint32(icwz)},
		scheme:                scheme,
		activeStreams:         make(map[uint32]*Stream),
		isSecure:              isSecure,
		perRPCCreds:           perRPCCreds,
		kp:                    kp,
		statsHandler:          opts.StatsHandler,
		initialWindowSize:     initialWindowSize,
		onPrefaceReceipt:      onPrefaceReceipt,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData:                new(channelzData),
		onGoAway:              onGoAway,
		onClose:               onClose,
		keepaliveEnabled:      keepaliveEnabled,
		bufferPool:            newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.statsHandler != nil {
		// The stats handler may attach data to the context; keep the tagged
		// context as the transport context.
		t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
	}
	if t.keepaliveEnabled {
		// The dormancy condition shares t.mu because it depends on state
		// guarded by that mutex.
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive()
	}
	// Start the reader goroutine for incoming message. Each transport has
	// a dedicated goroutine which reads HTTP2 frame from network. Then it
	// dispatches the frame to the corresponding stream entity.
	go t.reader()

	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
	}
	if n != len(clientPreface) {
		// A short write without an error: err is nil here, so the returned
		// connection error carries only the message.
		t.Close()
		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
	}
	// Only advertise settings that differ from the defaults / were explicitly
	// configured; an empty SETTINGS frame is still written otherwise.
	var ss []http2.Setting

	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			t.Close()
			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
		}
	}

	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	// NOTE(review): unlike the write failures above, a flush failure returns
	// the raw error and does not call t.Close(); only the deferred handlers
	// (cancel the context, close the dialed conn) run on this path.
	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}
	// The writer goroutine: runs the loopy writer until it returns, then
	// signals writerDone.
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
		if err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		// If it's a connection error, let reader goroutine handle it
		// since there might be data in the buffers.
		if _, ok := err.(net.Error); !ok {
			t.conn.Close()
		}
		close(t.writerDone)
	}()
	return t, nil
}
  329. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  330. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  331. s := &Stream{
  332. ct: t,
  333. done: make(chan struct{}),
  334. method: callHdr.Method,
  335. sendCompress: callHdr.SendCompress,
  336. buf: newRecvBuffer(),
  337. headerChan: make(chan struct{}),
  338. contentSubtype: callHdr.ContentSubtype,
  339. }
  340. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  341. s.requestRead = func(n int) {
  342. t.adjustWindow(s, uint32(n))
  343. }
  344. // The client side stream context should have exactly the same life cycle with the user provided context.
  345. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  346. // So we use the original context here instead of creating a copy.
  347. s.ctx = ctx
  348. s.trReader = &transportReader{
  349. reader: &recvBufferReader{
  350. ctx: s.ctx,
  351. ctxDone: s.ctx.Done(),
  352. recv: s.buf,
  353. closeStream: func(err error) {
  354. t.CloseStream(s, err)
  355. },
  356. freeBuffer: t.bufferPool.put,
  357. },
  358. windowHandler: func(n int) {
  359. t.updateWindow(s, uint32(n))
  360. },
  361. }
  362. return s
  363. }
  364. func (t *http2Client) getPeer() *peer.Peer {
  365. return &peer.Peer{
  366. Addr: t.remoteAddr,
  367. AuthInfo: t.authInfo,
  368. }
  369. }
  370. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  371. aud := t.createAudience(callHdr)
  372. ri := credentials.RequestInfo{
  373. Method: callHdr.Method,
  374. AuthInfo: t.authInfo,
  375. }
  376. ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
  377. authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
  378. if err != nil {
  379. return nil, err
  380. }
  381. callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
  382. if err != nil {
  383. return nil, err
  384. }
  385. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  386. // first and create a slice of that exact size.
  387. // Make the slice of certain predictable size to reduce allocations made by append.
  388. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  389. hfLen += len(authData) + len(callAuthData)
  390. headerFields := make([]hpack.HeaderField, 0, hfLen)
  391. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  392. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  393. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  394. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  395. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
  396. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  397. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  398. if callHdr.PreviousAttempts > 0 {
  399. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
  400. }
  401. if callHdr.SendCompress != "" {
  402. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  403. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
  404. }
  405. if dl, ok := ctx.Deadline(); ok {
  406. // Send out timeout regardless its value. The server can detect timeout context by itself.
  407. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  408. timeout := time.Until(dl)
  409. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  410. }
  411. for k, v := range authData {
  412. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  413. }
  414. for k, v := range callAuthData {
  415. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  416. }
  417. if b := stats.OutgoingTags(ctx); b != nil {
  418. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  419. }
  420. if b := stats.OutgoingTrace(ctx); b != nil {
  421. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  422. }
  423. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  424. var k string
  425. for k, vv := range md {
  426. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  427. if isReservedHeader(k) {
  428. continue
  429. }
  430. for _, v := range vv {
  431. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  432. }
  433. }
  434. for _, vv := range added {
  435. for i, v := range vv {
  436. if i%2 == 0 {
  437. k = v
  438. continue
  439. }
  440. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  441. if isReservedHeader(k) {
  442. continue
  443. }
  444. headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
  445. }
  446. }
  447. }
  448. if md, ok := t.md.(*metadata.MD); ok {
  449. for k, vv := range *md {
  450. if isReservedHeader(k) {
  451. continue
  452. }
  453. for _, v := range vv {
  454. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  455. }
  456. }
  457. }
  458. return headerFields, nil
  459. }
  460. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  461. // Create an audience string only if needed.
  462. if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
  463. return ""
  464. }
  465. // Construct URI required to get auth request metadata.
  466. // Omit port if it is the default one.
  467. host := strings.TrimSuffix(callHdr.Host, ":443")
  468. pos := strings.LastIndex(callHdr.Method, "/")
  469. if pos == -1 {
  470. pos = len(callHdr.Method)
  471. }
  472. return "https://" + host + callHdr.Method[:pos]
  473. }
  474. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  475. if len(t.perRPCCreds) == 0 {
  476. return nil, nil
  477. }
  478. authData := map[string]string{}
  479. for _, c := range t.perRPCCreds {
  480. data, err := c.GetRequestMetadata(ctx, audience)
  481. if err != nil {
  482. if _, ok := status.FromError(err); ok {
  483. return nil, err
  484. }
  485. return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
  486. }
  487. for k, v := range data {
  488. // Capital header names are illegal in HTTP/2.
  489. k = strings.ToLower(k)
  490. authData[k] = v
  491. }
  492. }
  493. return authData, nil
  494. }
  495. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  496. var callAuthData map[string]string
  497. // Check if credentials.PerRPCCredentials were provided via call options.
  498. // Note: if these credentials are provided both via dial options and call
  499. // options, then both sets of credentials will be applied.
  500. if callCreds := callHdr.Creds; callCreds != nil {
  501. if !t.isSecure && callCreds.RequireTransportSecurity() {
  502. return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  503. }
  504. data, err := callCreds.GetRequestMetadata(ctx, audience)
  505. if err != nil {
  506. return nil, status.Errorf(codes.Internal, "transport: %v", err)
  507. }
  508. callAuthData = make(map[string]string, len(data))
  509. for k, v := range data {
  510. // Capital header names are illegal in HTTP/2
  511. k = strings.ToLower(k)
  512. callAuthData[k] = v
  513. }
  514. }
  515. return callAuthData, nil
  516. }
  517. // NewStream creates a stream and registers it into the transport as "active"
  518. // streams.
  519. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  520. ctx = peer.NewContext(ctx, t.getPeer())
  521. headerFields, err := t.createHeaderFields(ctx, callHdr)
  522. if err != nil {
  523. return nil, err
  524. }
  525. s := t.newStream(ctx, callHdr)
  526. cleanup := func(err error) {
  527. if s.swapState(streamDone) == streamDone {
  528. // If it was already done, return.
  529. return
  530. }
  531. // The stream was unprocessed by the server.
  532. atomic.StoreUint32(&s.unprocessed, 1)
  533. s.write(recvMsg{err: err})
  534. close(s.done)
  535. // If headerChan isn't closed, then close it.
  536. if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
  537. close(s.headerChan)
  538. }
  539. }
  540. hdr := &headerFrame{
  541. hf: headerFields,
  542. endStream: false,
  543. initStream: func(id uint32) error {
  544. t.mu.Lock()
  545. if state := t.state; state != reachable {
  546. t.mu.Unlock()
  547. // Do a quick cleanup.
  548. err := error(errStreamDrain)
  549. if state == closing {
  550. err = ErrConnClosing
  551. }
  552. cleanup(err)
  553. return err
  554. }
  555. t.activeStreams[id] = s
  556. if channelz.IsOn() {
  557. atomic.AddInt64(&t.czData.streamsStarted, 1)
  558. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  559. }
  560. // If the keepalive goroutine has gone dormant, wake it up.
  561. if t.kpDormant {
  562. t.kpDormancyCond.Signal()
  563. }
  564. t.mu.Unlock()
  565. return nil
  566. },
  567. onOrphaned: cleanup,
  568. wq: s.wq,
  569. }
  570. firstTry := true
  571. var ch chan struct{}
  572. checkForStreamQuota := func(it interface{}) bool {
  573. if t.streamQuota <= 0 { // Can go negative if server decreases it.
  574. if firstTry {
  575. t.waitingStreams++
  576. }
  577. ch = t.streamsQuotaAvailable
  578. return false
  579. }
  580. if !firstTry {
  581. t.waitingStreams--
  582. }
  583. t.streamQuota--
  584. h := it.(*headerFrame)
  585. h.streamID = t.nextID
  586. t.nextID += 2
  587. s.id = h.streamID
  588. s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
  589. if t.streamQuota > 0 && t.waitingStreams > 0 {
  590. select {
  591. case t.streamsQuotaAvailable <- struct{}{}:
  592. default:
  593. }
  594. }
  595. return true
  596. }
  597. var hdrListSizeErr error
  598. checkForHeaderListSize := func(it interface{}) bool {
  599. if t.maxSendHeaderListSize == nil {
  600. return true
  601. }
  602. hdrFrame := it.(*headerFrame)
  603. var sz int64
  604. for _, f := range hdrFrame.hf {
  605. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  606. hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
  607. return false
  608. }
  609. }
  610. return true
  611. }
  612. for {
  613. success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
  614. if !checkForStreamQuota(it) {
  615. return false
  616. }
  617. if !checkForHeaderListSize(it) {
  618. return false
  619. }
  620. return true
  621. }, hdr)
  622. if err != nil {
  623. return nil, err
  624. }
  625. if success {
  626. break
  627. }
  628. if hdrListSizeErr != nil {
  629. return nil, hdrListSizeErr
  630. }
  631. firstTry = false
  632. select {
  633. case <-ch:
  634. case <-s.ctx.Done():
  635. return nil, ContextErr(s.ctx.Err())
  636. case <-t.goAway:
  637. return nil, errStreamDrain
  638. case <-t.ctx.Done():
  639. return nil, ErrConnClosing
  640. }
  641. }
  642. if t.statsHandler != nil {
  643. header, _, _ := metadata.FromOutgoingContextRaw(ctx)
  644. outHeader := &stats.OutHeader{
  645. Client: true,
  646. FullMethod: callHdr.Method,
  647. RemoteAddr: t.remoteAddr,
  648. LocalAddr: t.localAddr,
  649. Compression: callHdr.SendCompress,
  650. Header: header.Copy(),
  651. }
  652. t.statsHandler.HandleRPC(s.ctx, outHeader)
  653. }
  654. return s, nil
  655. }
  656. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  657. // This must not be executed in reader's goroutine.
  658. func (t *http2Client) CloseStream(s *Stream, err error) {
  659. var (
  660. rst bool
  661. rstCode http2.ErrCode
  662. )
  663. if err != nil {
  664. rst = true
  665. rstCode = http2.ErrCodeCancel
  666. }
  667. t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  668. }
// closeStream marks s as done, records its final status and trailers, and
// queues the stream's cleanup on the control buffer.
//
// err, when non-nil, is delivered to the stream's reader. rst and rstCode are
// forwarded in the queued cleanupStream item. st becomes the stream's status
// and mdata (if non-empty) its trailers. eosReceived only selects which
// channelz counter (succeeded or failed) is incremented.
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, return. If multiple closeStream calls
		// happen simultaneously, wait for the first to finish.
		<-s.done
		return
	}
	// status and trailers can be updated here without any synchronization because the stream goroutine will
	// only read it after it sees an io.EOF error from read or write and we'll write those errors
	// only after updating this.
	s.status = st
	if len(mdata) > 0 {
		s.trailer = mdata
	}
	if err != nil {
		// This will unblock reads eventually.
		s.write(recvMsg{err: err})
	}
	// If headerChan isn't closed, then close it.
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		// This call closed headerChan, so record that the stream ended
		// without headers having closed it first.
		s.noHeaders = true
		close(s.headerChan)
	}
	cleanup := &cleanupStream{
		streamID: s.id,
		// onWrite removes the stream from the active set and updates the
		// channelz counters.
		onWrite: func() {
			t.mu.Lock()
			// activeStreams is set to nil by Close; nothing to delete then.
			if t.activeStreams != nil {
				delete(t.activeStreams, s.id)
			}
			t.mu.Unlock()
			if channelz.IsOn() {
				if eosReceived {
					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
				} else {
					atomic.AddInt64(&t.czData.streamsFailed, 1)
				}
			}
		},
		rst:     rst,
		rstCode: rstCode,
	}
	// addBackStreamQuota returns this stream's quota unit and, if quota is now
	// available while streams are waiting, posts a non-blocking wake-up.
	addBackStreamQuota := func(interface{}) bool {
		t.streamQuota++
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	// The result is deliberately ignored here.
	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
	// This will unblock write.
	close(s.done)
}
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
//
// It returns the error from closing the underlying connection, or nil if the
// transport was already closing.
func (t *http2Client) Close() error {
	t.mu.Lock()
	// Make sure we only Close once.
	if t.state == closing {
		t.mu.Unlock()
		return nil
	}
	// Call t.onClose before setting the state to closing to prevent the client
	// from attempting to create new streams ASAP.
	// Note that onClose runs while t.mu is still held.
	t.onClose()
	t.state = closing
	// Detach the active streams so they can be notified after the lock is
	// released; closeStream's cleanup tolerates the nil map.
	streams := t.activeStreams
	t.activeStreams = nil
	if t.kpDormant {
		// If the keepalive goroutine is blocked on this condition variable, we
		// should unblock it so that the goroutine eventually exits.
		t.kpDormancyCond.Signal()
	}
	t.mu.Unlock()
	t.controlBuf.finish()
	t.cancel()
	err := t.conn.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(t.channelzID)
	}
	// Notify all active streams. No RST_STREAM is requested and each stream
	// gets an Unavailable status.
	for _, s := range streams {
		t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
	}
	if t.statsHandler != nil {
		connEnd := &stats.ConnEnd{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connEnd)
	}
	return err
}
  770. // GracefulClose sets the state to draining, which prevents new streams from
  771. // being created and causes the transport to be closed when the last active
  772. // stream is closed. If there are no active streams, the transport is closed
  773. // immediately. This does nothing if the transport is already draining or
  774. // closing.
  775. func (t *http2Client) GracefulClose() {
  776. t.mu.Lock()
  777. // Make sure we move to draining only from active.
  778. if t.state == draining || t.state == closing {
  779. t.mu.Unlock()
  780. return
  781. }
  782. t.state = draining
  783. active := len(t.activeStreams)
  784. t.mu.Unlock()
  785. if active == 0 {
  786. t.Close()
  787. return
  788. }
  789. t.controlBuf.put(&incomingGoAway{})
  790. }
  791. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  792. // should proceed only if Write returns nil.
  793. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  794. if opts.Last {
  795. // If it's the last message, update stream state.
  796. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  797. return errStreamDone
  798. }
  799. } else if s.getState() != streamActive {
  800. return errStreamDone
  801. }
  802. df := &dataFrame{
  803. streamID: s.id,
  804. endStream: opts.Last,
  805. }
  806. if hdr != nil || data != nil { // If it's not an empty data frame.
  807. // Add some data to grpc message header so that we can equally
  808. // distribute bytes across frames.
  809. emptyLen := http2MaxFrameLen - len(hdr)
  810. if emptyLen > len(data) {
  811. emptyLen = len(data)
  812. }
  813. hdr = append(hdr, data[:emptyLen]...)
  814. data = data[emptyLen:]
  815. df.h, df.d = hdr, data
  816. // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
  817. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  818. return err
  819. }
  820. }
  821. return t.controlBuf.put(df)
  822. }
  823. func (t *http2Client) getStream(f http2.Frame) *Stream {
  824. t.mu.Lock()
  825. s := t.activeStreams[f.Header().StreamID]
  826. t.mu.Unlock()
  827. return s
  828. }
  829. // adjustWindow sends out extra window update over the initial window size
  830. // of stream if the application is requesting data larger in size than
  831. // the window.
  832. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  833. if w := s.fc.maybeAdjust(n); w > 0 {
  834. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  835. }
  836. }
  837. // updateWindow adjusts the inbound quota for the stream.
  838. // Window updates will be sent out when the cumulative quota
  839. // exceeds the corresponding threshold.
  840. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  841. if w := s.fc.onRead(n); w > 0 {
  842. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  843. }
  844. }
  845. // updateFlowControl updates the incoming flow control windows
  846. // for the transport and the stream based on the current bdp
  847. // estimation.
  848. func (t *http2Client) updateFlowControl(n uint32) {
  849. t.mu.Lock()
  850. for _, s := range t.activeStreams {
  851. s.fc.newLimit(n)
  852. }
  853. t.mu.Unlock()
  854. updateIWS := func(interface{}) bool {
  855. t.initialWindowSize = int32(n)
  856. return true
  857. }
  858. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  859. t.controlBuf.put(&outgoingSettings{
  860. ss: []http2.Setting{
  861. {
  862. ID: http2.SettingInitialWindowSize,
  863. Val: n,
  864. },
  865. },
  866. })
  867. }
  868. func (t *http2Client) handleData(f *http2.DataFrame) {
  869. size := f.Header().Length
  870. var sendBDPPing bool
  871. if t.bdpEst != nil {
  872. sendBDPPing = t.bdpEst.add(size)
  873. }
  874. // Decouple connection's flow control from application's read.
  875. // An update on connection's flow control should not depend on
  876. // whether user application has read the data or not. Such a
  877. // restriction is already imposed on the stream's flow control,
  878. // and therefore the sender will be blocked anyways.
  879. // Decoupling the connection flow control will prevent other
  880. // active(fast) streams from starving in presence of slow or
  881. // inactive streams.
  882. //
  883. if w := t.fc.onData(size); w > 0 {
  884. t.controlBuf.put(&outgoingWindowUpdate{
  885. streamID: 0,
  886. increment: w,
  887. })
  888. }
  889. if sendBDPPing {
  890. // Avoid excessive ping detection (e.g. in an L7 proxy)
  891. // by sending a window update prior to the BDP ping.
  892. if w := t.fc.reset(); w > 0 {
  893. t.controlBuf.put(&outgoingWindowUpdate{
  894. streamID: 0,
  895. increment: w,
  896. })
  897. }
  898. t.controlBuf.put(bdpPing)
  899. }
  900. // Select the right stream to dispatch.
  901. s := t.getStream(f)
  902. if s == nil {
  903. return
  904. }
  905. if size > 0 {
  906. if err := s.fc.onData(size); err != nil {
  907. t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
  908. return
  909. }
  910. if f.Header().Flags.Has(http2.FlagDataPadded) {
  911. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  912. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  913. }
  914. }
  915. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  916. // guarantee f.Data() is consumed before the arrival of next frame.
  917. // Can this copy be eliminated?
  918. if len(f.Data()) > 0 {
  919. buffer := t.bufferPool.get()
  920. buffer.Reset()
  921. buffer.Write(f.Data())
  922. s.write(recvMsg{buffer: buffer})
  923. }
  924. }
  925. // The server has closed the stream without sending trailers. Record that
  926. // the read direction is closed, and set the status appropriately.
  927. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  928. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
  929. }
  930. }
  931. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  932. s := t.getStream(f)
  933. if s == nil {
  934. return
  935. }
  936. if f.ErrCode == http2.ErrCodeRefusedStream {
  937. // The stream was unprocessed by the server.
  938. atomic.StoreUint32(&s.unprocessed, 1)
  939. }
  940. statusCode, ok := http2ErrConvTab[f.ErrCode]
  941. if !ok {
  942. warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
  943. statusCode = codes.Unknown
  944. }
  945. if statusCode == codes.Canceled {
  946. if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
  947. // Our deadline was already exceeded, and that was likely the cause
  948. // of this cancelation. Alter the status code accordingly.
  949. statusCode = codes.DeadlineExceeded
  950. }
  951. }
  952. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  953. }
  954. func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
  955. if f.IsAck() {
  956. return
  957. }
  958. var maxStreams *uint32
  959. var ss []http2.Setting
  960. var updateFuncs []func()
  961. f.ForeachSetting(func(s http2.Setting) error {
  962. switch s.ID {
  963. case http2.SettingMaxConcurrentStreams:
  964. maxStreams = new(uint32)
  965. *maxStreams = s.Val
  966. case http2.SettingMaxHeaderListSize:
  967. updateFuncs = append(updateFuncs, func() {
  968. t.maxSendHeaderListSize = new(uint32)
  969. *t.maxSendHeaderListSize = s.Val
  970. })
  971. default:
  972. ss = append(ss, s)
  973. }
  974. return nil
  975. })
  976. if isFirst && maxStreams == nil {
  977. maxStreams = new(uint32)
  978. *maxStreams = math.MaxUint32
  979. }
  980. sf := &incomingSettings{
  981. ss: ss,
  982. }
  983. if maxStreams != nil {
  984. updateStreamQuota := func() {
  985. delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
  986. t.maxConcurrentStreams = *maxStreams
  987. t.streamQuota += delta
  988. if delta > 0 && t.waitingStreams > 0 {
  989. close(t.streamsQuotaAvailable) // wake all of them up.
  990. t.streamsQuotaAvailable = make(chan struct{}, 1)
  991. }
  992. }
  993. updateFuncs = append(updateFuncs, updateStreamQuota)
  994. }
  995. t.controlBuf.executeAndPut(func(interface{}) bool {
  996. for _, f := range updateFuncs {
  997. f()
  998. }
  999. return true
  1000. }, sf)
  1001. }
  1002. func (t *http2Client) handlePing(f *http2.PingFrame) {
  1003. if f.IsAck() {
  1004. // Maybe it's a BDP ping.
  1005. if t.bdpEst != nil {
  1006. t.bdpEst.calculate(f.Data)
  1007. }
  1008. return
  1009. }
  1010. pingAck := &ping{ack: true}
  1011. copy(pingAck.data[:], f.Data[:])
  1012. t.controlBuf.put(pingAck)
  1013. }
  1014. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  1015. t.mu.Lock()
  1016. if t.state == closing {
  1017. t.mu.Unlock()
  1018. return
  1019. }
  1020. if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
  1021. infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
  1022. }
  1023. id := f.LastStreamID
  1024. if id > 0 && id%2 != 1 {
  1025. t.mu.Unlock()
  1026. t.Close()
  1027. return
  1028. }
  1029. // A client can receive multiple GoAways from the server (see
  1030. // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
  1031. // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
  1032. // sent after an RTT delay with the ID of the last stream the server will
  1033. // process.
  1034. //
  1035. // Therefore, when we get the first GoAway we don't necessarily close any
  1036. // streams. While in case of second GoAway we close all streams created after
  1037. // the GoAwayId. This way streams that were in-flight while the GoAway from
  1038. // server was being sent don't get killed.
  1039. select {
  1040. case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
  1041. // If there are multiple GoAways the first one should always have an ID greater than the following ones.
  1042. if id > t.prevGoAwayID {
  1043. t.mu.Unlock()
  1044. t.Close()
  1045. return
  1046. }
  1047. default:
  1048. t.setGoAwayReason(f)
  1049. close(t.goAway)
  1050. t.controlBuf.put(&incomingGoAway{})
  1051. // Notify the clientconn about the GOAWAY before we set the state to
  1052. // draining, to allow the client to stop attempting to create streams
  1053. // before disallowing new streams on this connection.
  1054. t.onGoAway(t.goAwayReason)
  1055. t.state = draining
  1056. }
  1057. // All streams with IDs greater than the GoAwayId
  1058. // and smaller than the previous GoAway ID should be killed.
  1059. upperLimit := t.prevGoAwayID
  1060. if upperLimit == 0 { // This is the first GoAway Frame.
  1061. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
  1062. }
  1063. for streamID, stream := range t.activeStreams {
  1064. if streamID > id && streamID <= upperLimit {
  1065. // The stream was unprocessed by the server.
  1066. atomic.StoreUint32(&stream.unprocessed, 1)
  1067. t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
  1068. }
  1069. }
  1070. t.prevGoAwayID = id
  1071. active := len(t.activeStreams)
  1072. t.mu.Unlock()
  1073. if active == 0 {
  1074. t.Close()
  1075. }
  1076. }
  1077. // setGoAwayReason sets the value of t.goAwayReason based
  1078. // on the GoAway frame received.
  1079. // It expects a lock on transport's mutext to be held by
  1080. // the caller.
  1081. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  1082. t.goAwayReason = GoAwayNoReason
  1083. switch f.ErrCode {
  1084. case http2.ErrCodeEnhanceYourCalm:
  1085. if string(f.DebugData()) == "too_many_pings" {
  1086. t.goAwayReason = GoAwayTooManyPings
  1087. }
  1088. }
  1089. }
  1090. func (t *http2Client) GetGoAwayReason() GoAwayReason {
  1091. t.mu.Lock()
  1092. defer t.mu.Unlock()
  1093. return t.goAwayReason
  1094. }
  1095. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  1096. t.controlBuf.put(&incomingWindowUpdate{
  1097. streamID: f.Header().StreamID,
  1098. increment: f.Increment,
  1099. })
  1100. }
  1101. // operateHeaders takes action on the decoded headers.
  1102. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  1103. s := t.getStream(frame)
  1104. if s == nil {
  1105. return
  1106. }
  1107. endStream := frame.StreamEnded()
  1108. atomic.StoreUint32(&s.bytesReceived, 1)
  1109. initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
  1110. if !initialHeader && !endStream {
  1111. // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
  1112. st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
  1113. t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
  1114. return
  1115. }
  1116. state := &decodeState{}
  1117. // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
  1118. state.data.isGRPC = !initialHeader
  1119. if err := state.decodeHeader(frame); err != nil {
  1120. t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
  1121. return
  1122. }
  1123. isHeader := false
  1124. defer func() {
  1125. if t.statsHandler != nil {
  1126. if isHeader {
  1127. inHeader := &stats.InHeader{
  1128. Client: true,
  1129. WireLength: int(frame.Header().Length),
  1130. Header: s.header.Copy(),
  1131. }
  1132. t.statsHandler.HandleRPC(s.ctx, inHeader)
  1133. } else {
  1134. inTrailer := &stats.InTrailer{
  1135. Client: true,
  1136. WireLength: int(frame.Header().Length),
  1137. Trailer: s.trailer.Copy(),
  1138. }
  1139. t.statsHandler.HandleRPC(s.ctx, inTrailer)
  1140. }
  1141. }
  1142. }()
  1143. // If headerChan hasn't been closed yet
  1144. if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
  1145. s.headerValid = true
  1146. if !endStream {
  1147. // HEADERS frame block carries a Response-Headers.
  1148. isHeader = true
  1149. // These values can be set without any synchronization because
  1150. // stream goroutine will read it only after seeing a closed
  1151. // headerChan which we'll close after setting this.
  1152. s.recvCompress = state.data.encoding
  1153. if len(state.data.mdata) > 0 {
  1154. s.header = state.data.mdata
  1155. }
  1156. } else {
  1157. // HEADERS frame block carries a Trailers-Only.
  1158. s.noHeaders = true
  1159. }
  1160. close(s.headerChan)
  1161. }
  1162. if !endStream {
  1163. return
  1164. }
  1165. // if client received END_STREAM from server while stream was still active, send RST_STREAM
  1166. rst := s.getState() == streamActive
  1167. t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
  1168. }
  1169. // reader runs as a separate goroutine in charge of reading data from network
  1170. // connection.
  1171. //
  1172. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  1173. // optimal.
  1174. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  1175. func (t *http2Client) reader() {
  1176. defer close(t.readerDone)
  1177. // Check the validity of server preface.
  1178. frame, err := t.framer.fr.ReadFrame()
  1179. if err != nil {
  1180. t.Close() // this kicks off resetTransport, so must be last before return
  1181. return
  1182. }
  1183. t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
  1184. if t.keepaliveEnabled {
  1185. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1186. }
  1187. sf, ok := frame.(*http2.SettingsFrame)
  1188. if !ok {
  1189. t.Close() // this kicks off resetTransport, so must be last before return
  1190. return
  1191. }
  1192. t.onPrefaceReceipt()
  1193. t.handleSettings(sf, true)
  1194. // loop to keep reading incoming messages on this transport.
  1195. for {
  1196. t.controlBuf.throttle()
  1197. frame, err := t.framer.fr.ReadFrame()
  1198. if t.keepaliveEnabled {
  1199. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1200. }
  1201. if err != nil {
  1202. // Abort an active stream if the http2.Framer returns a
  1203. // http2.StreamError. This can happen only if the server's response
  1204. // is malformed http2.
  1205. if se, ok := err.(http2.StreamError); ok {
  1206. t.mu.Lock()
  1207. s := t.activeStreams[se.StreamID]
  1208. t.mu.Unlock()
  1209. if s != nil {
  1210. // use error detail to provide better err message
  1211. code := http2ErrConvTab[se.Code]
  1212. msg := t.framer.fr.ErrorDetail().Error()
  1213. t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
  1214. }
  1215. continue
  1216. } else {
  1217. // Transport error.
  1218. t.Close()
  1219. return
  1220. }
  1221. }
  1222. switch frame := frame.(type) {
  1223. case *http2.MetaHeadersFrame:
  1224. t.operateHeaders(frame)
  1225. case *http2.DataFrame:
  1226. t.handleData(frame)
  1227. case *http2.RSTStreamFrame:
  1228. t.handleRSTStream(frame)
  1229. case *http2.SettingsFrame:
  1230. t.handleSettings(frame, false)
  1231. case *http2.PingFrame:
  1232. t.handlePing(frame)
  1233. case *http2.GoAwayFrame:
  1234. t.handleGoAway(frame)
  1235. case *http2.WindowUpdateFrame:
  1236. t.handleWindowUpdate(frame)
  1237. default:
  1238. errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1239. }
  1240. }
  1241. }
  // minTime returns the smaller of the two durations a and b.
  func minTime(a, b time.Duration) time.Duration {
  	if b <= a {
  		return b
  	}
  	return a
  }
  1248. // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
  1249. func (t *http2Client) keepalive() {
  1250. p := &ping{data: [8]byte{}}
  1251. // True iff a ping has been sent, and no data has been received since then.
  1252. outstandingPing := false
  1253. // Amount of time remaining before which we should receive an ACK for the
  1254. // last sent ping.
  1255. timeoutLeft := time.Duration(0)
  1256. // Records the last value of t.lastRead before we go block on the timer.
  1257. // This is required to check for read activity since then.
  1258. prevNano := time.Now().UnixNano()
  1259. timer := time.NewTimer(t.kp.Time)
  1260. for {
  1261. select {
  1262. case <-timer.C:
  1263. lastRead := atomic.LoadInt64(&t.lastRead)
  1264. if lastRead > prevNano {
  1265. // There has been read activity since the last time we were here.
  1266. outstandingPing = false
  1267. // Next timer should fire at kp.Time seconds from lastRead time.
  1268. timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
  1269. prevNano = lastRead
  1270. continue
  1271. }
  1272. if outstandingPing && timeoutLeft <= 0 {
  1273. t.Close()
  1274. return
  1275. }
  1276. t.mu.Lock()
  1277. if t.state == closing {
  1278. // If the transport is closing, we should exit from the
  1279. // keepalive goroutine here. If not, we could have a race
  1280. // between the call to Signal() from Close() and the call to
  1281. // Wait() here, whereby the keepalive goroutine ends up
  1282. // blocking on the condition variable which will never be
  1283. // signalled again.
  1284. t.mu.Unlock()
  1285. return
  1286. }
  1287. if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
  1288. // If a ping was sent out previously (because there were active
  1289. // streams at that point) which wasn't acked and its timeout
  1290. // hadn't fired, but we got here and are about to go dormant,
  1291. // we should make sure that we unconditionally send a ping once
  1292. // we awaken.
  1293. outstandingPing = false
  1294. t.kpDormant = true
  1295. t.kpDormancyCond.Wait()
  1296. }
  1297. t.kpDormant = false
  1298. t.mu.Unlock()
  1299. // We get here either because we were dormant and a new stream was
  1300. // created which unblocked the Wait() call, or because the
  1301. // keepalive timer expired. In both cases, we need to send a ping.
  1302. if !outstandingPing {
  1303. if channelz.IsOn() {
  1304. atomic.AddInt64(&t.czData.kpCount, 1)
  1305. }
  1306. t.controlBuf.put(p)
  1307. timeoutLeft = t.kp.Timeout
  1308. outstandingPing = true
  1309. }
  1310. // The amount of time to sleep here is the minimum of kp.Time and
  1311. // timeoutLeft. This will ensure that we wait only for kp.Time
  1312. // before sending out the next ping (for cases where the ping is
  1313. // acked).
  1314. sleepDuration := minTime(t.kp.Time, timeoutLeft)
  1315. timeoutLeft -= sleepDuration
  1316. timer.Reset(sleepDuration)
  1317. case <-t.ctx.Done():
  1318. if !timer.Stop() {
  1319. <-timer.C
  1320. }
  1321. return
  1322. }
  1323. }
  1324. }
  1325. func (t *http2Client) Error() <-chan struct{} {
  1326. return t.ctx.Done()
  1327. }
  1328. func (t *http2Client) GoAway() <-chan struct{} {
  1329. return t.goAway
  1330. }
  1331. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1332. s := channelz.SocketInternalMetric{
  1333. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1334. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1335. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1336. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1337. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1338. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1339. LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1340. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1341. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1342. LocalFlowControlWindow: int64(t.fc.getSize()),
  1343. SocketOptions: channelz.GetSocketOption(t.conn),
  1344. LocalAddr: t.localAddr,
  1345. RemoteAddr: t.remoteAddr,
  1346. // RemoteName :
  1347. }
  1348. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1349. s.Security = au.GetSecurityValue()
  1350. }
  1351. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1352. return &s
  1353. }
  1354. func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
  1355. func (t *http2Client) IncrMsgSent() {
  1356. atomic.AddInt64(&t.czData.msgSent, 1)
  1357. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1358. }
  1359. func (t *http2Client) IncrMsgRecv() {
  1360. atomic.AddInt64(&t.czData.msgRecv, 1)
  1361. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1362. }
  1363. func (t *http2Client) getOutFlowWindow() int64 {
  1364. resp := make(chan uint32, 1)
  1365. timer := time.NewTimer(time.Second)
  1366. defer timer.Stop()
  1367. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1368. select {
  1369. case sz := <-resp:
  1370. return int64(sz)
  1371. case <-t.ctxDone:
  1372. return -1
  1373. case <-timer.C:
  1374. return -2
  1375. }
  1376. }