12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110 |
- // Copyright 2011 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // HTTP client implementation. See RFC 2616.
- //
- // This is the low-level Transport implementation of RoundTripper.
- // The high-level interface is in client.go.
- package remote
- import (
- "bufio"
- "crypto/tls"
- "errors"
- "fmt"
- "io"
- "log"
- "net"
- "net/http"
- "net/url"
- "os"
- "strings"
- "sync"
- "time"
- )
// DefaultMaxIdleConnsPerHost is the idle-connection limit per target
// that putIdleConn applies when Transport.MaxIdleConnsPerHost is zero.
const (
	DefaultMaxIdleConnsPerHost = 2
)
- // Transport is an implementation of RoundTripper that supports HTTP,
- // HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT).
- // Transport can also cache connections for future re-use.
- type Transport struct {
- idleMu sync.Mutex
- wantIdle bool // user has requested to close all idle conns
- idleConn map[connectMethodKey][]*persistConn
- idleConnCh map[connectMethodKey]chan *persistConn
- reqMu sync.Mutex
- reqCanceler map[*http.Request]func()
- reqCanceled map[*http.Request]bool
- altMu sync.RWMutex
- altProto map[string]http.RoundTripper // nil or map of URI scheme => RoundTripper
- // Dial specifies the dial function for creating unencrypted
- // TCP connections.
- // If Dial is nil, net.Dial is used.
- Dial func(network, addr string) (net.Conn, error)
- // DialTLS specifies an optional dial function for creating
- // TLS connections for non-proxied HTTPS requests.
- //
- // If DialTLS is nil, Dial and TLSClientConfig are used.
- //
- // If DialTLS is set, the Dial hook is not used for HTTPS
- // requests and the TLSClientConfig and TLSHandshakeTimeout
- // are ignored. The returned net.Conn is assumed to already be
- // past the TLS handshake.
- DialTLS func(network, addr string) (net.Conn, error)
- // TLSClientConfig specifies the TLS configuration to use with
- // tls.Client. If nil, the default configuration is used.
- TLSClientConfig *tls.Config
- // TLSHandshakeTimeout specifies the maximum amount of time waiting to
- // wait for a TLS handshake. Zero means no timeout.
- TLSHandshakeTimeout time.Duration
- // DisableKeepAlives, if true, prevents re-use of TCP connections
- // between different HTTP requests.
- DisableKeepAlives bool
- // DisableCompression, if true, prevents the Transport from
- // requesting compression with an "Accept-Encoding: gzip"
- // request header when the Request contains no existing
- // Accept-Encoding value. If the Transport requests gzip on
- // its own and gets a gzipped response, it's transparently
- // decoded in the Response.Body. However, if the user
- // explicitly requested gzip it is not automatically
- // uncompressed.
- DisableCompression bool
- // MaxIdleConnsPerHost, if non-zero, controls the maximum idle
- // (keep-alive) to keep per-host. If zero,
- // DefaultMaxIdleConnsPerHost is used.
- MaxIdleConnsPerHost int
- // ResponseHeaderTimeout, if non-zero, specifies the amount of
- // time to wait for a server's response headers after fully
- // writing the request (including its body, if any). This
- // time does not include the time to read the response body.
- ResponseHeaderTimeout time.Duration
- // TODO: tunable on global max cached connections
- // TODO: tunable on timeout on cached connections
- }
// transportRequest pairs the caller's *http.Request with a lazily
// allocated set of additional headers.
type transportRequest struct {
	*http.Request             // the caller's request; must not be mutated
	extra         http.Header // additional headers, nil until first requested
}

