http2_server.go 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "context"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math"
  26. "net"
  27. "strconv"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/golang/protobuf/proto"
  32. "golang.org/x/net/http2"
  33. "golang.org/x/net/http2/hpack"
  34. spb "google.golang.org/genproto/googleapis/rpc/status"
  35. "google.golang.org/grpc/codes"
  36. "google.golang.org/grpc/credentials"
  37. "google.golang.org/grpc/grpclog"
  38. "google.golang.org/grpc/internal"
  39. "google.golang.org/grpc/internal/channelz"
  40. "google.golang.org/grpc/internal/grpcrand"
  41. "google.golang.org/grpc/keepalive"
  42. "google.golang.org/grpc/metadata"
  43. "google.golang.org/grpc/peer"
  44. "google.golang.org/grpc/stats"
  45. "google.golang.org/grpc/status"
  46. "google.golang.org/grpc/tap"
  47. )
  48. var (
  49. // ErrIllegalHeaderWrite indicates that setting header is illegal because of
  50. // the stream's state.
  51. ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
  52. // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
  53. // than the limit set by peer.
  54. ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
  55. // statusRawProto is a function to get to the raw status proto wrapped in a
  56. // status.Status without a proto.Clone().
  57. statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
  58. )
  59. // serverConnectionCounter counts the number of connections a server has seen
  60. // (equal to the number of http2Servers created). Must be accessed atomically.
  61. var serverConnectionCounter uint64
  62. // http2Server implements the ServerTransport interface with HTTP2.
  63. type http2Server struct {
  64. lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
  65. ctx context.Context
  66. done chan struct{}
  67. conn net.Conn
  68. loopy *loopyWriter
  69. readerDone chan struct{} // sync point to enable testing.
  70. writerDone chan struct{} // sync point to enable testing.
  71. remoteAddr net.Addr
  72. localAddr net.Addr
  73. maxStreamID uint32 // max stream ID ever seen
  74. authInfo credentials.AuthInfo // auth info about the connection
  75. inTapHandle tap.ServerInHandle
  76. framer *framer
  77. // The max number of concurrent streams.
  78. maxStreams uint32
  79. // controlBuf delivers all the control related tasks (e.g., window
  80. // updates, reset streams, and various settings) to the controller.
  81. controlBuf *controlBuffer
  82. fc *trInFlow
  83. stats stats.Handler
  84. // Keepalive and max-age parameters for the server.
  85. kp keepalive.ServerParameters
  86. // Keepalive enforcement policy.
  87. kep keepalive.EnforcementPolicy
  88. // The time instance last ping was received.
  89. lastPingAt time.Time
  90. // Number of times the client has violated keepalive ping policy so far.
  91. pingStrikes uint8
  92. // Flag to signify that number of ping strikes should be reset to 0.
  93. // This is set whenever data or header frames are sent.
  94. // 1 means yes.
  95. resetPingStrikes uint32 // Accessed atomically.
  96. initialWindowSize int32
  97. bdpEst *bdpEstimator
  98. maxSendHeaderListSize *uint32
  99. mu sync.Mutex // guard the following
  100. // drainChan is initialized when drain(...) is called the first time.
  101. // After which the server writes out the first GoAway(with ID 2^31-1) frame.
  102. // Then an independent goroutine will be launched to later send the second GoAway.
  103. // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
  104. // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
  105. // already underway.
  106. drainChan chan struct{}
  107. state transportState
  108. activeStreams map[uint32]*Stream
  109. // idle is the time instant when the connection went idle.
  110. // This is either the beginning of the connection or when the number of
  111. // RPCs go down to 0.
  112. // When the connection is busy, this value is set to 0.
  113. idle time.Time
  114. // Fields below are for channelz metric collection.
  115. channelzID int64 // channelz unique identification number
  116. czData *channelzData
  117. bufferPool *bufferPool
  118. connectionID uint64
  119. }
  120. // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
  121. // returned if something goes wrong.
  122. func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
  123. writeBufSize := config.WriteBufferSize
  124. readBufSize := config.ReadBufferSize
  125. maxHeaderListSize := defaultServerMaxHeaderListSize
  126. if config.MaxHeaderListSize != nil {
  127. maxHeaderListSize = *config.MaxHeaderListSize
  128. }
  129. framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
  130. // Send initial settings as connection preface to client.
  131. isettings := []http2.Setting{{
  132. ID: http2.SettingMaxFrameSize,
  133. Val: http2MaxFrameLen,
  134. }}
  135. // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
  136. // permitted in the HTTP2 spec.
  137. maxStreams := config.MaxStreams
  138. if maxStreams == 0 {
  139. maxStreams = math.MaxUint32
  140. } else {
  141. isettings = append(isettings, http2.Setting{
  142. ID: http2.SettingMaxConcurrentStreams,
  143. Val: maxStreams,
  144. })
  145. }
  146. dynamicWindow := true
  147. iwz := int32(initialWindowSize)
  148. if config.InitialWindowSize >= defaultWindowSize {
  149. iwz = config.InitialWindowSize
  150. dynamicWindow = false
  151. }
  152. icwz := int32(initialWindowSize)
  153. if config.InitialConnWindowSize >= defaultWindowSize {
  154. icwz = config.InitialConnWindowSize
  155. dynamicWindow = false
  156. }
  157. if iwz != defaultWindowSize {
  158. isettings = append(isettings, http2.Setting{
  159. ID: http2.SettingInitialWindowSize,
  160. Val: uint32(iwz)})
  161. }
  162. if config.MaxHeaderListSize != nil {
  163. isettings = append(isettings, http2.Setting{
  164. ID: http2.SettingMaxHeaderListSize,
  165. Val: *config.MaxHeaderListSize,
  166. })
  167. }
  168. if config.HeaderTableSize != nil {
  169. isettings = append(isettings, http2.Setting{
  170. ID: http2.SettingHeaderTableSize,
  171. Val: *config.HeaderTableSize,
  172. })
  173. }
  174. if err := framer.fr.WriteSettings(isettings...); err != nil {
  175. return nil, connectionErrorf(false, err, "transport: %v", err)
  176. }
  177. // Adjust the connection flow control window if needed.
  178. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  179. if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
  180. return nil, connectionErrorf(false, err, "transport: %v", err)
  181. }
  182. }
  183. kp := config.KeepaliveParams
  184. if kp.MaxConnectionIdle == 0 {
  185. kp.MaxConnectionIdle = defaultMaxConnectionIdle
  186. }
  187. if kp.MaxConnectionAge == 0 {
  188. kp.MaxConnectionAge = defaultMaxConnectionAge
  189. }
  190. // Add a jitter to MaxConnectionAge.
  191. kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
  192. if kp.MaxConnectionAgeGrace == 0 {
  193. kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
  194. }
  195. if kp.Time == 0 {
  196. kp.Time = defaultServerKeepaliveTime
  197. }
  198. if kp.Timeout == 0 {
  199. kp.Timeout = defaultServerKeepaliveTimeout
  200. }
  201. kep := config.KeepalivePolicy
  202. if kep.MinTime == 0 {
  203. kep.MinTime = defaultKeepalivePolicyMinTime
  204. }
  205. done := make(chan struct{})
  206. t := &http2Server{
  207. ctx: context.Background(),
  208. done: done,
  209. conn: conn,
  210. remoteAddr: conn.RemoteAddr(),
  211. localAddr: conn.LocalAddr(),
  212. authInfo: config.AuthInfo,
  213. framer: framer,
  214. readerDone: make(chan struct{}),
  215. writerDone: make(chan struct{}),
  216. maxStreams: maxStreams,
  217. inTapHandle: config.InTapHandle,
  218. fc: &trInFlow{limit: uint32(icwz)},
  219. state: reachable,
  220. activeStreams: make(map[uint32]*Stream),
  221. stats: config.StatsHandler,
  222. kp: kp,
  223. idle: time.Now(),
  224. kep: kep,
  225. initialWindowSize: iwz,
  226. czData: new(channelzData),
  227. bufferPool: newBufferPool(),
  228. }
  229. t.controlBuf = newControlBuffer(t.done)
  230. if dynamicWindow {
  231. t.bdpEst = &bdpEstimator{
  232. bdp: initialWindowSize,
  233. updateFlowControl: t.updateFlowControl,
  234. }
  235. }
  236. if t.stats != nil {
  237. t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
  238. RemoteAddr: t.remoteAddr,
  239. LocalAddr: t.localAddr,
  240. })
  241. connBegin := &stats.ConnBegin{}
  242. t.stats.HandleConn(t.ctx, connBegin)
  243. }
  244. if channelz.IsOn() {
  245. t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
  246. }
  247. t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
  248. t.framer.writer.Flush()
  249. defer func() {
  250. if err != nil {
  251. t.Close()
  252. }
  253. }()
  254. // Check the validity of client preface.
  255. preface := make([]byte, len(clientPreface))
  256. if _, err := io.ReadFull(t.conn, preface); err != nil {
  257. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
  258. }
  259. if !bytes.Equal(preface, clientPreface) {
  260. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
  261. }
  262. frame, err := t.framer.fr.ReadFrame()
  263. if err == io.EOF || err == io.ErrUnexpectedEOF {
  264. return nil, err
  265. }
  266. if err != nil {
  267. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
  268. }
  269. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  270. sf, ok := frame.(*http2.SettingsFrame)
  271. if !ok {
  272. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
  273. }
  274. t.handleSettings(sf)
  275. go func() {
  276. t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
  277. t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
  278. if err := t.loopy.run(); err != nil {
  279. errorf("transport: loopyWriter.run returning. Err: %v", err)
  280. }
  281. t.conn.Close()
  282. close(t.writerDone)
  283. }()
  284. go t.keepalive()
  285. return t, nil
  286. }
  287. // operateHeader takes action on the decoded headers.
  288. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
  289. streamID := frame.Header().StreamID
  290. state := &decodeState{
  291. serverSide: true,
  292. }
  293. if err := state.decodeHeader(frame); err != nil {
  294. if se, ok := status.FromError(err); ok {
  295. t.controlBuf.put(&cleanupStream{
  296. streamID: streamID,
  297. rst: true,
  298. rstCode: statusCodeConvTab[se.Code()],
  299. onWrite: func() {},
  300. })
  301. }
  302. return false
  303. }
  304. buf := newRecvBuffer()
  305. s := &Stream{
  306. id: streamID,
  307. st: t,
  308. buf: buf,
  309. fc: &inFlow{limit: uint32(t.initialWindowSize)},
  310. recvCompress: state.data.encoding,
  311. method: state.data.method,
  312. contentSubtype: state.data.contentSubtype,
  313. }
  314. if frame.StreamEnded() {
  315. // s is just created by the caller. No lock needed.
  316. s.state = streamReadDone
  317. }
  318. if state.data.timeoutSet {
  319. s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
  320. } else {
  321. s.ctx, s.cancel = context.WithCancel(t.ctx)
  322. }
  323. pr := &peer.Peer{
  324. Addr: t.remoteAddr,
  325. }
  326. // Attach Auth info if there is any.
  327. if t.authInfo != nil {
  328. pr.AuthInfo = t.authInfo
  329. }
  330. s.ctx = peer.NewContext(s.ctx, pr)
  331. // Attach the received metadata to the context.
  332. if len(state.data.mdata) > 0 {
  333. s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
  334. }
  335. if state.data.statsTags != nil {
  336. s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
  337. }
  338. if state.data.statsTrace != nil {
  339. s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
  340. }
  341. if t.inTapHandle != nil {
  342. var err error
  343. info := &tap.Info{
  344. FullMethodName: state.data.method,
  345. }
  346. s.ctx, err = t.inTapHandle(s.ctx, info)
  347. if err != nil {
  348. warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
  349. t.controlBuf.put(&cleanupStream{
  350. streamID: s.id,
  351. rst: true,
  352. rstCode: http2.ErrCodeRefusedStream,
  353. onWrite: func() {},
  354. })
  355. s.cancel()
  356. return false
  357. }
  358. }
  359. t.mu.Lock()
  360. if t.state != reachable {
  361. t.mu.Unlock()
  362. s.cancel()
  363. return false
  364. }
  365. if uint32(len(t.activeStreams)) >= t.maxStreams {
  366. t.mu.Unlock()
  367. t.controlBuf.put(&cleanupStream{
  368. streamID: streamID,
  369. rst: true,
  370. rstCode: http2.ErrCodeRefusedStream,
  371. onWrite: func() {},
  372. })
  373. s.cancel()
  374. return false
  375. }
  376. if streamID%2 != 1 || streamID <= t.maxStreamID {
  377. t.mu.Unlock()
  378. // illegal gRPC stream id.
  379. errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
  380. s.cancel()
  381. return true
  382. }
  383. t.maxStreamID = streamID
  384. t.activeStreams[streamID] = s
  385. if len(t.activeStreams) == 1 {
  386. t.idle = time.Time{}
  387. }
  388. t.mu.Unlock()
  389. if channelz.IsOn() {
  390. atomic.AddInt64(&t.czData.streamsStarted, 1)
  391. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  392. }
  393. s.requestRead = func(n int) {
  394. t.adjustWindow(s, uint32(n))
  395. }
  396. s.ctx = traceCtx(s.ctx, s.method)
  397. if t.stats != nil {
  398. s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
  399. inHeader := &stats.InHeader{
  400. FullMethod: s.method,
  401. RemoteAddr: t.remoteAddr,
  402. LocalAddr: t.localAddr,
  403. Compression: s.recvCompress,
  404. WireLength: int(frame.Header().Length),
  405. Header: metadata.MD(state.data.mdata).Copy(),
  406. }
  407. t.stats.HandleRPC(s.ctx, inHeader)
  408. }
  409. s.ctxDone = s.ctx.Done()
  410. s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
  411. s.trReader = &transportReader{
  412. reader: &recvBufferReader{
  413. ctx: s.ctx,
  414. ctxDone: s.ctxDone,
  415. recv: s.buf,
  416. freeBuffer: t.bufferPool.put,
  417. },
  418. windowHandler: func(n int) {
  419. t.updateWindow(s, uint32(n))
  420. },
  421. }
  422. // Register the stream with loopy.
  423. t.controlBuf.put(&registerStream{
  424. streamID: s.id,
  425. wq: s.wq,
  426. })
  427. handle(s)
  428. return false
  429. }
  430. // HandleStreams receives incoming streams using the given handler. This is
  431. // typically run in a separate goroutine.
  432. // traceCtx attaches trace to ctx and returns the new context.
  433. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  434. defer close(t.readerDone)
  435. for {
  436. t.controlBuf.throttle()
  437. frame, err := t.framer.fr.ReadFrame()
  438. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  439. if err != nil {
  440. if se, ok := err.(http2.StreamError); ok {
  441. warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
  442. t.mu.Lock()
  443. s := t.activeStreams[se.StreamID]
  444. t.mu.Unlock()
  445. if s != nil {
  446. t.closeStream(s, true, se.Code, false)
  447. } else {
  448. t.controlBuf.put(&cleanupStream{
  449. streamID: se.StreamID,
  450. rst: true,
  451. rstCode: se.Code,
  452. onWrite: func() {},
  453. })
  454. }
  455. continue
  456. }
  457. if err == io.EOF || err == io.ErrUnexpectedEOF {
  458. t.Close()
  459. return
  460. }
  461. warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
  462. t.Close()
  463. return
  464. }
  465. switch frame := frame.(type) {
  466. case *http2.MetaHeadersFrame:
  467. if t.operateHeaders(frame, handle, traceCtx) {
  468. t.Close()
  469. break
  470. }
  471. case *http2.DataFrame:
  472. t.handleData(frame)
  473. case *http2.RSTStreamFrame:
  474. t.handleRSTStream(frame)
  475. case *http2.SettingsFrame:
  476. t.handleSettings(frame)
  477. case *http2.PingFrame:
  478. t.handlePing(frame)
  479. case *http2.WindowUpdateFrame:
  480. t.handleWindowUpdate(frame)
  481. case *http2.GoAwayFrame:
  482. // TODO: Handle GoAway from the client appropriately.
  483. default:
  484. errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  485. }
  486. }
  487. }
  488. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  489. t.mu.Lock()
  490. defer t.mu.Unlock()
  491. if t.activeStreams == nil {
  492. // The transport is closing.
  493. return nil, false
  494. }
  495. s, ok := t.activeStreams[f.Header().StreamID]
  496. if !ok {
  497. // The stream is already done.
  498. return nil, false
  499. }
  500. return s, true
  501. }
  502. // adjustWindow sends out extra window update over the initial window size
  503. // of stream if the application is requesting data larger in size than
  504. // the window.
  505. func (t *http2Server) adjustWindow(s *Stream, n uint32) {
  506. if w := s.fc.maybeAdjust(n); w > 0 {
  507. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  508. }
  509. }
  510. // updateWindow adjusts the inbound quota for the stream and the transport.
  511. // Window updates will deliver to the controller for sending when
  512. // the cumulative quota exceeds the corresponding threshold.
  513. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  514. if w := s.fc.onRead(n); w > 0 {
  515. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
  516. increment: w,
  517. })
  518. }
  519. }
  520. // updateFlowControl updates the incoming flow control windows
  521. // for the transport and the stream based on the current bdp
  522. // estimation.
  523. func (t *http2Server) updateFlowControl(n uint32) {
  524. t.mu.Lock()
  525. for _, s := range t.activeStreams {
  526. s.fc.newLimit(n)
  527. }
  528. t.initialWindowSize = int32(n)
  529. t.mu.Unlock()
  530. t.controlBuf.put(&outgoingWindowUpdate{
  531. streamID: 0,
  532. increment: t.fc.newLimit(n),
  533. })
  534. t.controlBuf.put(&outgoingSettings{
  535. ss: []http2.Setting{
  536. {
  537. ID: http2.SettingInitialWindowSize,
  538. Val: n,
  539. },
  540. },
  541. })
  542. }
  543. func (t *http2Server) handleData(f *http2.DataFrame) {
  544. size := f.Header().Length
  545. var sendBDPPing bool
  546. if t.bdpEst != nil {
  547. sendBDPPing = t.bdpEst.add(size)
  548. }
  549. // Decouple connection's flow control from application's read.
  550. // An update on connection's flow control should not depend on
  551. // whether user application has read the data or not. Such a
  552. // restriction is already imposed on the stream's flow control,
  553. // and therefore the sender will be blocked anyways.
  554. // Decoupling the connection flow control will prevent other
  555. // active(fast) streams from starving in presence of slow or
  556. // inactive streams.
  557. if w := t.fc.onData(size); w > 0 {
  558. t.controlBuf.put(&outgoingWindowUpdate{
  559. streamID: 0,
  560. increment: w,
  561. })
  562. }
  563. if sendBDPPing {
  564. // Avoid excessive ping detection (e.g. in an L7 proxy)
  565. // by sending a window update prior to the BDP ping.
  566. if w := t.fc.reset(); w > 0 {
  567. t.controlBuf.put(&outgoingWindowUpdate{
  568. streamID: 0,
  569. increment: w,
  570. })
  571. }
  572. t.controlBuf.put(bdpPing)
  573. }
  574. // Select the right stream to dispatch.
  575. s, ok := t.getStream(f)
  576. if !ok {
  577. return
  578. }
  579. if size > 0 {
  580. if err := s.fc.onData(size); err != nil {
  581. t.closeStream(s, true, http2.ErrCodeFlowControl, false)
  582. return
  583. }
  584. if f.Header().Flags.Has(http2.FlagDataPadded) {
  585. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  586. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  587. }
  588. }
  589. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  590. // guarantee f.Data() is consumed before the arrival of next frame.
  591. // Can this copy be eliminated?
  592. if len(f.Data()) > 0 {
  593. buffer := t.bufferPool.get()
  594. buffer.Reset()
  595. buffer.Write(f.Data())
  596. s.write(recvMsg{buffer: buffer})
  597. }
  598. }
  599. if f.Header().Flags.Has(http2.FlagDataEndStream) {
  600. // Received the end of stream from the client.
  601. s.compareAndSwapState(streamActive, streamReadDone)
  602. s.write(recvMsg{err: io.EOF})
  603. }
  604. }
  605. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  606. // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
  607. if s, ok := t.getStream(f); ok {
  608. t.closeStream(s, false, 0, false)
  609. return
  610. }
  611. // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
  612. t.controlBuf.put(&cleanupStream{
  613. streamID: f.Header().StreamID,
  614. rst: false,
  615. rstCode: 0,
  616. onWrite: func() {},
  617. })
  618. }
  619. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  620. if f.IsAck() {
  621. return
  622. }
  623. var ss []http2.Setting
  624. var updateFuncs []func()
  625. f.ForeachSetting(func(s http2.Setting) error {
  626. switch s.ID {
  627. case http2.SettingMaxHeaderListSize:
  628. updateFuncs = append(updateFuncs, func() {
  629. t.maxSendHeaderListSize = new(uint32)
  630. *t.maxSendHeaderListSize = s.Val
  631. })
  632. default:
  633. ss = append(ss, s)
  634. }
  635. return nil
  636. })
  637. t.controlBuf.executeAndPut(func(interface{}) bool {
  638. for _, f := range updateFuncs {
  639. f()
  640. }
  641. return true
  642. }, &incomingSettings{
  643. ss: ss,
  644. })
  645. }
  646. const (
  647. maxPingStrikes = 2
  648. defaultPingTimeout = 2 * time.Hour
  649. )
  650. func (t *http2Server) handlePing(f *http2.PingFrame) {
  651. if f.IsAck() {
  652. if f.Data == goAwayPing.data && t.drainChan != nil {
  653. close(t.drainChan)
  654. return
  655. }
  656. // Maybe it's a BDP ping.
  657. if t.bdpEst != nil {
  658. t.bdpEst.calculate(f.Data)
  659. }
  660. return
  661. }
  662. pingAck := &ping{ack: true}
  663. copy(pingAck.data[:], f.Data[:])
  664. t.controlBuf.put(pingAck)
  665. now := time.Now()
  666. defer func() {
  667. t.lastPingAt = now
  668. }()
  669. // A reset ping strikes means that we don't need to check for policy
  670. // violation for this ping and the pingStrikes counter should be set
  671. // to 0.
  672. if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
  673. t.pingStrikes = 0
  674. return
  675. }
  676. t.mu.Lock()
  677. ns := len(t.activeStreams)
  678. t.mu.Unlock()
  679. if ns < 1 && !t.kep.PermitWithoutStream {
  680. // Keepalive shouldn't be active thus, this new ping should
  681. // have come after at least defaultPingTimeout.
  682. if t.lastPingAt.Add(defaultPingTimeout).After(now) {
  683. t.pingStrikes++
  684. }
  685. } else {
  686. // Check if keepalive policy is respected.
  687. if t.lastPingAt.Add(t.kep.MinTime).After(now) {
  688. t.pingStrikes++
  689. }
  690. }
  691. if t.pingStrikes > maxPingStrikes {
  692. // Send goaway and close the connection.
  693. errorf("transport: Got too many pings from the client, closing the connection.")
  694. t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
  695. }
  696. }
  697. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  698. t.controlBuf.put(&incomingWindowUpdate{
  699. streamID: f.Header().StreamID,
  700. increment: f.Increment,
  701. })
  702. }
  703. func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
  704. for k, vv := range md {
  705. if isReservedHeader(k) {
  706. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  707. continue
  708. }
  709. for _, v := range vv {
  710. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  711. }
  712. }
  713. return headerFields
  714. }
  715. func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
  716. if t.maxSendHeaderListSize == nil {
  717. return true
  718. }
  719. hdrFrame := it.(*headerFrame)
  720. var sz int64
  721. for _, f := range hdrFrame.hf {
  722. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  723. errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
  724. return false
  725. }
  726. }
  727. return true
  728. }
  729. // WriteHeader sends the header metadata md back to the client.
  730. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  731. if s.updateHeaderSent() || s.getState() == streamDone {
  732. return ErrIllegalHeaderWrite
  733. }
  734. s.hdrMu.Lock()
  735. if md.Len() > 0 {
  736. if s.header.Len() > 0 {
  737. s.header = metadata.Join(s.header, md)
  738. } else {
  739. s.header = md
  740. }
  741. }
  742. if err := t.writeHeaderLocked(s); err != nil {
  743. s.hdrMu.Unlock()
  744. return err
  745. }
  746. s.hdrMu.Unlock()
  747. return nil
  748. }
  749. func (t *http2Server) setResetPingStrikes() {
  750. atomic.StoreUint32(&t.resetPingStrikes, 1)
  751. }
  752. func (t *http2Server) writeHeaderLocked(s *Stream) error {
  753. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  754. // first and create a slice of that exact size.
  755. headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
  756. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  757. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  758. if s.sendCompress != "" {
  759. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  760. }
  761. headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
  762. success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
  763. streamID: s.id,
  764. hf: headerFields,
  765. endStream: false,
  766. onWrite: t.setResetPingStrikes,
  767. })
  768. if !success {
  769. if err != nil {
  770. return err
  771. }
  772. t.closeStream(s, true, http2.ErrCodeInternal, false)
  773. return ErrHeaderListSizeLimitViolation
  774. }
  775. if t.stats != nil {
  776. // Note: WireLength is not set in outHeader.
  777. // TODO(mmukhi): Revisit this later, if needed.
  778. outHeader := &stats.OutHeader{
  779. Header: s.header.Copy(),
  780. }
  781. t.stats.HandleRPC(s.Context(), outHeader)
  782. }
  783. return nil
  784. }
  785. // WriteStatus sends stream status to the client and terminates the stream.
  786. // There is no further I/O operations being able to perform on this stream.
  787. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  788. // OK is adopted.
  789. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
  790. if s.getState() == streamDone {
  791. return nil
  792. }
  793. s.hdrMu.Lock()
  794. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  795. // first and create a slice of that exact size.
  796. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
  797. if !s.updateHeaderSent() { // No headers have been sent.
  798. if len(s.header) > 0 { // Send a separate header frame.
  799. if err := t.writeHeaderLocked(s); err != nil {
  800. s.hdrMu.Unlock()
  801. return err
  802. }
  803. } else { // Send a trailer only response.
  804. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  805. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  806. }
  807. }
  808. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
  809. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
  810. if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
  811. stBytes, err := proto.Marshal(p)
  812. if err != nil {
  813. // TODO: return error instead, when callers are able to handle it.
  814. grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
  815. } else {
  816. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
  817. }
  818. }
  819. // Attach the trailer metadata.
  820. headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
  821. trailingHeader := &headerFrame{
  822. streamID: s.id,
  823. hf: headerFields,
  824. endStream: true,
  825. onWrite: t.setResetPingStrikes,
  826. }
  827. s.hdrMu.Unlock()
  828. success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
  829. if !success {
  830. if err != nil {
  831. return err
  832. }
  833. t.closeStream(s, true, http2.ErrCodeInternal, false)
  834. return ErrHeaderListSizeLimitViolation
  835. }
  836. // Send a RST_STREAM after the trailers if the client has not already half-closed.
  837. rst := s.getState() == streamActive
  838. t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
  839. if t.stats != nil {
  840. t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
  841. Trailer: s.trailer.Copy(),
  842. })
  843. }
  844. return nil
  845. }
  846. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  847. // is returns if it fails (e.g., framing error, transport error).
  848. func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  849. if !s.isHeaderSent() { // Headers haven't been written yet.
  850. if err := t.WriteHeader(s, nil); err != nil {
  851. if _, ok := err.(ConnectionError); ok {
  852. return err
  853. }
  854. // TODO(mmukhi, dfawley): Make sure this is the right code to return.
  855. return status.Errorf(codes.Internal, "transport: %v", err)
  856. }
  857. } else {
  858. // Writing headers checks for this condition.
  859. if s.getState() == streamDone {
  860. // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
  861. s.cancel()
  862. select {
  863. case <-t.done:
  864. return ErrConnClosing
  865. default:
  866. }
  867. return ContextErr(s.ctx.Err())
  868. }
  869. }
  870. // Add some data to header frame so that we can equally distribute bytes across frames.
  871. emptyLen := http2MaxFrameLen - len(hdr)
  872. if emptyLen > len(data) {
  873. emptyLen = len(data)
  874. }
  875. hdr = append(hdr, data[:emptyLen]...)
  876. data = data[emptyLen:]
  877. df := &dataFrame{
  878. streamID: s.id,
  879. h: hdr,
  880. d: data,
  881. onEachWrite: t.setResetPingStrikes,
  882. }
  883. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  884. select {
  885. case <-t.done:
  886. return ErrConnClosing
  887. default:
  888. }
  889. return ContextErr(s.ctx.Err())
  890. }
  891. return t.controlBuf.put(df)
  892. }
  893. // keepalive running in a separate goroutine does the following:
  894. // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
  895. // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
  896. // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
  897. // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
  898. // after an additional duration of keepalive.Timeout.
  899. func (t *http2Server) keepalive() {
  900. p := &ping{}
  901. // True iff a ping has been sent, and no data has been received since then.
  902. outstandingPing := false
  903. // Amount of time remaining before which we should receive an ACK for the
  904. // last sent ping.
  905. kpTimeoutLeft := time.Duration(0)
  906. // Records the last value of t.lastRead before we go block on the timer.
  907. // This is required to check for read activity since then.
  908. prevNano := time.Now().UnixNano()
  909. // Initialize the different timers to their default values.
  910. idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
  911. ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
  912. kpTimer := time.NewTimer(t.kp.Time)
  913. defer func() {
  914. // We need to drain the underlying channel in these timers after a call
  915. // to Stop(), only if we are interested in resetting them. Clearly we
  916. // are not interested in resetting them here.
  917. idleTimer.Stop()
  918. ageTimer.Stop()
  919. kpTimer.Stop()
  920. }()
  921. for {
  922. select {
  923. case <-idleTimer.C:
  924. t.mu.Lock()
  925. idle := t.idle
  926. if idle.IsZero() { // The connection is non-idle.
  927. t.mu.Unlock()
  928. idleTimer.Reset(t.kp.MaxConnectionIdle)
  929. continue
  930. }
  931. val := t.kp.MaxConnectionIdle - time.Since(idle)
  932. t.mu.Unlock()
  933. if val <= 0 {
  934. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  935. // Gracefully close the connection.
  936. t.drain(http2.ErrCodeNo, []byte{})
  937. return
  938. }
  939. idleTimer.Reset(val)
  940. case <-ageTimer.C:
  941. t.drain(http2.ErrCodeNo, []byte{})
  942. ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
  943. select {
  944. case <-ageTimer.C:
  945. // Close the connection after grace period.
  946. infof("transport: closing server transport due to maximum connection age.")
  947. t.Close()
  948. case <-t.done:
  949. }
  950. return
  951. case <-kpTimer.C:
  952. lastRead := atomic.LoadInt64(&t.lastRead)
  953. if lastRead > prevNano {
  954. // There has been read activity since the last time we were
  955. // here. Setup the timer to fire at kp.Time seconds from
  956. // lastRead time and continue.
  957. outstandingPing = false
  958. kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
  959. prevNano = lastRead
  960. continue
  961. }
  962. if outstandingPing && kpTimeoutLeft <= 0 {
  963. infof("transport: closing server transport due to idleness.")
  964. t.Close()
  965. return
  966. }
  967. if !outstandingPing {
  968. if channelz.IsOn() {
  969. atomic.AddInt64(&t.czData.kpCount, 1)
  970. }
  971. t.controlBuf.put(p)
  972. kpTimeoutLeft = t.kp.Timeout
  973. outstandingPing = true
  974. }
  975. // The amount of time to sleep here is the minimum of kp.Time and
  976. // timeoutLeft. This will ensure that we wait only for kp.Time
  977. // before sending out the next ping (for cases where the ping is
  978. // acked).
  979. sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
  980. kpTimeoutLeft -= sleepDuration
  981. kpTimer.Reset(sleepDuration)
  982. case <-t.done:
  983. return
  984. }
  985. }
  986. }
  987. // Close starts shutting down the http2Server transport.
  988. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  989. // could cause some resource issue. Revisit this later.
  990. func (t *http2Server) Close() error {
  991. t.mu.Lock()
  992. if t.state == closing {
  993. t.mu.Unlock()
  994. return errors.New("transport: Close() was already called")
  995. }
  996. t.state = closing
  997. streams := t.activeStreams
  998. t.activeStreams = nil
  999. t.mu.Unlock()
  1000. t.controlBuf.finish()
  1001. close(t.done)
  1002. err := t.conn.Close()
  1003. if channelz.IsOn() {
  1004. channelz.RemoveEntry(t.channelzID)
  1005. }
  1006. // Cancel all active streams.
  1007. for _, s := range streams {
  1008. s.cancel()
  1009. }
  1010. if t.stats != nil {
  1011. connEnd := &stats.ConnEnd{}
  1012. t.stats.HandleConn(t.ctx, connEnd)
  1013. }
  1014. return err
  1015. }
  1016. // deleteStream deletes the stream s from transport's active streams.
  1017. func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
  1018. // In case stream sending and receiving are invoked in separate
  1019. // goroutines (e.g., bi-directional streaming), cancel needs to be
  1020. // called to interrupt the potential blocking on other goroutines.
  1021. s.cancel()
  1022. t.mu.Lock()
  1023. if _, ok := t.activeStreams[s.id]; ok {
  1024. delete(t.activeStreams, s.id)
  1025. if len(t.activeStreams) == 0 {
  1026. t.idle = time.Now()
  1027. }
  1028. }
  1029. t.mu.Unlock()
  1030. if channelz.IsOn() {
  1031. if eosReceived {
  1032. atomic.AddInt64(&t.czData.streamsSucceeded, 1)
  1033. } else {
  1034. atomic.AddInt64(&t.czData.streamsFailed, 1)
  1035. }
  1036. }
  1037. }
  1038. // finishStream closes the stream and puts the trailing headerFrame into controlbuf.
  1039. func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
  1040. oldState := s.swapState(streamDone)
  1041. if oldState == streamDone {
  1042. // If the stream was already done, return.
  1043. return
  1044. }
  1045. hdr.cleanup = &cleanupStream{
  1046. streamID: s.id,
  1047. rst: rst,
  1048. rstCode: rstCode,
  1049. onWrite: func() {
  1050. t.deleteStream(s, eosReceived)
  1051. },
  1052. }
  1053. t.controlBuf.put(hdr)
  1054. }
  1055. // closeStream clears the footprint of a stream when the stream is not needed any more.
  1056. func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
  1057. s.swapState(streamDone)
  1058. t.deleteStream(s, eosReceived)
  1059. t.controlBuf.put(&cleanupStream{
  1060. streamID: s.id,
  1061. rst: rst,
  1062. rstCode: rstCode,
  1063. onWrite: func() {},
  1064. })
  1065. }
  1066. func (t *http2Server) RemoteAddr() net.Addr {
  1067. return t.remoteAddr
  1068. }
  1069. func (t *http2Server) Drain() {
  1070. t.drain(http2.ErrCodeNo, []byte{})
  1071. }
  1072. func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
  1073. t.mu.Lock()
  1074. defer t.mu.Unlock()
  1075. if t.drainChan != nil {
  1076. return
  1077. }
  1078. t.drainChan = make(chan struct{})
  1079. t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
  1080. }
  1081. var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
  1082. // Handles outgoing GoAway and returns true if loopy needs to put itself
  1083. // in draining mode.
  1084. func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
  1085. t.mu.Lock()
  1086. if t.state == closing { // TODO(mmukhi): This seems unnecessary.
  1087. t.mu.Unlock()
  1088. // The transport is closing.
  1089. return false, ErrConnClosing
  1090. }
  1091. sid := t.maxStreamID
  1092. if !g.headsUp {
  1093. // Stop accepting more streams now.
  1094. t.state = draining
  1095. if len(t.activeStreams) == 0 {
  1096. g.closeConn = true
  1097. }
  1098. t.mu.Unlock()
  1099. if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
  1100. return false, err
  1101. }
  1102. if g.closeConn {
  1103. // Abruptly close the connection following the GoAway (via
  1104. // loopywriter). But flush out what's inside the buffer first.
  1105. t.framer.writer.Flush()
  1106. return false, fmt.Errorf("transport: Connection closing")
  1107. }
  1108. return true, nil
  1109. }
  1110. t.mu.Unlock()
  1111. // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
  1112. // Follow that with a ping and wait for the ack to come back or a timer
  1113. // to expire. During this time accept new streams since they might have
  1114. // originated before the GoAway reaches the client.
  1115. // After getting the ack or timer expiration send out another GoAway this
  1116. // time with an ID of the max stream server intends to process.
  1117. if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
  1118. return false, err
  1119. }
  1120. if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
  1121. return false, err
  1122. }
  1123. go func() {
  1124. timer := time.NewTimer(time.Minute)
  1125. defer timer.Stop()
  1126. select {
  1127. case <-t.drainChan:
  1128. case <-timer.C:
  1129. case <-t.done:
  1130. return
  1131. }
  1132. t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
  1133. }()
  1134. return false, nil
  1135. }
  1136. func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
  1137. s := channelz.SocketInternalMetric{
  1138. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1139. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1140. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1141. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1142. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1143. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1144. LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1145. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1146. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1147. LocalFlowControlWindow: int64(t.fc.getSize()),
  1148. SocketOptions: channelz.GetSocketOption(t.conn),
  1149. LocalAddr: t.localAddr,
  1150. RemoteAddr: t.remoteAddr,
  1151. // RemoteName :
  1152. }
  1153. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1154. s.Security = au.GetSecurityValue()
  1155. }
  1156. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1157. return &s
  1158. }
  1159. func (t *http2Server) IncrMsgSent() {
  1160. atomic.AddInt64(&t.czData.msgSent, 1)
  1161. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1162. }
  1163. func (t *http2Server) IncrMsgRecv() {
  1164. atomic.AddInt64(&t.czData.msgRecv, 1)
  1165. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1166. }
  1167. func (t *http2Server) getOutFlowWindow() int64 {
  1168. resp := make(chan uint32, 1)
  1169. timer := time.NewTimer(time.Second)
  1170. defer timer.Stop()
  1171. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1172. select {
  1173. case sz := <-resp:
  1174. return int64(sz)
  1175. case <-t.done:
  1176. return -1
  1177. case <-timer.C:
  1178. return -2
  1179. }
  1180. }
  1181. func getJitter(v time.Duration) time.Duration {
  1182. if v == infinity {
  1183. return 0
  1184. }
  1185. // Generate a jitter between +/- 10% of the value.
  1186. r := int64(v / 10)
  1187. j := grpcrand.Int63n(2*r) - r
  1188. return time.Duration(j)
  1189. }