transport.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110
  1. // Copyright 2011 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // HTTP client implementation. See RFC 2616.
  5. //
  6. // This is the low-level Transport implementation of RoundTripper.
  7. // The high-level interface is in client.go.
  8. package remote
  9. import (
  10. "bufio"
  11. "crypto/tls"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "log"
  16. "net"
  17. "net/http"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "sync"
  22. "time"
  23. )
// DefaultMaxIdleConnsPerHost is the default value of Transport's
// MaxIdleConnsPerHost. It is the per-key idle connection limit that
// putIdleConn applies when Transport.MaxIdleConnsPerHost is zero.
const DefaultMaxIdleConnsPerHost = 2
// Transport is an implementation of RoundTripper that supports HTTP,
// HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT).
// Transport can also cache connections for future re-use.
//
// NOTE(review): no proxy-related field or code path is visible in this
// file; the proxy claim above looks inherited from net/http — confirm.
type Transport struct {
	// idleMu guards wantIdle, idleConn and idleConnCh.
	idleMu   sync.Mutex
	wantIdle bool // user has requested to close all idle conns
	// idleConn holds the idle connections per cache key. putIdleConn
	// appends to the slice and getIdleConn pops from its end.
	idleConn map[connectMethodKey][]*persistConn
	// idleConnCh holds, per cache key, the channel on which getConn waits
	// for a connection handed over directly by putIdleConn.
	idleConnCh map[connectMethodKey]chan *persistConn

	// reqMu guards reqCanceler and reqCanceled.
	reqMu sync.Mutex
	// reqCanceler maps an in-flight request to the function that cancels it.
	reqCanceler map[*http.Request]func()
	// reqCanceled records requests for which CancelRequest was called
	// while no canceler was registered; getConn consumes these entries.
	reqCanceled map[*http.Request]bool

	// altMu guards altProto.
	altMu    sync.RWMutex
	altProto map[string]http.RoundTripper // nil or map of URI scheme => RoundTripper

	// Dial specifies the dial function for creating unencrypted
	// TCP connections.
	// If Dial is nil, net.Dial is used.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLS specifies an optional dial function for creating
	// TLS connections for non-proxied HTTPS requests.
	//
	// If DialTLS is nil, Dial and TLSClientConfig are used.
	//
	// If DialTLS is set, the Dial hook is not used for HTTPS
	// requests and the TLSClientConfig and TLSHandshakeTimeout
	// are ignored. The returned net.Conn is assumed to already be
	// past the TLS handshake.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with
	// tls.Client. If nil, the default configuration is used.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout specifies the maximum amount of time to
	// wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, if true, prevents re-use of TCP connections
	// between different HTTP requests.
	DisableKeepAlives bool

	// DisableCompression, if true, prevents the Transport from
	// requesting compression with an "Accept-Encoding: gzip"
	// request header when the Request contains no existing
	// Accept-Encoding value. If the Transport requests gzip on
	// its own and gets a gzipped response, it's transparently
	// decoded in the Response.Body. However, if the user
	// explicitly requested gzip it is not automatically
	// uncompressed.
	DisableCompression bool

	// MaxIdleConnsPerHost, if non-zero, controls the maximum number of
	// idle (keep-alive) connections to keep per-host. If zero,
	// DefaultMaxIdleConnsPerHost is used. If negative, putIdleConn
	// closes every connection instead of keeping it idle.
	MaxIdleConnsPerHost int

	// ResponseHeaderTimeout, if non-zero, specifies the amount of
	// time to wait for a server's response headers after fully
	// writing the request (including its body, if any). This
	// time does not include the time to read the response body.
	ResponseHeaderTimeout time.Duration

	// TODO: tunable on global max cached connections
	// TODO: tunable on timeout on cached connections
}
  84. // transportRequest is a wrapper around a *Request that adds
  85. // optional extra headers to write.
  86. type transportRequest struct {
  87. *http.Request // original request, not to be mutated
  88. extra http.Header // extra headers to write, or nil
  89. }
  90. func (tr *transportRequest) extraHeaders() http.Header {
  91. if tr.extra == nil {
  92. tr.extra = make(http.Header)
  93. }
  94. return tr.extra
  95. }
  96. func closeBody(req *http.Request) {
  97. if req.Body != nil {
  98. req.Body.Close()
  99. }
  100. }
  101. func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") }
  102. // RoundTrip implements the RoundTripper interface.
  103. //
  104. // For higher-level HTTP client support (such as handling of cookies
  105. // and redirects), see Get, Post, and the Client type.
  106. func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
  107. if req.URL == nil {
  108. closeBody(req)
  109. return nil, errors.New("http: nil Request.URL")
  110. }
  111. if req.Header == nil {
  112. closeBody(req)
  113. return nil, errors.New("http: nil Request.Header")
  114. }
  115. if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
  116. closeBody(req)
  117. return nil, errors.New("http: bad scheme")
  118. }
  119. if req.URL.Host == "" {
  120. closeBody(req)
  121. return nil, errors.New("http: no Host in request URL")
  122. }
  123. treq := &transportRequest{Request: req}
  124. cm := t.connectMethodForRequest(treq)
  125. // Get the cached or newly-created connection to either the
  126. // host (for http or https), the http proxy, or the http proxy
  127. // pre-CONNECTed to https server. In any case, we'll be ready
  128. // to send it requests.
  129. pconn, err := t.getConn(req, cm)
  130. if err != nil {
  131. t.setReqCanceler(req, nil)
  132. closeBody(req)
  133. return nil, err
  134. }
  135. return pconn.roundTrip(treq)
  136. }
  137. // RegisterProtocol registers a new protocol with scheme.
  138. // The Transport will pass requests using the given scheme to rt.
  139. // It is rt's responsibility to simulate HTTP request semantics.
  140. //
  141. // RegisterProtocol can be used by other packages to provide
  142. // implementations of protocol schemes like "ftp" or "file".
  143. func (t *Transport) RegisterProtocol(scheme string, rt http.RoundTripper) {
  144. if scheme == "http" || scheme == "https" {
  145. panic("protocol " + scheme + " already registered")
  146. }
  147. t.altMu.Lock()
  148. defer t.altMu.Unlock()
  149. if t.altProto == nil {
  150. t.altProto = make(map[string]http.RoundTripper)
  151. }
  152. if _, exists := t.altProto[scheme]; exists {
  153. panic("protocol " + scheme + " already registered")
  154. }
  155. t.altProto[scheme] = rt
  156. }
  157. // CloseIdleConnections closes any connections which were previously
  158. // connected from previous requests but are now sitting idle in
  159. // a "keep-alive" state. It does not interrupt any connections currently
  160. // in use.
  161. func (t *Transport) CloseIdleConnections() {
  162. t.idleMu.Lock()
  163. m := t.idleConn
  164. t.idleConn = nil
  165. t.idleConnCh = nil
  166. t.wantIdle = true
  167. t.idleMu.Unlock()
  168. for _, conns := range m {
  169. for _, pconn := range conns {
  170. pconn.close()
  171. }
  172. }
  173. }
  174. // CancelRequest cancels an in-flight request by closing its connection.
  175. // CancelRequest should only be called after RoundTrip has returned.
  176. func (t *Transport) CancelRequest(req *http.Request) {
  177. t.reqMu.Lock()
  178. if cancel, ok := t.reqCanceler[req]; ok {
  179. delete(t.reqCanceler, req)
  180. t.reqMu.Unlock()
  181. if cancel != nil {
  182. cancel()
  183. }
  184. } else {
  185. if t.reqCanceled == nil {
  186. t.reqCanceled = make(map[*http.Request]bool)
  187. }
  188. t.reqCanceled[req] = true
  189. t.reqMu.Unlock()
  190. }
  191. }
  192. //
  193. // Private implementation past this point.
  194. //
  195. // envOnce looks up an environment variable (optionally by multiple
  196. // names) once. It mitigates expensive lookups on some platforms
  197. // (e.g. Windows).
  198. type envOnce struct {
  199. names []string
  200. once sync.Once
  201. val string
  202. }
  203. func (e *envOnce) Get() string {
  204. e.once.Do(e.init)
  205. return e.val
  206. }
  207. func (e *envOnce) init() {
  208. for _, n := range e.names {
  209. e.val = os.Getenv(n)
  210. if e.val != "" {
  211. return
  212. }
  213. }
  214. }
  215. // reset is used by tests
  216. func (e *envOnce) reset() {
  217. e.once = sync.Once{}
  218. e.val = ""
  219. }
  220. func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod) {
  221. cm.targetScheme = treq.URL.Scheme
  222. cm.targetAddr = canonicalAddr(treq.URL)
  223. return cm
  224. }
// putIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request.
// If pconn is no longer needed or not in a good state, putIdleConn
// returns false.
//
// It returns true when pconn was either handed directly to a goroutine
// waiting in getConn or stored in the idle pool. In most of the false
// cases pconn is closed here; a connection that is already broken is
// rejected without being closed.
func (t *Transport) putIdleConn(pconn *persistConn) bool {
	// Keep-alives disabled, or a negative limit: never pool.
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		pconn.close()
		return false
	}
	if pconn.isBroken() {
		return false
	}
	key := pconn.cacheKey
	max := t.MaxIdleConnsPerHost
	if max == 0 {
		max = DefaultMaxIdleConnsPerHost
	}
	t.idleMu.Lock()

	// waitingDialer is nil when nobody is waiting for this key (or when
	// idleConnCh itself is nil); a send on a nil channel never proceeds,
	// so the select then falls through to default.
	waitingDialer := t.idleConnCh[key]
	select {
	case waitingDialer <- pconn:
		// We're done with this pconn and somebody else is
		// currently waiting for a conn of this type (they're
		// actively dialing, but this conn is ready
		// first). Chrome calls this socket late binding. See
		// https://insouciant.org/tech/connection-management-in-chromium/
		t.idleMu.Unlock()
		return true
	default:
		if waitingDialer != nil {
			// They had populated this, but their dial won
			// first, so we can clean up this map entry.
			delete(t.idleConnCh, key)
		}
	}
	// CloseIdleConnections was called and no getIdleConnCh call has
	// cleared the flag since: do not repopulate the pool.
	if t.wantIdle {
		t.idleMu.Unlock()
		pconn.close()
		return false
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	// The pool for this key is already full.
	if len(t.idleConn[key]) >= max {
		t.idleMu.Unlock()
		pconn.close()
		return false
	}
	// Sanity check: the same connection must never be pooled twice.
	// NOTE(review): log.Fatalf terminates the whole process here.
	for _, exist := range t.idleConn[key] {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(t.idleConn[key], pconn)
	t.idleMu.Unlock()
	return true
}
  282. // getIdleConnCh returns a channel to receive and return idle
  283. // persistent connection for the given connectMethod.
  284. // It may return nil, if persistent connections are not being used.
  285. func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn {
  286. if t.DisableKeepAlives {
  287. return nil
  288. }
  289. key := cm.key()
  290. t.idleMu.Lock()
  291. defer t.idleMu.Unlock()
  292. t.wantIdle = false
  293. if t.idleConnCh == nil {
  294. t.idleConnCh = make(map[connectMethodKey]chan *persistConn)
  295. }
  296. ch, ok := t.idleConnCh[key]
  297. if !ok {
  298. ch = make(chan *persistConn)
  299. t.idleConnCh[key] = ch
  300. }
  301. return ch
  302. }
  303. func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) {
  304. key := cm.key()
  305. t.idleMu.Lock()
  306. defer t.idleMu.Unlock()
  307. if t.idleConn == nil {
  308. return nil
  309. }
  310. for {
  311. pconns, ok := t.idleConn[key]
  312. if !ok {
  313. return nil
  314. }
  315. if len(pconns) == 1 {
  316. pconn = pconns[0]
  317. delete(t.idleConn, key)
  318. } else {
  319. // 2 or more cached connections; pop last
  320. // TODO: queue?
  321. pconn = pconns[len(pconns)-1]
  322. t.idleConn[key] = pconns[:len(pconns)-1]
  323. }
  324. if !pconn.isBroken() {
  325. return
  326. }
  327. }
  328. }
  329. func (t *Transport) setReqCanceler(r *http.Request, fn func()) {
  330. t.reqMu.Lock()
  331. defer t.reqMu.Unlock()
  332. if t.reqCanceler == nil {
  333. t.reqCanceler = make(map[*http.Request]func())
  334. }
  335. if fn != nil {
  336. t.reqCanceler[r] = fn
  337. } else {
  338. delete(t.reqCanceler, r)
  339. }
  340. }
  341. // replaceReqCanceler replaces an existing cancel function. If there is no cancel function
  342. // for the request, we don't set the function and return false.
  343. // Since CancelRequest will clear the canceler, we can use the return value to detect if
  344. // the request was canceled since the last setReqCancel call.
  345. func (t *Transport) replaceReqCanceler(r *http.Request, fn func()) bool {
  346. t.reqMu.Lock()
  347. defer t.reqMu.Unlock()
  348. _, ok := t.reqCanceler[r]
  349. if !ok {
  350. return false
  351. }
  352. if fn != nil {
  353. t.reqCanceler[r] = fn
  354. } else {
  355. delete(t.reqCanceler, r)
  356. }
  357. return true
  358. }
  359. func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
  360. if t.Dial != nil {
  361. return t.Dial(network, addr)
  362. }
  363. return net.Dial(network, addr)
  364. }