// extraHeaders returns the additional-header map, creating it on first
// use so the result is never nil.
func (tr *transportRequest) extraHeaders() http.Header {
	if tr.extra != nil {
		return tr.extra
	}
	tr.extra = make(http.Header)
	return tr.extra
}
// closeBody closes req.Body when the request has one. The error from
// Close is discarded.
func closeBody(req *http.Request) {
	if body := req.Body; body != nil {
		body.Close()
	}
}
// hasPort reports whether the last ':' in s comes after the last ']'.
// Colons inside a bracketed IPv6 literal therefore do not count as a
// port separator.
func hasPort(s string) bool {
	lastColon := strings.LastIndex(s, ":")
	lastBracket := strings.LastIndex(s, "]")
	return lastColon > lastBracket
}
- // RoundTrip implements the RoundTripper interface.
- //
- // For higher-level HTTP client support (such as handling of cookies
- // and redirects), see Get, Post, and the Client type.
- func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
- if req.URL == nil {
- closeBody(req)
- return nil, errors.New("http: nil Request.URL")
- }
- if req.Header == nil {
- closeBody(req)
- return nil, errors.New("http: nil Request.Header")
- }
- if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
- closeBody(req)
- return nil, errors.New("http: bad scheme")
- }
- if req.URL.Host == "" {
- closeBody(req)
- return nil, errors.New("http: no Host in request URL")
- }
- treq := &transportRequest{Request: req}
- cm := t.connectMethodForRequest(treq)
- // Get the cached or newly-created connection to either the
- // host (for http or https), the http proxy, or the http proxy
- // pre-CONNECTed to https server. In any case, we'll be ready
- // to send it requests.
- pconn, err := t.getConn(req, cm)
- if err != nil {
- t.setReqCanceler(req, nil)
- closeBody(req)
- return nil, err
- }
- return pconn.roundTrip(treq)
- }
- // RegisterProtocol registers a new protocol with scheme.
- // The Transport will pass requests using the given scheme to rt.
- // It is rt's responsibility to simulate HTTP request semantics.
- //
- // RegisterProtocol can be used by other packages to provide
- // implementations of protocol schemes like "ftp" or "file".
- func (t *Transport) RegisterProtocol(scheme string, rt http.RoundTripper) {
- if scheme == "http" || scheme == "https" {
- panic("protocol " + scheme + " already registered")
- }
- t.altMu.Lock()
- defer t.altMu.Unlock()
- if t.altProto == nil {
- t.altProto = make(map[string]http.RoundTripper)
- }
- if _, exists := t.altProto[scheme]; exists {
- panic("protocol " + scheme + " already registered")
- }
- t.altProto[scheme] = rt
- }
- // CloseIdleConnections closes any connections which were previously
- // connected from previous requests but are now sitting idle in
- // a "keep-alive" state. It does not interrupt any connections currently
- // in use.
- func (t *Transport) CloseIdleConnections() {
- t.idleMu.Lock()
- m := t.idleConn
- t.idleConn = nil
- t.idleConnCh = nil
- t.wantIdle = true
- t.idleMu.Unlock()
- for _, conns := range m {
- for _, pconn := range conns {
- pconn.close()
- }
- }
- }
- // CancelRequest cancels an in-flight request by closing its connection.
- // CancelRequest should only be called after RoundTrip has returned.
- func (t *Transport) CancelRequest(req *http.Request) {
- t.reqMu.Lock()
- if cancel, ok := t.reqCanceler[req]; ok {
- delete(t.reqCanceler, req)
- t.reqMu.Unlock()
- if cancel != nil {
- cancel()
- }
- } else {
- if t.reqCanceled == nil {
- t.reqCanceled = make(map[*http.Request]bool)
- }
- t.reqCanceled[req] = true
- t.reqMu.Unlock()
- }
- }
- //
- // Private implementation past this point.
- //
// envOnce resolves an environment variable, which may go by several
// names, a single time and caches the answer for later calls.
type envOnce struct {
	names []string  // candidate variable names, tried in order
	once  sync.Once // ensures init runs a single time
	val   string    // cached value; "" when no name yielded a value
}

// Get returns the cached value, performing the lookup on the first call.
func (e *envOnce) Get() string {
	e.once.Do(e.init)
	return e.val
}

// init stores the value of the first name whose variable is non-empty.
// If none is, val ends up empty.
func (e *envOnce) init() {
	for i := range e.names {
		if e.val = os.Getenv(e.names[i]); e.val != "" {
			return
		}
	}
}

