controlbuf.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "fmt"
  22. "runtime"
  23. "sync"
  24. "sync/atomic"
  25. "golang.org/x/net/http2"
  26. "golang.org/x/net/http2/hpack"
  27. )
  28. var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
  29. e.SetMaxDynamicTableSizeLimit(v)
  30. }
  31. type itemNode struct {
  32. it interface{}
  33. next *itemNode
  34. }
  35. type itemList struct {
  36. head *itemNode
  37. tail *itemNode
  38. }
  39. func (il *itemList) enqueue(i interface{}) {
  40. n := &itemNode{it: i}
  41. if il.tail == nil {
  42. il.head, il.tail = n, n
  43. return
  44. }
  45. il.tail.next = n
  46. il.tail = n
  47. }
  48. // peek returns the first item in the list without removing it from the
  49. // list.
  50. func (il *itemList) peek() interface{} {
  51. return il.head.it
  52. }
  53. func (il *itemList) dequeue() interface{} {
  54. if il.head == nil {
  55. return nil
  56. }
  57. i := il.head.it
  58. il.head = il.head.next
  59. if il.head == nil {
  60. il.tail = nil
  61. }
  62. return i
  63. }
  64. func (il *itemList) dequeueAll() *itemNode {
  65. h := il.head
  66. il.head, il.tail = nil, nil
  67. return h
  68. }
  69. func (il *itemList) isEmpty() bool {
  70. return il.head == nil
  71. }
  72. // The following defines various control items which could flow through
  73. // the control buffer of transport. They represent different aspects of
  74. // control tasks, e.g., flow control, settings, streaming resetting, etc.
  75. // maxQueuedTransportResponseFrames is the most queued "transport response"
  76. // frames we will buffer before preventing new reads from occurring on the
  77. // transport. These are control frames sent in response to client requests,
  78. // such as RST_STREAM due to bad headers or settings acks.
  79. const maxQueuedTransportResponseFrames = 50
  80. type cbItem interface {
  81. isTransportResponseFrame() bool
  82. }
  83. // registerStream is used to register an incoming stream with loopy writer.
  84. type registerStream struct {
  85. streamID uint32
  86. wq *writeQuota
  87. }
  88. func (*registerStream) isTransportResponseFrame() bool { return false }
  89. // headerFrame is also used to register stream on the client-side.
  90. type headerFrame struct {
  91. streamID uint32
  92. hf []hpack.HeaderField
  93. endStream bool // Valid on server side.
  94. initStream func(uint32) error // Used only on the client side.
  95. onWrite func()
  96. wq *writeQuota // write quota for the stream created.
  97. cleanup *cleanupStream // Valid on the server side.
  98. onOrphaned func(error) // Valid on client-side
  99. }
  100. func (h *headerFrame) isTransportResponseFrame() bool {
  101. return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
  102. }
  103. type cleanupStream struct {
  104. streamID uint32
  105. rst bool
  106. rstCode http2.ErrCode
  107. onWrite func()
  108. }
  109. func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
  110. type dataFrame struct {
  111. streamID uint32
  112. endStream bool
  113. h []byte
  114. d []byte
  115. // onEachWrite is called every time
  116. // a part of d is written out.
  117. onEachWrite func()
  118. }
  119. func (*dataFrame) isTransportResponseFrame() bool { return false }
  120. type incomingWindowUpdate struct {
  121. streamID uint32
  122. increment uint32
  123. }
  124. func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
  125. type outgoingWindowUpdate struct {
  126. streamID uint32
  127. increment uint32
  128. }
  129. func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
  130. return false // window updates are throttled by thresholds
  131. }
  132. type incomingSettings struct {
  133. ss []http2.Setting
  134. }
  135. func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
  136. type outgoingSettings struct {
  137. ss []http2.Setting
  138. }
  139. func (*outgoingSettings) isTransportResponseFrame() bool { return false }
  140. type incomingGoAway struct {
  141. }
  142. func (*incomingGoAway) isTransportResponseFrame() bool { return false }
  143. type goAway struct {
  144. code http2.ErrCode
  145. debugData []byte
  146. headsUp bool
  147. closeConn bool
  148. }
  149. func (*goAway) isTransportResponseFrame() bool { return false }
  150. type ping struct {
  151. ack bool
  152. data [8]byte
  153. }
  154. func (*ping) isTransportResponseFrame() bool { return true }
  155. type outFlowControlSizeRequest struct {
  156. resp chan uint32
  157. }
  158. func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
  159. type outStreamState int
  160. const (
  161. active outStreamState = iota
  162. empty
  163. waitingOnStreamQuota
  164. )
  165. type outStream struct {
  166. id uint32
  167. state outStreamState
  168. itl *itemList
  169. bytesOutStanding int
  170. wq *writeQuota
  171. next *outStream
  172. prev *outStream
  173. }
  174. func (s *outStream) deleteSelf() {
  175. if s.prev != nil {
  176. s.prev.next = s.next
  177. }
  178. if s.next != nil {
  179. s.next.prev = s.prev
  180. }
  181. s.next, s.prev = nil, nil
  182. }
  183. type outStreamList struct {
  184. // Following are sentinel objects that mark the
  185. // beginning and end of the list. They do not
  186. // contain any item lists. All valid objects are
  187. // inserted in between them.
  188. // This is needed so that an outStream object can
  189. // deleteSelf() in O(1) time without knowing which
  190. // list it belongs to.
  191. head *outStream
  192. tail *outStream
  193. }
  194. func newOutStreamList() *outStreamList {
  195. head, tail := new(outStream), new(outStream)
  196. head.next = tail
  197. tail.prev = head
  198. return &outStreamList{
  199. head: head,
  200. tail: tail,
  201. }
  202. }
  203. func (l *outStreamList) enqueue(s *outStream) {
  204. e := l.tail.prev
  205. e.next = s
  206. s.prev = e
  207. s.next = l.tail
  208. l.tail.prev = s
  209. }
  210. // remove from the beginning of the list.
  211. func (l *outStreamList) dequeue() *outStream {
  212. b := l.head.next
  213. if b == l.tail {
  214. return nil
  215. }
  216. b.deleteSelf()
  217. return b
  218. }
  219. // controlBuffer is a way to pass information to loopy.
  220. // Information is passed as specific struct types called control frames.
  221. // A control frame not only represents data, messages or headers to be sent out
  222. // but can also be used to instruct loopy to update its internal state.
  223. // It shouldn't be confused with an HTTP2 frame, although some of the control frames
  224. // like dataFrame and headerFrame do go out on wire as HTTP2 frames.
  225. type controlBuffer struct {
  226. ch chan struct{}
  227. done <-chan struct{}
  228. mu sync.Mutex
  229. consumerWaiting bool
  230. list *itemList
  231. err error
  232. // transportResponseFrames counts the number of queued items that represent
  233. // the response of an action initiated by the peer. trfChan is created
  234. // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
  235. // closed and nilled when transportResponseFrames drops below the
  236. // threshold. Both fields are protected by mu.
  237. transportResponseFrames int
  238. trfChan atomic.Value // *chan struct{}
  239. }
  240. func newControlBuffer(done <-chan struct{}) *controlBuffer {
  241. return &controlBuffer{
  242. ch: make(chan struct{}, 1),
  243. list: &itemList{},
  244. done: done,
  245. }
  246. }
  247. // throttle blocks if there are too many incomingSettings/cleanupStreams in the
  248. // controlbuf.
  249. func (c *controlBuffer) throttle() {
  250. ch, _ := c.trfChan.Load().(*chan struct{})
  251. if ch != nil {
  252. select {
  253. case <-*ch:
  254. case <-c.done:
  255. }
  256. }
  257. }
  258. func (c *controlBuffer) put(it cbItem) error {
  259. _, err := c.executeAndPut(nil, it)
  260. return err
  261. }
  262. func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
  263. var wakeUp bool
  264. c.mu.Lock()
  265. if c.err != nil {
  266. c.mu.Unlock()
  267. return false, c.err
  268. }
  269. if f != nil {
  270. if !f(it) { // f wasn't successful
  271. c.mu.Unlock()
  272. return false, nil
  273. }
  274. }
  275. if c.consumerWaiting {
  276. wakeUp = true
  277. c.consumerWaiting = false
  278. }
  279. c.list.enqueue(it)
  280. if it.isTransportResponseFrame() {
  281. c.transportResponseFrames++
  282. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  283. // We are adding the frame that puts us over the threshold; create
  284. // a throttling channel.
  285. ch := make(chan struct{})
  286. c.trfChan.Store(&ch)
  287. }
  288. }
  289. c.mu.Unlock()
  290. if wakeUp {
  291. select {
  292. case c.ch <- struct{}{}:
  293. default:
  294. }
  295. }
  296. return true, nil
  297. }
  298. // Note argument f should never be nil.
  299. func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
  300. c.mu.Lock()
  301. if c.err != nil {
  302. c.mu.Unlock()
  303. return false, c.err
  304. }
  305. if !f(it) { // f wasn't successful
  306. c.mu.Unlock()
  307. return false, nil
  308. }
  309. c.mu.Unlock()
  310. return true, nil
  311. }
  312. func (c *controlBuffer) get(block bool) (interface{}, error) {
  313. for {
  314. c.mu.Lock()
  315. if c.err != nil {
  316. c.mu.Unlock()
  317. return nil, c.err
  318. }
  319. if !c.list.isEmpty() {
  320. h := c.list.dequeue().(cbItem)
  321. if h.isTransportResponseFrame() {
  322. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  323. // We are removing the frame that put us over the
  324. // threshold; close and clear the throttling channel.
  325. ch := c.trfChan.Load().(*chan struct{})
  326. close(*ch)
  327. c.trfChan.Store((*chan struct{})(nil))
  328. }
  329. c.transportResponseFrames--
  330. }
  331. c.mu.Unlock()
  332. return h, nil
  333. }
  334. if !block {
  335. c.mu.Unlock()
  336. return nil, nil
  337. }
  338. c.consumerWaiting = true
  339. c.mu.Unlock()
  340. select {
  341. case <-c.ch:
  342. case <-c.done:
  343. c.finish()
  344. return nil, ErrConnClosing
  345. }
  346. }
  347. }
  348. func (c *controlBuffer) finish() {
  349. c.mu.Lock()
  350. if c.err != nil {
  351. c.mu.Unlock()
  352. return
  353. }
  354. c.err = ErrConnClosing
  355. // There may be headers for streams in the control buffer.
  356. // These streams need to be cleaned out since the transport
  357. // is still not aware of these yet.
  358. for head := c.list.dequeueAll(); head != nil; head = head.next {
  359. hdr, ok := head.it.(*headerFrame)
  360. if !ok {
  361. continue
  362. }
  363. if hdr.onOrphaned != nil { // It will be nil on the server-side.
  364. hdr.onOrphaned(ErrConnClosing)
  365. }
  366. }
  367. c.mu.Unlock()
  368. }
  369. type side int
  370. const (
  371. clientSide side = iota
  372. serverSide
  373. )
  374. // Loopy receives frames from the control buffer.
  375. // Each frame is handled individually; most of the work done by loopy goes
  376. // into handling data frames. Loopy maintains a queue of active streams, and each
  377. // stream maintains a queue of data frames; as loopy receives data frames
  378. // it gets added to the queue of the relevant stream.
  379. // Loopy goes over this list of active streams by processing one node every iteration,
  380. // thereby closely resemebling to a round-robin scheduling over all streams. While
  381. // processing a stream, loopy writes out data bytes from this stream capped by the min
  382. // of http2MaxFrameLen, connection-level flow control and stream-level flow control.
  383. type loopyWriter struct {
  384. side side
  385. cbuf *controlBuffer
  386. sendQuota uint32
  387. oiws uint32 // outbound initial window size.
  388. // estdStreams is map of all established streams that are not cleaned-up yet.
  389. // On client-side, this is all streams whose headers were sent out.
  390. // On server-side, this is all streams whose headers were received.
  391. estdStreams map[uint32]*outStream // Established streams.
  392. // activeStreams is a linked-list of all streams that have data to send and some
  393. // stream-level flow control quota.
  394. // Each of these streams internally have a list of data items(and perhaps trailers
  395. // on the server-side) to be sent out.
  396. activeStreams *outStreamList
  397. framer *framer
  398. hBuf *bytes.Buffer // The buffer for HPACK encoding.
  399. hEnc *hpack.Encoder // HPACK encoder.
  400. bdpEst *bdpEstimator
  401. draining bool
  402. // Side-specific handlers
  403. ssGoAwayHandler func(*goAway) (bool, error)
  404. }
  405. func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
  406. var buf bytes.Buffer
  407. l := &loopyWriter{
  408. side: s,
  409. cbuf: cbuf,
  410. sendQuota: defaultWindowSize,
  411. oiws: defaultWindowSize,
  412. estdStreams: make(map[uint32]*outStream),
  413. activeStreams: newOutStreamList(),
  414. framer: fr,
  415. hBuf: &buf,
  416. hEnc: hpack.NewEncoder(&buf),
  417. bdpEst: bdpEst,
  418. }
  419. return l
  420. }
  421. const minBatchSize = 1000
  422. // run should be run in a separate goroutine.
  423. // It reads control frames from controlBuf and processes them by:
  424. // 1. Updating loopy's internal state, or/and
  425. // 2. Writing out HTTP2 frames on the wire.
  426. //
  427. // Loopy keeps all active streams with data to send in a linked-list.
  428. // All streams in the activeStreams linked-list must have both:
  429. // 1. Data to send, and
  430. // 2. Stream level flow control quota available.
  431. //
  432. // In each iteration of run loop, other than processing the incoming control
  433. // frame, loopy calls processData, which processes one node from the activeStreams linked-list.
  434. // This results in writing of HTTP2 frames into an underlying write buffer.
  435. // When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
  436. // As an optimization, to increase the batch size for each flush, loopy yields the processor, once
  437. // if the batch size is too low to give stream goroutines a chance to fill it up.
  438. func (l *loopyWriter) run() (err error) {
  439. defer func() {
  440. if err == ErrConnClosing {
  441. // Don't log ErrConnClosing as error since it happens
  442. // 1. When the connection is closed by some other known issue.
  443. // 2. User closed the connection.
  444. // 3. A graceful close of connection.
  445. infof("transport: loopyWriter.run returning. %v", err)
  446. err = nil
  447. }
  448. }()
  449. for {
  450. it, err := l.cbuf.get(true)
  451. if err != nil {
  452. return err
  453. }
  454. if err = l.handle(it); err != nil {
  455. return err
  456. }
  457. if _, err = l.processData(); err != nil {
  458. return err
  459. }
  460. gosched := true
  461. hasdata:
  462. for {
  463. it, err := l.cbuf.get(false)
  464. if err != nil {
  465. return err
  466. }
  467. if it != nil {
  468. if err = l.handle(it); err != nil {
  469. return err
  470. }
  471. if _, err = l.processData(); err != nil {
  472. return err
  473. }
  474. continue hasdata
  475. }
  476. isEmpty, err := l.processData()
  477. if err != nil {
  478. return err
  479. }
  480. if !isEmpty {
  481. continue hasdata
  482. }
  483. if gosched {
  484. gosched = false
  485. if l.framer.writer.offset < minBatchSize {
  486. runtime.Gosched()
  487. continue hasdata
  488. }
  489. }
  490. l.framer.writer.Flush()
  491. break hasdata
  492. }
  493. }
  494. }
  495. func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
  496. return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
  497. }
  498. func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
  499. // Otherwise update the quota.
  500. if w.streamID == 0 {
  501. l.sendQuota += w.increment
  502. return nil
  503. }
  504. // Find the stream and update it.
  505. if str, ok := l.estdStreams[w.streamID]; ok {
  506. str.bytesOutStanding -= int(w.increment)
  507. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
  508. str.state = active
  509. l.activeStreams.enqueue(str)
  510. return nil
  511. }
  512. }
  513. return nil
  514. }
  515. func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
  516. return l.framer.fr.WriteSettings(s.ss...)
  517. }
  518. func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
  519. if err := l.applySettings(s.ss); err != nil {
  520. return err
  521. }
  522. return l.framer.fr.WriteSettingsAck()
  523. }
  524. func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
  525. str := &outStream{
  526. id: h.streamID,
  527. state: empty,
  528. itl: &itemList{},
  529. wq: h.wq,
  530. }
  531. l.estdStreams[h.streamID] = str
  532. return nil
  533. }
  534. func (l *loopyWriter) headerHandler(h *headerFrame) error {
  535. if l.side == serverSide {
  536. str, ok := l.estdStreams[h.streamID]
  537. if !ok {
  538. warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
  539. return nil
  540. }
  541. // Case 1.A: Server is responding back with headers.
  542. if !h.endStream {
  543. return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
  544. }
  545. // else: Case 1.B: Server wants to close stream.
  546. if str.state != empty { // either active or waiting on stream quota.
  547. // add it str's list of items.
  548. str.itl.enqueue(h)
  549. return nil
  550. }
  551. if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
  552. return err
  553. }
  554. return l.cleanupStreamHandler(h.cleanup)
  555. }
  556. // Case 2: Client wants to originate stream.
  557. str := &outStream{
  558. id: h.streamID,
  559. state: empty,
  560. itl: &itemList{},
  561. wq: h.wq,
  562. }
  563. str.itl.enqueue(h)
  564. return l.originateStream(str)
  565. }
  566. func (l *loopyWriter) originateStream(str *outStream) error {
  567. hdr := str.itl.dequeue().(*headerFrame)
  568. if err := hdr.initStream(str.id); err != nil {
  569. if err == ErrConnClosing {
  570. return err
  571. }
  572. // Other errors(errStreamDrain) need not close transport.
  573. return nil
  574. }
  575. if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
  576. return err
  577. }
  578. l.estdStreams[str.id] = str
  579. return nil
  580. }
  581. func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
  582. if onWrite != nil {
  583. onWrite()
  584. }
  585. l.hBuf.Reset()
  586. for _, f := range hf {
  587. if err := l.hEnc.WriteField(f); err != nil {
  588. warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err)
  589. }
  590. }
  591. var (
  592. err error
  593. endHeaders, first bool
  594. )
  595. first = true
  596. for !endHeaders {
  597. size := l.hBuf.Len()
  598. if size > http2MaxFrameLen {
  599. size = http2MaxFrameLen
  600. } else {
  601. endHeaders = true
  602. }
  603. if first {
  604. first = false
  605. err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
  606. StreamID: streamID,
  607. BlockFragment: l.hBuf.Next(size),
  608. EndStream: endStream,
  609. EndHeaders: endHeaders,
  610. })
  611. } else {
  612. err = l.framer.fr.WriteContinuation(
  613. streamID,
  614. endHeaders,
  615. l.hBuf.Next(size),
  616. )
  617. }
  618. if err != nil {
  619. return err
  620. }
  621. }
  622. return nil
  623. }
  624. func (l *loopyWriter) preprocessData(df *dataFrame) error {
  625. str, ok := l.estdStreams[df.streamID]
  626. if !ok {
  627. return nil
  628. }
  629. // If we got data for a stream it means that
  630. // stream was originated and the headers were sent out.
  631. str.itl.enqueue(df)
  632. if str.state == empty {
  633. str.state = active
  634. l.activeStreams.enqueue(str)
  635. }
  636. return nil
  637. }
  638. func (l *loopyWriter) pingHandler(p *ping) error {
  639. if !p.ack {
  640. l.bdpEst.timesnap(p.data)
  641. }
  642. return l.framer.fr.WritePing(p.ack, p.data)
  643. }
  644. func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
  645. o.resp <- l.sendQuota
  646. return nil
  647. }
  648. func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
  649. c.onWrite()
  650. if str, ok := l.estdStreams[c.streamID]; ok {
  651. // On the server side it could be a trailers-only response or
  652. // a RST_STREAM before stream initialization thus the stream might
  653. // not be established yet.
  654. delete(l.estdStreams, c.streamID)
  655. str.deleteSelf()
  656. }
  657. if c.rst { // If RST_STREAM needs to be sent.
  658. if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
  659. return err
  660. }
  661. }
  662. if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
  663. return ErrConnClosing
  664. }
  665. return nil
  666. }
  667. func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
  668. if l.side == clientSide {
  669. l.draining = true
  670. if len(l.estdStreams) == 0 {
  671. return ErrConnClosing
  672. }
  673. }
  674. return nil
  675. }
  676. func (l *loopyWriter) goAwayHandler(g *goAway) error {
  677. // Handling of outgoing GoAway is very specific to side.
  678. if l.ssGoAwayHandler != nil {
  679. draining, err := l.ssGoAwayHandler(g)
  680. if err != nil {
  681. return err
  682. }
  683. l.draining = draining
  684. }
  685. return nil
  686. }
  687. func (l *loopyWriter) handle(i interface{}) error {
  688. switch i := i.(type) {
  689. case *incomingWindowUpdate:
  690. return l.incomingWindowUpdateHandler(i)
  691. case *outgoingWindowUpdate:
  692. return l.outgoingWindowUpdateHandler(i)
  693. case *incomingSettings:
  694. return l.incomingSettingsHandler(i)
  695. case *outgoingSettings:
  696. return l.outgoingSettingsHandler(i)
  697. case *headerFrame:
  698. return l.headerHandler(i)
  699. case *registerStream:
  700. return l.registerStreamHandler(i)
  701. case *cleanupStream:
  702. return l.cleanupStreamHandler(i)
  703. case *incomingGoAway:
  704. return l.incomingGoAwayHandler(i)
  705. case *dataFrame:
  706. return l.preprocessData(i)
  707. case *ping:
  708. return l.pingHandler(i)
  709. case *goAway:
  710. return l.goAwayHandler(i)
  711. case *outFlowControlSizeRequest:
  712. return l.outFlowControlSizeRequestHandler(i)
  713. default:
  714. return fmt.Errorf("transport: unknown control message type %T", i)
  715. }
  716. }
  717. func (l *loopyWriter) applySettings(ss []http2.Setting) error {
  718. for _, s := range ss {
  719. switch s.ID {
  720. case http2.SettingInitialWindowSize:
  721. o := l.oiws
  722. l.oiws = s.Val
  723. if o < l.oiws {
  724. // If the new limit is greater make all depleted streams active.
  725. for _, stream := range l.estdStreams {
  726. if stream.state == waitingOnStreamQuota {
  727. stream.state = active
  728. l.activeStreams.enqueue(stream)
  729. }
  730. }
  731. }
  732. case http2.SettingHeaderTableSize:
  733. updateHeaderTblSize(l.hEnc, s.Val)
  734. }
  735. }
  736. return nil
  737. }
  738. // processData removes the first stream from active streams, writes out at most 16KB
  739. // of its data and then puts it at the end of activeStreams if there's still more data
  740. // to be sent and stream has some stream-level flow control.
  741. func (l *loopyWriter) processData() (bool, error) {
  742. if l.sendQuota == 0 {
  743. return true, nil
  744. }
  745. str := l.activeStreams.dequeue() // Remove the first stream.
  746. if str == nil {
  747. return true, nil
  748. }
  749. dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
  750. // A data item is represented by a dataFrame, since it later translates into
  751. // multiple HTTP2 data frames.
  752. // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
  753. // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
  754. // maximum possilbe HTTP2 frame size.
  755. if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
  756. // Client sends out empty data frame with endStream = true
  757. if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
  758. return false, err
  759. }
  760. str.itl.dequeue() // remove the empty data item from stream
  761. if str.itl.isEmpty() {
  762. str.state = empty
  763. } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
  764. if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
  765. return false, err
  766. }
  767. if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
  768. return false, nil
  769. }
  770. } else {
  771. l.activeStreams.enqueue(str)
  772. }
  773. return false, nil
  774. }
  775. var (
  776. idx int
  777. buf []byte
  778. )
  779. if len(dataItem.h) != 0 { // data header has not been written out yet.
  780. buf = dataItem.h
  781. } else {
  782. idx = 1
  783. buf = dataItem.d
  784. }
  785. size := http2MaxFrameLen
  786. if len(buf) < size {
  787. size = len(buf)
  788. }
  789. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
  790. str.state = waitingOnStreamQuota
  791. return false, nil
  792. } else if strQuota < size {
  793. size = strQuota
  794. }
  795. if l.sendQuota < uint32(size) { // connection-level flow control.
  796. size = int(l.sendQuota)
  797. }
  798. // Now that outgoing flow controls are checked we can replenish str's write quota
  799. str.wq.replenish(size)
  800. var endStream bool
  801. // If this is the last data message on this stream and all of it can be written in this iteration.
  802. if dataItem.endStream && size == len(buf) {
  803. // buf contains either data or it contains header but data is empty.
  804. if idx == 1 || len(dataItem.d) == 0 {
  805. endStream = true
  806. }
  807. }
  808. if dataItem.onEachWrite != nil {
  809. dataItem.onEachWrite()
  810. }
  811. if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
  812. return false, err
  813. }
  814. buf = buf[size:]
  815. str.bytesOutStanding += size
  816. l.sendQuota -= uint32(size)
  817. if idx == 0 {
  818. dataItem.h = buf
  819. } else {
  820. dataItem.d = buf
  821. }
  822. if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
  823. str.itl.dequeue()
  824. }
  825. if str.itl.isEmpty() {
  826. str.state = empty
  827. } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
  828. if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
  829. return false, err
  830. }
  831. if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
  832. return false, err
  833. }
  834. } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
  835. str.state = waitingOnStreamQuota
  836. } else { // Otherwise add it back to the list of active streams.
  837. l.activeStreams.enqueue(str)
  838. }
  839. return false, nil
  840. }