// Testing hooks. Either may be nil. getConn copies both before use:
// prePendingDial is called right before getConn starts the goroutine
// that waits for a dial whose result is no longer needed, and
// postPendingDial is called by that goroutine once it has dealt with
// the dial result.
var prePendingDial, postPendingDial func()
// getConn returns a persistConn for the target specified in the
// connectMethod. An idle pooled connection is used when available;
// otherwise a new one is dialed (including setting up TLS) in a
// separate goroutine. If this doesn't return an error, the persistConn
// is ready to write requests to.
//
// While the dial is in progress getConn also accepts a connection that
// becomes idle in the meantime, and aborts if the request is canceled.
// In both of those cases the still-running dial is not wasted: its
// result is offered to the idle pool once it completes.
func (t *Transport) getConn(req *http.Request, cm connectMethod) (*persistConn, error) {
	if pc := t.getIdleConn(cm); pc != nil {
		// set request canceler to some non-nil function so we
		// can detect whether it was cleared between now and when
		// we enter roundTrip
		t.setReqCanceler(req, func() {})
		t.reqMu.Lock()
		if _, ok := t.reqCanceled[req]; ok {
			// request canceled before we knew about it
			delete(t.reqCanceled, req)
			pc.cancelRequest()
		}
		t.reqMu.Unlock()
		return pc, nil
	}

	// dialRes carries the outcome of the background dial.
	type dialRes struct {
		pc  *persistConn
		err error
	}
	// dialc is unbuffered: the dialing goroutine blocks until somebody
	// (the select below or handlePendingDial) receives the result.
	dialc := make(chan dialRes)

	// Copy these hooks so we don't race on the postPendingDial in
	// the goroutine we launch. Issue 11136.
	prePendingDial := prePendingDial
	postPendingDial := postPendingDial

	// handlePendingDial is called when we return without using our own
	// dial; it receives the eventual result and, on success, offers the
	// connection to the idle pool.
	handlePendingDial := func() {
		if prePendingDial != nil {
			prePendingDial()
		}
		go func() {
			if v := <-dialc; v.err == nil {
				t.putIdleConn(v.pc)
			}
			if postPendingDial != nil {
				postPendingDial()
			}
		}()
	}

	// cancelc is closed by the request's canceler.
	cancelc := make(chan struct{})
	t.setReqCanceler(req, func() { close(cancelc) })
	t.reqMu.Lock()
	if _, ok := t.reqCanceled[req]; ok {
		// request canceled before we knew about it
		if cancel, ok := t.reqCanceler[req]; ok {
			delete(t.reqCanceler, req)
			delete(t.reqCanceled, req)
			if cancel != nil {
				cancel()
			}
		}
	}
	t.reqMu.Unlock()

	go func() {
		pc, err := t.dialConn(cm)
		dialc <- dialRes{pc, err}
	}()

	// idleConnCh is nil when keep-alives are disabled; receiving from a
	// nil channel never proceeds, so that case is then simply inert.
	idleConnCh := t.getIdleConnCh(cm)
	select {
	case v := <-dialc:
		// Our dial finished.
		return v.pc, v.err
	case pc := <-idleConnCh:
		// Another request finished first and its net.Conn
		// became available before our dial. Or somebody
		// else's dial that they didn't use.
		// But our dial is still going, so give it away
		// when it finishes:
		handlePendingDial()
		return pc, nil
	case <-cancelc:
		handlePendingDial()
		return nil, errors.New("net/http: request canceled while waiting for connection")
	}
}
  444. func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) {
  445. pconn := &persistConn{
  446. t: t,
  447. cacheKey: cm.key(),
  448. reqch: make(chan requestAndChan, 1),
  449. writech: make(chan writeRequest, 1),
  450. closech: make(chan struct{}),
  451. writeErrCh: make(chan error, 1),
  452. }
  453. tlsDial := t.DialTLS != nil && cm.targetScheme == "https"
  454. if tlsDial {
  455. var err error
  456. pconn.conn, err = t.DialTLS("tcp", cm.addr())
  457. if err != nil {
  458. return nil, err
  459. }
  460. if tc, ok := pconn.conn.(*tls.Conn); ok {
  461. cs := tc.ConnectionState()
  462. pconn.tlsState = &cs
  463. }
  464. } else {
  465. conn, err := t.dial("tcp", cm.addr())
  466. if err != nil {
  467. return nil, err
  468. }
  469. pconn.conn = conn
  470. }
  471. if cm.targetScheme == "https" && !tlsDial {
  472. // Initiate TLS and check remote host name against certificate.
  473. cfg := t.TLSClientConfig
  474. if cfg == nil || cfg.ServerName == "" {
  475. host := cm.tlsHost()
  476. if cfg == nil {
  477. cfg = &tls.Config{ServerName: host}
  478. } else {
  479. clone := *cfg // shallow clone
  480. clone.ServerName = host
  481. cfg = &clone
  482. }
  483. }
  484. plainConn := pconn.conn
  485. tlsConn := tls.Client(plainConn, cfg)
  486. errc := make(chan error, 2)
  487. var timer *time.Timer // for canceling TLS handshake
  488. if d := t.TLSHandshakeTimeout; d != 0 {
  489. timer = time.AfterFunc(d, func() {
  490. errc <- tlsHandshakeTimeoutError{}
  491. })
  492. }
  493. go func() {
  494. err := tlsConn.Handshake()
  495. if timer != nil {
  496. timer.Stop()
  497. }
  498. errc <- err
  499. }()
  500. if err := <-errc; err != nil {
  501. plainConn.Close()
  502. return nil, err
  503. }
  504. if !cfg.InsecureSkipVerify {
  505. if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
  506. plainConn.Close()
  507. return nil, err
  508. }
  509. }
  510. cs := tlsConn.ConnectionState()
  511. pconn.tlsState = &cs
  512. pconn.conn = tlsConn
  513. }
  514. pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF})
  515. pconn.bw = bufio.NewWriter(pconn.conn)
  516. go pconn.readLoop()
  517. go pconn.writeLoop()
  518. return pconn, nil
  519. }
  520. // connectMethod is the map key (in its String form) for keeping persistent
  521. // TCP connections alive for subsequent HTTP requests.
  522. //
  523. // A connect method may be of the following types:
  524. //
  525. // Cache key form Description
  526. // ----------------- -------------------------
  527. // |http|foo.com http directly to server, no proxy
  528. // |https|foo.com https directly to server, no proxy
  529. // http://proxy.com|https|foo.com http to proxy, then CONNECT to foo.com
  530. // http://proxy.com|http http to proxy, http to anywhere after that
  531. //
  532. // Note: no support to https to the proxy yet.
  533. //
  534. type connectMethod struct {
  535. targetScheme string // "http" or "https"
  536. targetAddr string // Not used if proxy + http targetScheme (4th example in table)
  537. }
  538. func (cm *connectMethod) key() connectMethodKey {
  539. return connectMethodKey{
  540. scheme: cm.targetScheme,
  541. addr: cm.targetAddr,
  542. }
  543. }
  544. // addr returns the first hop "host:port" to which we need to TCP connect.
  545. func (cm *connectMethod) addr() string {
  546. return cm.targetAddr
  547. }
  548. // tlsHost returns the host name to match against the peer's
  549. // TLS certificate.
  550. func (cm *connectMethod) tlsHost() string {
  551. h := cm.targetAddr
  552. if hasPort(h) {
  553. h = h[:strings.LastIndex(h, ":")]
  554. }
  555. return h
  556. }
  557. // connectMethodKey is the map key version of connectMethod, with a
  558. // stringified proxy URL (or the empty string) instead of a pointer to
  559. // a URL.
  560. type connectMethodKey struct {
  561. scheme, addr string
  562. }
  563. func (k connectMethodKey) String() string {
  564. // Only used by tests.
  565. return fmt.Sprintf("%s|%s|%s", k.scheme, k.addr)
  566. }
// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well).
//
// dialConn creates it and starts its readLoop and writeLoop goroutines.
type persistConn struct {
	t        *Transport       // owning Transport
	cacheKey connectMethodKey // idle pool key, read by putIdleConn
	conn     net.Conn         // underlying connection (a TLS conn after a handshake in dialConn)
	tlsState *tls.ConnectionState // set by dialConn for TLS connections; copied into Response.TLS
	br       *bufio.Reader        // from conn
	sawEOF   bool                 // whether we've seen EOF from conn; owned by readLoop
	bw       *bufio.Writer        // to conn
	reqch    chan requestAndChan  // written by roundTrip; read by readLoop
	writech  chan writeRequest    // written by roundTrip; read by writeLoop
	closech  chan struct{}        // closed when conn closed
	// writeErrCh passes the request write error (usually nil)
	// from the writeLoop goroutine to the readLoop which passes
	// it off to the res.Body reader, which then uses it to decide
	// whether or not a connection can be reused. Issue 7569.
	writeErrCh chan error

	lk                   sync.Mutex // guards following fields
	numExpectedResponses int        // incremented by roundTrip, decremented by readLoop
	closed               bool       // whether conn has been closed
	broken               bool       // an error has happened on this connection; marked broken so it's not reused.
	canceled             bool       // whether this conn was broken due to a CancelRequest
	// mutateHeaderFunc is an optional func to modify extra
	// headers on each outbound request before it's written. (the
	// original Request given to RoundTrip is not modified)
	mutateHeaderFunc func(http.Header)
}
  595. // isBroken reports whether this connection is in a known broken state.
  596. func (pc *persistConn) isBroken() bool {
  597. pc.lk.Lock()
  598. b := pc.broken
  599. pc.lk.Unlock()
  600. return b
  601. }
  602. // isCanceled reports whether this connection was closed due to CancelRequest.
  603. func (pc *persistConn) isCanceled() bool {
  604. pc.lk.Lock()
  605. defer pc.lk.Unlock()
  606. return pc.canceled
  607. }