// reset forgets the cached value so the next Get looks it up again.
// It is used by tests.
func (e *envOnce) reset() {
	e.once = sync.Once{}
	e.val = ""
}
- func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod) {
- cm.targetScheme = treq.URL.Scheme
- cm.targetAddr = canonicalAddr(treq.URL)
- return cm
- }
- // putIdleConn adds pconn to the list of idle persistent connections awaiting
- // a new request.
- // If pconn is no longer needed or not in a good state, putIdleConn
- // returns false.
- func (t *Transport) putIdleConn(pconn *persistConn) bool {
- if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
- pconn.close()
- return false
- }
- if pconn.isBroken() {
- return false
- }
- key := pconn.cacheKey
- max := t.MaxIdleConnsPerHost
- if max == 0 {
- max = DefaultMaxIdleConnsPerHost
- }
- t.idleMu.Lock()
- waitingDialer := t.idleConnCh[key]
- select {
- case waitingDialer <- pconn:
- // We're done with this pconn and somebody else is
- // currently waiting for a conn of this type (they're
- // actively dialing, but this conn is ready
- // first). Chrome calls this socket late binding. See
- // https://insouciant.org/tech/connection-management-in-chromium/
- t.idleMu.Unlock()
- return true
- default:
- if waitingDialer != nil {
- // They had populated this, but their dial won
- // first, so we can clean up this map entry.
- delete(t.idleConnCh, key)
- }
- }
- if t.wantIdle {
- t.idleMu.Unlock()
- pconn.close()
- return false
- }
- if t.idleConn == nil {
- t.idleConn = make(map[connectMethodKey][]*persistConn)
- }
- if len(t.idleConn[key]) >= max {
- t.idleMu.Unlock()
- pconn.close()
- return false
- }
- for _, exist := range t.idleConn[key] {
- if exist == pconn {
- log.Fatalf("dup idle pconn %p in freelist", pconn)
- }
- }
- t.idleConn[key] = append(t.idleConn[key], pconn)
- t.idleMu.Unlock()
- return true
- }
- // getIdleConnCh returns a channel to receive and return idle
- // persistent connection for the given connectMethod.
- // It may return nil, if persistent connections are not being used.
- func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn {
- if t.DisableKeepAlives {
- return nil
- }
- key := cm.key()
- t.idleMu.Lock()
- defer t.idleMu.Unlock()
- t.wantIdle = false
- if t.idleConnCh == nil {
- t.idleConnCh = make(map[connectMethodKey]chan *persistConn)
- }
- ch, ok := t.idleConnCh[key]
- if !ok {
- ch = make(chan *persistConn)
- t.idleConnCh[key] = ch
- }
- return ch
- }
- func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) {
- key := cm.key()
- t.idleMu.Lock()
- defer t.idleMu.Unlock()
- if t.idleConn == nil {
- return nil
- }
- for {
- pconns, ok := t.idleConn[key]
- if !ok {
- return nil
- }
- if len(pconns) == 1 {
- pconn = pconns[0]
- delete(t.idleConn, key)
- } else {
- // 2 or more cached connections; pop last
- // TODO: queue?
- pconn = pconns[len(pconns)-1]
- t.idleConn[key] = pconns[:len(pconns)-1]
- }
- if !pconn.isBroken() {
- return
- }
- }
- }
- func (t *Transport) setReqCanceler(r *http.Request, fn func()) {
- t.reqMu.Lock()
- defer t.reqMu.Unlock()
- if t.reqCanceler == nil {
- t.reqCanceler = make(map[*http.Request]func())
- }
- if fn != nil {
- t.reqCanceler[r] = fn
- } else {
- delete(t.reqCanceler, r)
- }
- }
- // replaceReqCanceler replaces an existing cancel function. If there is no cancel function
- // for the request, we don't set the function and return false.
- // Since CancelRequest will clear the canceler, we can use the return value to detect if
- // the request was canceled since the last setReqCancel call.
- func (t *Transport) replaceReqCanceler(r *http.Request, fn func()) bool {
- t.reqMu.Lock()
- defer t.reqMu.Unlock()
- _, ok := t.reqCanceler[r]
- if !ok {
- return false
- }
- if fn != nil {
- t.reqCanceler[r] = fn
- } else {
- delete(t.reqCanceler, r)
- }
- return true
- }
- func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
- if t.Dial != nil {
- return t.Dial(network, addr)
- }
- return net.Dial(network, addr)
- }
// Testing hooks. getConn calls them, when non-nil, around the hand-off
// of a dial that is still in flight after the request got a connection
// elsewhere or was canceled.
var (
	prePendingDial  func()
	postPendingDial func()
)
- // getConn dials and creates a new persistConn to the target as
- // specified in the connectMethod. This includes doing a proxy CONNECT
- // and/or setting up TLS. If this doesn't return an error, the persistConn
- // is ready to write requests to.
- func (t *Transport) getConn(req *http.Request, cm connectMethod) (*persistConn, error) {
- if pc := t.getIdleConn(cm); pc != nil {
- // set request canceler to some non-nil function so we
- // can detect whether it was cleared between now and when
- // we enter roundTrip
- t.setReqCanceler(req, func() {})
- t.reqMu.Lock()
- if _, ok := t.reqCanceled[req]; ok {
- // request canceled before we knew about it
- delete(t.reqCanceled, req)
- pc.cancelRequest()
- }
- t.reqMu.Unlock()
- return pc, nil
- }
- type dialRes struct {
- pc *persistConn
- err error
- }
- dialc := make(chan dialRes)
- // Copy these hooks so we don't race on the postPendingDial in
- // the goroutine we launch. Issue 11136.
- prePendingDial := prePendingDial
- postPendingDial := postPendingDial
- handlePendingDial := func() {
- if prePendingDial != nil {
- prePendingDial()
- }
- go func() {
- if v := <-dialc; v.err == nil {
- t.putIdleConn(v.pc)
- }
- if postPendingDial != nil {
- postPendingDial()
- }
- }()
- }
- cancelc := make(chan struct{})
- t.setReqCanceler(req, func() { close(cancelc) })
- t.reqMu.Lock()
- if _, ok := t.reqCanceled[req]; ok {
- // request canceled before we knew about it
- if cancel, ok := t.reqCanceler[req]; ok {
- delete(t.reqCanceler, req)
- delete(t.reqCanceled, req)
- if cancel != nil {
- cancel()
- }
- }
- }
- t.reqMu.Unlock()
- go func() {
- pc, err := t.dialConn(cm)
- dialc <- dialRes{pc, err}
- }()
- idleConnCh := t.getIdleConnCh(cm)
- select {
- case v := <-dialc:
- // Our dial finished.
- return v.pc, v.err
- case pc := <-idleConnCh:
- // Another request finished first and its net.Conn
- // became available before our dial. Or somebody
- // else's dial that they didn't use.
- // But our dial is still going, so give it away
- // when it finishes:
- handlePendingDial()
- return pc, nil
- case <-cancelc:
- handlePendingDial()
- return nil, errors.New("net/http: request canceled while waiting for connection")
- }
- }
- func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) {
- pconn := &persistConn{
- t: t,
- cacheKey: cm.key(),
- reqch: make(chan requestAndChan, 1),
- writech: make(chan writeRequest, 1),
- closech: make(chan struct{}),
- writeErrCh: make(chan error, 1),
- }
- tlsDial := t.DialTLS != nil && cm.targetScheme == "https"
- if tlsDial {
- var err error
- pconn.conn, err = t.DialTLS("tcp", cm.addr())
- if err != nil {
- return nil, err
- }
- if tc, ok := pconn.conn.(*tls.Conn); ok {
- cs := tc.ConnectionState()
- pconn.tlsState = &cs
- }
- } else {
- conn, err := t.dial("tcp", cm.addr())
- if err != nil {
- return nil, err
- }
- pconn.conn = conn
- }
- if cm.targetScheme == "https" && !tlsDial {
- // Initiate TLS and check remote host name against certificate.
- cfg := t.TLSClientConfig
- if cfg == nil || cfg.ServerName == "" {
- host := cm.tlsHost()
- if cfg == nil {
- cfg = &tls.Config{ServerName: host}
- } else {
- clone := *cfg // shallow clone
- clone.ServerName = host
- cfg = &clone
- }
- }
- plainConn := pconn.conn
- tlsConn := tls.Client(plainConn, cfg)
- errc := make(chan error, 2)
- var timer *time.Timer // for canceling TLS handshake
- if d := t.TLSHandshakeTimeout; d != 0 {
- timer = time.AfterFunc(d, func() {
- errc <- tlsHandshakeTimeoutError{}
- })
- }
- go func() {
- err := tlsConn.Handshake()
- if timer != nil {
- timer.Stop()
- }
- errc <- err
- }()
- if err := <-errc; err != nil {
- plainConn.Close()
- return nil, err
- }
- if !cfg.InsecureSkipVerify {
- if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
- plainConn.Close()
- return nil, err
- }
- }
- cs := tlsConn.ConnectionState()
- pconn.tlsState = &cs
- pconn.conn = tlsConn
- }
- pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF})
- pconn.bw = bufio.NewWriter(pconn.conn)
- go pconn.readLoop()
- go pconn.writeLoop()
- return pconn, nil
- }
- // connectMethod is the map key (in its String form) for keeping persistent
- // TCP connections alive for subsequent HTTP requests.
- //
- // A connect method may be of the following types:
- //
- // Cache key form Description
- // ----------------- -------------------------
- // |http|foo.com http directly to server, no proxy
- // |https|foo.com https directly to server, no proxy
- // http://proxy.com|https|foo.com http to proxy, then CONNECT to foo.com
- // http://proxy.com|http http to proxy, http to anywhere after that
- //
- // Note: no support to https to the proxy yet.
- //
- type connectMethod struct {
- targetScheme string // "http" or "https"
- targetAddr string // Not used if proxy + http targetScheme (4th example in table)
- }
- func (cm *connectMethod) key() connectMethodKey {
- return connectMethodKey{
- scheme: cm.targetScheme,
- addr: cm.targetAddr,
- }
- }
- // addr returns the first hop "host:port" to which we need to TCP connect.
- func (cm *connectMethod) addr() string {
- return cm.targetAddr
- }
- // tlsHost returns the host name to match against the peer's
- // TLS certificate.
- func (cm *connectMethod) tlsHost() string {
- h := cm.targetAddr
- if hasPort(h) {
- h = h[:strings.LastIndex(h, ":")]
- }
- return h
- }
// connectMethodKey is the comparable, map-key form of connectMethod.
// It identifies a pool of interchangeable connections by target scheme
// and address.
type connectMethodKey struct {
	scheme, addr string
}

// String renders the key as "|scheme|addr", for example
// "|https|foo.com:443". The slot before the first '|' is where a proxy
// URL would go; this key carries none, so it is always empty.
//
// Only used by tests.
func (k connectMethodKey) String() string {
	// The format must have exactly one verb per operand. A surplus verb
	// would render as "%!s(MISSING)".
	return fmt.Sprintf("|%s|%s", k.scheme, k.addr)
}
- // persistConn wraps a connection, usually a persistent one
- // (but may be used for non-keep-alive requests as well)
- type persistConn struct {
- t *Transport
- cacheKey connectMethodKey
- conn net.Conn
- tlsState *tls.ConnectionState
- br *bufio.Reader // from conn
- sawEOF bool // whether we've seen EOF from conn; owned by readLoop
- bw *bufio.Writer // to conn
- reqch chan requestAndChan // written by roundTrip; read by readLoop
- writech chan writeRequest // written by roundTrip; read by writeLoop
- closech chan struct{} // closed when conn closed
- // writeErrCh passes the request write error (usually nil)
- // from the writeLoop goroutine to the readLoop which passes
- // it off to the res.Body reader, which then uses it to decide
- // whether or not a connection can be reused. Issue 7569.
- writeErrCh chan error
- lk sync.Mutex // guards following fields
- numExpectedResponses int
- closed bool // whether conn has been closed
- broken bool // an error has happened on this connection; marked broken so it's not reused.
- canceled bool // whether this conn was broken due a CancelRequest
- // mutateHeaderFunc is an optional func to modify extra
- // headers on each outbound request before it's written. (the
- // original Request given to RoundTrip is not modified)
- mutateHeaderFunc func(http.Header)
- }
- // isBroken reports whether this connection is in a known broken state.
- func (pc *persistConn) isBroken() bool {
- pc.lk.Lock()
- b := pc.broken
- pc.lk.Unlock()
- return b
- }
- // isCanceled reports whether this connection was closed due to CancelRequest.
- func (pc *persistConn) isCanceled() bool {
- pc.lk.Lock()
- defer pc.lk.Unlock()
- return pc.canceled
- }
- func (pc *persistConn) cancelRequest() {
- pc.lk.Lock()
- defer pc.lk.Unlock()
- pc.canceled = true
- pc.closeLocked()
- }
- func (pc *persistConn) readLoop() {
- // eofc is used to block http.Handler goroutines reading from Response.Body
- // at EOF until this goroutines has (potentially) added the connection
- // back to the idle pool.
- eofc := make(chan struct{})
- defer close(eofc) // unblock reader on errors
- // Read this once, before loop starts. (to avoid races in tests)
- testHookMu.Lock()
- testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
- testHookMu.Unlock()
- alive := true
- for alive {
- pb, err := pc.br.Peek(1)
- pc.lk.Lock()
- if pc.numExpectedResponses == 0 {
- if !pc.closed {
- pc.closeLocked()
- if len(pb) > 0 {
- log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
- string(pb), err)
- }
- }
- pc.lk.Unlock()
- return
- }
- pc.lk.Unlock()
- rc := <-pc.reqch
- var resp *http.Response
- if err == nil {
- resp, err = http.ReadResponse(pc.br, rc.req)
- if err == nil && resp.StatusCode == 100 {
- // Skip any 100-continue for now.
- // TODO(bradfitz): if rc.req had "Expect: 100-continue",
- // actually block the request body write and signal the
- // writeLoop now to begin sending it. (Issue 2184) For now we
- // eat it, since we're never expecting one.
- resp, err = http.ReadResponse(pc.br, rc.req)
- }
- }
- if resp != nil {
- resp.TLS = pc.tlsState
- }
- hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0
- if err != nil {
- pc.close()
- } else {
- resp.Body = &bodyEOFSignal{body: resp.Body}
- }
- if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
- // Don't do keep-alive on error if either party requested a close
- // or we get an unexpected informational (1xx) response.
- // StatusCode 100 is already handled above.
- alive = false
- }
- var waitForBodyRead chan bool // channel is nil when there's no body
- if hasBody {
- waitForBodyRead = make(chan bool, 2)
- resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
- waitForBodyRead <- false
- return nil
- }
- resp.Body.(*bodyEOFSignal).fn = func(err error) error {
- isEOF := err == io.EOF
- waitForBodyRead <- isEOF
- if isEOF {
- <-eofc // see comment at top
- } else if err != nil && pc.isCanceled() {
- return errRequestCanceled
- }
- return err
- }
- }
- pc.lk.Lock()
- pc.numExpectedResponses--
- pc.lk.Unlock()
- // The connection might be going away when we put the
- // idleConn below. When that happens, we close the response channel to signal
- // to roundTrip that the connection is gone. roundTrip waits for
- // both closing and a response in a select, so it might choose
- // the close channel, rather than the response.
- // We send the response first so that roundTrip can check
- // if there is a pending one with a non-blocking select
- // on the response channel before erroring out.
- rc.ch <- responseAndError{resp, err}
- if hasBody {
- // To avoid a race, wait for the just-returned
- // response body to be fully consumed before peek on
- // the underlying bufio reader.
- select {
- case bodyEOF := <-waitForBodyRead:
- pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
- alive = alive &&
- bodyEOF &&
- !pc.sawEOF &&
- pc.wroteRequest() &&
- pc.t.putIdleConn(pc)
- if bodyEOF {
- eofc <- struct{}{}
- }
- case <-pc.closech:
- alive = false
- }
- } else {
- pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
- alive = alive &&
- !pc.sawEOF &&
- pc.wroteRequest() &&
- pc.t.putIdleConn(pc)
- }
- if hook := testHookReadLoopBeforeNextRead; hook != nil {
- hook()
- }
- }
- pc.close()
- }
- func (pc *persistConn) writeLoop() {
- for {
- select {
- case wr := <-pc.writech:
- if pc.isBroken() {
- wr.ch <- errors.New("http: can't write HTTP request on broken connection")
- continue
- }
- err := wr.req.Request.Write(pc.bw)
- if err == nil {
- err = pc.bw.Flush()
- }
- if err != nil {
- pc.markBroken()
- closeBody(wr.req.Request)
- }
- pc.writeErrCh <- err // to the body reader, which might recycle us
- wr.ch <- err // to the roundTrip function
- case <-pc.closech:
- return
- }
- }
- }
- // wroteRequest is a check before recycling a connection that the previous write
- // (from writeLoop above) happened and was successful.
- func (pc *persistConn) wroteRequest() bool {
- select {
- case err := <-pc.writeErrCh:
- // Common case: the write happened well before the response, so
- // avoid creating a timer.
- return err == nil
- default:
- // Rare case: the request was written in writeLoop above but
- // before it could send to pc.writeErrCh, the reader read it
- // all, processed it, and called us here. In this case, give the
- // write goroutine a bit of time to finish its send.
- //
- // Less rare case: We also get here in the legitimate case of
- // Issue 7569, where the writer is still writing (or stalled),
- // but the server has already replied. In this case, we don't
- // want to wait too long, and we want to return false so this
- // connection isn't re-used.
- select {
- case err := <-pc.writeErrCh:
- return err == nil
- case <-time.After(50 * time.Millisecond):
- return false
- }
- }
- }
// responseAndError is the result readLoop sends back to roundTrip for
// one request: the response that was read, or the error that prevented
// reading it.
type responseAndError struct {
	res *http.Response // response read from the connection, if any
	err error          // error encountered instead, if any
}
- type requestAndChan struct {
- req *http.Request
- ch chan responseAndError
- }
- // A writeRequest is sent by the readLoop's goroutine to the
- // writeLoop's goroutine to write a request while the read loop
- // concurrently waits on both the write response and the server's
- // reply.
- type writeRequest struct {
- req *transportRequest
- ch chan<- error
- }
// trHttpError is the error type behind errTimeout and errClosed. It
// provides Timeout and Temporary methods in addition to Error.
type trHttpError struct {
	err     string // message returned by Error
	timeout bool   // value returned by Timeout
}