// cancelRequest marks the connection as canceled and closes it, both
// under pc.lk. roundTrip installs it as the request's cancel function,
// and getConn calls it for requests that were canceled before a
// connection was picked.
func (pc *persistConn) cancelRequest() {
	pc.lk.Lock()
	defer pc.lk.Unlock()
	pc.canceled = true
	pc.closeLocked()
}
// readLoop is the connection's reader goroutine, started by dialConn.
// Each iteration waits for data on the connection, takes the matching
// request from pc.reqch, reads the response, hands it to the waiting
// roundTrip via the request's channel and then decides whether the
// connection can go back to the idle pool. The loop ends, and the
// connection is closed, as soon as the connection cannot be kept alive.
func (pc *persistConn) readLoop() {
	// eofc is used to block http.Handler goroutines reading from Response.Body
	// at EOF until this goroutine has (potentially) added the connection
	// back to the idle pool.
	eofc := make(chan struct{})
	defer close(eofc) // unblock reader on errors

	// Read this once, before loop starts. (to avoid races in tests)
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		// Block until at least one byte arrives or the read fails.
		pb, err := pc.br.Peek(1)

		pc.lk.Lock()
		if pc.numExpectedResponses == 0 {
			// Nobody is waiting for a response: whatever woke us up
			// (data or an error), the connection is done.
			if !pc.closed {
				pc.closeLocked()
				if len(pb) > 0 {
					log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
						string(pb), err)
				}
			}
			pc.lk.Unlock()
			return
		}
		pc.lk.Unlock()

		rc := <-pc.reqch

		var resp *http.Response
		if err == nil {
			resp, err = http.ReadResponse(pc.br, rc.req)
			if err == nil && resp.StatusCode == 100 {
				// Skip any 100-continue for now.
				// TODO(bradfitz): if rc.req had "Expect: 100-continue",
				// actually block the request body write and signal the
				// writeLoop now to begin sending it. (Issue 2184) For now we
				// eat it, since we're never expecting one.
				resp, err = http.ReadResponse(pc.br, rc.req)
			}
		}

		if resp != nil {
			resp.TLS = pc.tlsState
		}

		hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0

		if err != nil {
			pc.close()
		} else {
			// Wrap the body so that reaching EOF or closing it early can
			// be observed through the callbacks installed below.
			resp.Body = &bodyEOFSignal{body: resp.Body}
		}

		if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
			// Don't do keep-alive on error, if either party requested a
			// close, or if we get an unexpected informational (1xx) response.
			// StatusCode 100 is already handled above.
			alive = false
		}

		var waitForBodyRead chan bool // channel is nil when there's no body
		if hasBody {
			// Buffered so the callbacks below never block on the send.
			waitForBodyRead = make(chan bool, 2)
			resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
				waitForBodyRead <- false
				return nil
			}
			resp.Body.(*bodyEOFSignal).fn = func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // see comment at top
				} else if err != nil && pc.isCanceled() {
					return errRequestCanceled
				}
				return err
			}
		}

		pc.lk.Lock()
		pc.numExpectedResponses--
		pc.lk.Unlock()

		// The connection might be going away when we put the
		// idleConn below. When that happens, we close the response channel to signal
		// to roundTrip that the connection is gone. roundTrip waits for
		// both closing and a response in a select, so it might choose
		// the close channel, rather than the response.
		// We send the response first so that roundTrip can check
		// if there is a pending one with a non-blocking select
		// on the response channel before erroring out.
		rc.ch <- responseAndError{resp, err}

		if hasBody {
			// To avoid a race, wait for the just-returned
			// response body to be fully consumed before peek on
			// the underlying bufio reader.
			select {
			case bodyEOF := <-waitForBodyRead:
				pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
				// The connection is reusable only if the body was read
				// to EOF, the conn itself has not hit EOF, the request
				// was written successfully and the pool accepted it.
				alive = alive &&
					bodyEOF &&
					!pc.sawEOF &&
					pc.wroteRequest() &&
					pc.t.putIdleConn(pc)
				if bodyEOF {
					// Release the body reader blocked on eofc.
					eofc <- struct{}{}
				}
			case <-pc.closech:
				alive = false
			}
		} else {
			pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				pc.t.putIdleConn(pc)
		}

		if hook := testHookReadLoopBeforeNextRead; hook != nil {
			hook()
		}
	}
	pc.close()
}
  729. func (pc *persistConn) writeLoop() {
  730. for {
  731. select {
  732. case wr := <-pc.writech:
  733. if pc.isBroken() {
  734. wr.ch <- errors.New("http: can't write HTTP request on broken connection")
  735. continue
  736. }
  737. err := wr.req.Request.Write(pc.bw)
  738. if err == nil {
  739. err = pc.bw.Flush()
  740. }
  741. if err != nil {
  742. pc.markBroken()
  743. closeBody(wr.req.Request)
  744. }
  745. pc.writeErrCh <- err // to the body reader, which might recycle us
  746. wr.ch <- err // to the roundTrip function
  747. case <-pc.closech:
  748. return
  749. }
  750. }
  751. }
  752. // wroteRequest is a check before recycling a connection that the previous write
  753. // (from writeLoop above) happened and was successful.
  754. func (pc *persistConn) wroteRequest() bool {
  755. select {
  756. case err := <-pc.writeErrCh:
  757. // Common case: the write happened well before the response, so
  758. // avoid creating a timer.
  759. return err == nil
  760. default:
  761. // Rare case: the request was written in writeLoop above but
  762. // before it could send to pc.writeErrCh, the reader read it
  763. // all, processed it, and called us here. In this case, give the
  764. // write goroutine a bit of time to finish its send.
  765. //
  766. // Less rare case: We also get here in the legitimate case of
  767. // Issue 7569, where the writer is still writing (or stalled),
  768. // but the server has already replied. In this case, we don't
  769. // want to wait too long, and we want to return false so this
  770. // connection isn't re-used.
  771. select {
  772. case err := <-pc.writeErrCh:
  773. return err == nil
  774. case <-time.After(50 * time.Millisecond):
  775. return false
  776. }
  777. }
  778. }
// responseAndError is the result readLoop sends to roundTrip for one
// request: the response that was read and the error from reading it.
type responseAndError struct {
	res *http.Response
	err error
}
// requestAndChan is sent by roundTrip to readLoop over reqch. It names
// the request whose response is expected next and the channel on which
// readLoop delivers the result.
type requestAndChan struct {
	req *http.Request
	ch  chan responseAndError
}
// A writeRequest is sent by roundTrip to the writeLoop's goroutine
// (over writech) to write a request, while roundTrip concurrently
// waits on both the write result and the server's reply.
type writeRequest struct {
	req *transportRequest // request to write
	ch  chan<- error      // receives the result of the write
}
  795. type trHttpError struct {
  796. err string
  797. timeout bool
  798. }
  799. func (e *trHttpError) Error() string { return e.err }
  800. func (e *trHttpError) Timeout() bool { return e.timeout }
  801. func (e *trHttpError) Temporary() bool { return true }
// errTimeout is a *trHttpError flagged as a timeout; its message refers
// to waiting for response headers.
var errTimeout error = &trHttpError{err: "net/http: timeout awaiting response headers", timeout: true}

// errClosed is a *trHttpError (not flagged as a timeout); its message
// says the transport was closed before a response was received.
var errClosed error = &trHttpError{err: "net/http: transport closed before response was received"}