// Error returns the error message.
func (e *trHttpError) Error() string {
	return e.err
}

// Timeout reports whether the error represents a timeout.
func (e *trHttpError) Timeout() bool {
	return e.timeout
}

// Temporary always reports true.
func (e *trHttpError) Temporary() bool {
	return true
}

// Errors returned to callers of roundTrip and to response body readers.
var (
	errTimeout         error = &trHttpError{err: "net/http: timeout awaiting response headers", timeout: true}
	errClosed          error = &trHttpError{err: "net/http: transport closed before response was received"}
	errRequestCanceled       = errors.New("net/http: request canceled")
)
- // nil except for tests
- var (
- testHookPersistConnClosedGotRes func()
- testHookEnterRoundTrip func()
- testHookMu sync.Locker = fakeLocker{} // guards following
- testHookReadLoopBeforeNextRead func()
- )
- func (pc *persistConn) roundTrip(req *transportRequest) (resp *http.Response, err error) {
- if hook := testHookEnterRoundTrip; hook != nil {
- hook()
- }
- if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
- pc.t.putIdleConn(pc)
- return nil, errRequestCanceled
- }
- pc.lk.Lock()
- pc.numExpectedResponses++
- headerFn := pc.mutateHeaderFunc
- pc.lk.Unlock()
- if headerFn != nil {
- headerFn(req.extraHeaders())
- }
- // Write the request concurrently with waiting for a response,
- // in case the server decides to reply before reading our full
- // request body.
- writeErrCh := make(chan error, 1)
- pc.writech <- writeRequest{req, writeErrCh}
- resc := make(chan responseAndError, 1)
- pc.reqch <- requestAndChan{req.Request, resc}
- var re responseAndError
- var respHeaderTimer <-chan time.Time
- WaitResponse:
- for {
- select {
- case err := <-writeErrCh:
- if err != nil {
- re = responseAndError{nil, err}
- pc.close()
- break WaitResponse
- }
- if d := pc.t.ResponseHeaderTimeout; d > 0 {
- timer := time.NewTimer(d)
- defer timer.Stop() // prevent leaks
- respHeaderTimer = timer.C
- }
- case <-pc.closech:
- // The persist connection is dead. This shouldn't
- // usually happen (only with Connection: close responses
- // with no response bodies), but if it does happen it
- // means either a) the remote server hung up on us
- // prematurely, or b) the readLoop sent us a response &
- // closed its closech at roughly the same time, and we
- // selected this case first. If we got a response, readLoop makes sure
- // to send it before it puts the conn and closes the channel.
- // That way, we can fetch the response, if there is one,
- // with a non-blocking receive.
- select {
- case re = <-resc:
- if fn := testHookPersistConnClosedGotRes; fn != nil {
- fn()
- }
- default:
- re = responseAndError{err: errClosed}
- }
- break WaitResponse
- case <-respHeaderTimer:
- pc.close()
- re = responseAndError{err: errTimeout}
- break WaitResponse
- case re = <-resc:
- break WaitResponse
- }
- }
- if re.err != nil {
- pc.t.setReqCanceler(req.Request, nil)
- }
- return re.res, re.err
- }
- // markBroken marks a connection as broken (so it's not reused).
- // It differs from close in that it doesn't close the underlying
- // connection for use when it's still being read.
- func (pc *persistConn) markBroken() {
- pc.lk.Lock()
- defer pc.lk.Unlock()
- pc.broken = true
- }
- func (pc *persistConn) close() {
- pc.lk.Lock()
- defer pc.lk.Unlock()
- pc.closeLocked()
- }
- func (pc *persistConn) closeLocked() {
- pc.broken = true
- if !pc.closed {
- pc.conn.Close()
- pc.closed = true
- close(pc.closech)
- }
- pc.mutateHeaderFunc = nil
- }
// portMap gives the default port, as a string, for each URL scheme the
// Transport dials itself.
var portMap = map[string]string{
	"https": "443",
	"http":  "80",
}
- // canonicalAddr returns url.Host but always with a ":port" suffix
- func canonicalAddr(url *url.URL) string {
- addr := url.Host
- if !hasPort(addr) {
- return addr + ":" + portMap[url.Scheme]
- }
- return addr
- }
// bodyEOFSignal wraps a ReadCloser and runs fn (if non-nil) at most
// once: right before the first Read that fails, or the first Close,
// returns. fn receives the error about to be returned and its result is
// returned in its place.
//
// If earlyCloseFn is non-nil and Close is called before a Read has
// returned io.EOF, Close calls earlyCloseFn instead and returns its
// result. In that case neither the wrapped body's Close nor fn is
// invoked by Close.
type bodyEOFSignal struct {
	body io.ReadCloser

	mu           sync.Mutex        // guards the four fields below
	closed       bool              // Close has been called
	rerr         error             // first Read error; returned by every later Read
	fn           func(error) error // called with the Read error (io.EOF included) or the result of body.Close
	earlyCloseFn func() error      // optional; used by Close when io.EOF has not been seen
}