// errRequestCanceled is returned by roundTrip when the request was
// canceled before it could be sent, and by the response body reader
// when a read fails on a connection that was canceled.
var errRequestCanceled = errors.New("net/http: request canceled")
  805. // nil except for tests
  806. var (
  807. testHookPersistConnClosedGotRes func()
  808. testHookEnterRoundTrip func()
  809. testHookMu sync.Locker = fakeLocker{} // guards following
  810. testHookReadLoopBeforeNextRead func()
  811. )
  812. func (pc *persistConn) roundTrip(req *transportRequest) (resp *http.Response, err error) {
  813. if hook := testHookEnterRoundTrip; hook != nil {
  814. hook()
  815. }
  816. if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
  817. pc.t.putIdleConn(pc)
  818. return nil, errRequestCanceled
  819. }
  820. pc.lk.Lock()
  821. pc.numExpectedResponses++
  822. headerFn := pc.mutateHeaderFunc
  823. pc.lk.Unlock()
  824. if headerFn != nil {
  825. headerFn(req.extraHeaders())
  826. }
  827. // Write the request concurrently with waiting for a response,
  828. // in case the server decides to reply before reading our full
  829. // request body.
  830. writeErrCh := make(chan error, 1)
  831. pc.writech <- writeRequest{req, writeErrCh}
  832. resc := make(chan responseAndError, 1)
  833. pc.reqch <- requestAndChan{req.Request, resc}
  834. var re responseAndError
  835. var respHeaderTimer <-chan time.Time
  836. WaitResponse:
  837. for {
  838. select {
  839. case err := <-writeErrCh:
  840. if err != nil {
  841. re = responseAndError{nil, err}
  842. pc.close()
  843. break WaitResponse
  844. }
  845. if d := pc.t.ResponseHeaderTimeout; d > 0 {
  846. timer := time.NewTimer(d)
  847. defer timer.Stop() // prevent leaks
  848. respHeaderTimer = timer.C
  849. }
  850. case <-pc.closech:
  851. // The persist connection is dead. This shouldn't
  852. // usually happen (only with Connection: close responses
  853. // with no response bodies), but if it does happen it
  854. // means either a) the remote server hung up on us
  855. // prematurely, or b) the readLoop sent us a response &
  856. // closed its closech at roughly the same time, and we
  857. // selected this case first. If we got a response, readLoop makes sure
  858. // to send it before it puts the conn and closes the channel.
  859. // That way, we can fetch the response, if there is one,
  860. // with a non-blocking receive.
  861. select {
  862. case re = <-resc:
  863. if fn := testHookPersistConnClosedGotRes; fn != nil {
  864. fn()
  865. }
  866. default:
  867. re = responseAndError{err: errClosed}
  868. }
  869. break WaitResponse
  870. case <-respHeaderTimer:
  871. pc.close()
  872. re = responseAndError{err: errTimeout}
  873. break WaitResponse
  874. case re = <-resc:
  875. break WaitResponse
  876. }
  877. }
  878. if re.err != nil {
  879. pc.t.setReqCanceler(req.Request, nil)
  880. }
  881. return re.res, re.err
  882. }
  883. // markBroken marks a connection as broken (so it's not reused).
  884. // It differs from close in that it doesn't close the underlying
  885. // connection for use when it's still being read.
  886. func (pc *persistConn) markBroken() {
  887. pc.lk.Lock()
  888. defer pc.lk.Unlock()
  889. pc.broken = true
  890. }
  891. func (pc *persistConn) close() {
  892. pc.lk.Lock()
  893. defer pc.lk.Unlock()
  894. pc.closeLocked()
  895. }
  896. func (pc *persistConn) closeLocked() {
  897. pc.broken = true
  898. if !pc.closed {
  899. pc.conn.Close()
  900. pc.closed = true
  901. close(pc.closech)
  902. }
  903. pc.mutateHeaderFunc = nil
  904. }
  // portMap maps a URL scheme to the port canonicalAddr appends when
  // the host does not name one.
  var portMap = map[string]string{
  	"http":  "80",
  	"https": "443",
  }
  909. // canonicalAddr returns url.Host but always with a ":port" suffix
  910. func canonicalAddr(url *url.URL) string {
  911. addr := url.Host
  912. if !hasPort(addr) {
  913. return addr + ":" + portMap[url.Scheme]
  914. }
  915. return addr
  916. }
  // bodyEOFSignal wraps a ReadCloser and runs fn (if non-nil) at most
  // once, right before its final (error-producing) Read or Close call
  // returns. Whatever fn returns becomes the error reported by that
  // Read or Close.
  //
  // If earlyCloseFn is non-nil and Close is called before io.EOF has
  // been seen, earlyCloseFn runs instead of fn and its result is what
  // Close returns.
  type bodyEOFSignal struct {
  	body io.ReadCloser

  	mu     sync.Mutex // guards the four fields below
  	closed bool       // whether Close has been called
  	rerr   error      // sticky Read error

  	// fn receives the error from the failing Read, or from
  	// body.Close, unchanged (io.EOF included).
  	fn func(error) error

  	// earlyCloseFn is the optional alternative used by Close when
  	// io.EOF has not been seen.
  	earlyCloseFn func() error
  }
  932. func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
  933. es.mu.Lock()
  934. closed, rerr := es.closed, es.rerr
  935. es.mu.Unlock()
  936. if closed {
  937. return 0, errors.New("http: read on closed response body")
  938. }
  939. if rerr != nil {
  940. return 0, rerr
  941. }
  942. n, err = es.body.Read(p)
  943. if err != nil {
  944. es.mu.Lock()
  945. defer es.mu.Unlock()
  946. if es.rerr == nil {
  947. es.rerr = err
  948. }
  949. err = es.condfn(err)
  950. }
  951. return
  952. }
  953. func (es *bodyEOFSignal) Close() error {
  954. es.mu.Lock()
  955. defer es.mu.Unlock()
  956. if es.closed {
  957. return nil
  958. }
  959. es.closed = true
  960. if es.earlyCloseFn != nil && es.rerr != io.EOF {
  961. return es.earlyCloseFn()
  962. }
  963. err := es.body.Close()
  964. return es.condfn(err)
  965. }
  966. // caller must hold es.mu.
  967. func (es *bodyEOFSignal) condfn(err error) error {
  968. if es.fn == nil {
  969. return err
  970. }
  971. err = es.fn(err)
  972. es.fn = nil
  973. return err
  974. }
  // readerAndCloser combines an independent io.Reader and io.Closer
  // into a single value by embedding both.
  type readerAndCloser struct {
  	io.Reader // source of Read
  	io.Closer // source of Close
  }
  // tlsHandshakeTimeoutError is an error value with a fixed message
  // that reports itself as both a timeout and temporary.
  type tlsHandshakeTimeoutError struct{}

  // Timeout always reports true.
  func (tlsHandshakeTimeoutError) Timeout() bool {
  	return true
  }

  // Temporary always reports true.
  func (tlsHandshakeTimeoutError) Temporary() bool {
  	return true
  }

  // Error returns the fixed handshake-timeout message.
  func (tlsHandshakeTimeoutError) Error() string {
  	return "net/http: TLS handshake timeout"
  }
  // noteEOFReader wraps r and records in *sawEOF when a Read on it
  // returns io.EOF.
  type noteEOFReader struct {
  	r      io.Reader
  	sawEOF *bool
  }

  // Read forwards to the wrapped reader and sets *sawEOF to true when
  // the result is io.EOF. The byte count and error are returned as is.
  func (nr noteEOFReader) Read(p []byte) (n int, err error) {
  	if n, err = nr.r.Read(p); err == io.EOF {
  		*nr.sawEOF = true
  	}
  	return n, err
  }
  // fakeLocker is a sync.Locker whose Lock and Unlock do nothing. It
  // guards test-only fields when not under test, to avoid runtime
  // atomic overhead.
  type fakeLocker struct{}

  // Lock is a no-op.
  func (fakeLocker) Lock() {
  }

  // Unlock is a no-op.
  func (fakeLocker) Unlock() {
  }