// Read reads from the wrapped body. After Close it fails with a fixed
// error, and once a Read has failed every later Read returns that same
// first error. The first failure is additionally passed through fn.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	closed, sticky := es.closed, es.rerr
	es.mu.Unlock()

	switch {
	case closed:
		return 0, errors.New("http: read on closed response body")
	case sticky != nil:
		return 0, sticky
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close closes the body. The first call does the work and every later
// call returns nil.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()

	if es.closed {
		return nil
	}
	es.closed = true

	if early := es.earlyCloseFn; early != nil && es.rerr != io.EOF {
		return early()
	}
	return es.condfn(es.body.Close())
}

// condfn passes err through fn if fn is still set, clearing fn
// afterwards so that it runs only once. The caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
	fn := es.fn
	if fn == nil {
		return err
	}
	err = fn(err)
	es.fn = nil
	return err
}
// readerAndCloser combines an independent Reader and Closer into a
// single value that provides both Read and Close.
type readerAndCloser struct {
	io.Reader // supplies Read
	io.Closer // supplies Close
}
// tlsHandshakeTimeoutError is the error dialConn reports when the TLS
// handshake does not complete within TLSHandshakeTimeout.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed message for this error.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
// noteEOFReader is a Reader that records in *sawEOF when the wrapped
// reader returns io.EOF.
type noteEOFReader struct {
	r      io.Reader // wrapped reader
	sawEOF *bool     // set to true once r returns io.EOF
}

// Read forwards to the wrapped reader, noting an io.EOF result.
func (nr noteEOFReader) Read(p []byte) (n int, err error) {
	n, err = nr.r.Read(p)
	if err != io.EOF {
		return n, err
	}
	*nr.sawEOF = true
	return n, err
}
// fakeLocker is a sync.Locker whose methods do nothing. It stands in
// for a real mutex around test-only fields when not under test, so that
// no locking cost is paid.
type fakeLocker struct{}

// Lock does nothing.
func (fakeLocker) Lock() {
}

// Unlock does nothing.
func (fakeLocker) Unlock() {
}
|