Parcourir la source

Merge pull request #282 from dcbw/test-races

remote: ensure testing HTTP requests are canceled
Eugene Yakubovich il y a 9 ans
Parent
commit
9d1f082e96
3 fichiers modifiés avec 1143 ajouts et 4 suppressions
  1. 23 2
      remote/client.go
  2. 10 2
      remote/remote_test.go
  3. 1110 0
      remote/transport.go

+ 23 - 2
remote/client.go

@@ -19,8 +19,10 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"net"
 	"net/http"
 	"path"
+	"time"
 
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
@@ -31,7 +33,26 @@ import (
 // implements subnet.Manager by sending requests to the server
 type RemoteManager struct {
 	base      string // includes scheme, host, and port, and version
-	transport *http.Transport
+	transport *Transport
+}
+
+func NewTransport(info transport.TLSInfo) (*Transport, error) {
+	cfg, err := info.ClientConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	t := &Transport{
+		// timeouts taken from http.DefaultTransport
+		Dial: (&net.Dialer{
+			Timeout:   30 * time.Second,
+			KeepAlive: 30 * time.Second,
+		}).Dial,
+		TLSHandshakeTimeout: 10 * time.Second,
+		TLSClientConfig:     cfg,
+	}
+
+	return t, nil
 }
 
 func NewRemoteManager(listenAddr, cafile, certfile, keyfile string) (subnet.Manager, error) {
@@ -41,7 +62,7 @@ func NewRemoteManager(listenAddr, cafile, certfile, keyfile string) (subnet.Mana
 		KeyFile:  keyfile,
 	}
 
-	t, err := transport.NewTransport(tls)
+	t, err := NewTransport(tls)
 	if err != nil {
 		return nil, err
 	}

+ 10 - 2
remote/remote_test.go

@@ -125,10 +125,18 @@ func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string) {
 
 func doTestWatch(t *testing.T, sm subnet.Manager) {
 	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	defer func() {
+		cancel()
+		wg.Wait()
+	}()
 
 	events := make(chan []subnet.Event)
-	go subnet.WatchLeases(ctx, sm, "_", events)
+	go func() {
+		subnet.WatchLeases(ctx, sm, "_", events)
+		wg.Done()
+	}()
 
 	// skip over the initial snapshot
 	<-events

+ 1110 - 0
remote/transport.go

@@ -0,0 +1,1110 @@
+// Copyright 2011 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// HTTP client implementation. See RFC 2616.
+//
+// This is the low-level Transport implementation of RoundTripper.
+// The high-level interface is in client.go.
+
+package remote
+
+import (
+	"bufio"
+	"crypto/tls"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"strings"
+	"sync"
+	"time"
+)
+
+// DefaultMaxIdleConnsPerHost is the default value of Transport's
+// MaxIdleConnsPerHost.
+const DefaultMaxIdleConnsPerHost = 2
+
+// Transport is an implementation of RoundTripper that supports HTTP,
+// HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT).
+// Transport can also cache connections for future re-use.
+type Transport struct {
+	idleMu     sync.Mutex
+	wantIdle   bool // user has requested to close all idle conns
+	idleConn   map[connectMethodKey][]*persistConn
+	idleConnCh map[connectMethodKey]chan *persistConn
+
+	reqMu       sync.Mutex
+	reqCanceler map[*http.Request]func()
+	reqCanceled map[*http.Request]bool
+
+	altMu    sync.RWMutex
+	altProto map[string]http.RoundTripper // nil or map of URI scheme => RoundTripper
+
+	// Dial specifies the dial function for creating unencrypted
+	// TCP connections.
+	// If Dial is nil, net.Dial is used.
+	Dial func(network, addr string) (net.Conn, error)
+
+	// DialTLS specifies an optional dial function for creating
+	// TLS connections for non-proxied HTTPS requests.
+	//
+	// If DialTLS is nil, Dial and TLSClientConfig are used.
+	//
+	// If DialTLS is set, the Dial hook is not used for HTTPS
+	// requests and the TLSClientConfig and TLSHandshakeTimeout
+	// are ignored. The returned net.Conn is assumed to already be
+	// past the TLS handshake.
+	DialTLS func(network, addr string) (net.Conn, error)
+
+	// TLSClientConfig specifies the TLS configuration to use with
+	// tls.Client. If nil, the default configuration is used.
+	TLSClientConfig *tls.Config
+
+	// TLSHandshakeTimeout specifies the maximum amount of time waiting to
+	// wait for a TLS handshake. Zero means no timeout.
+	TLSHandshakeTimeout time.Duration
+
+	// DisableKeepAlives, if true, prevents re-use of TCP connections
+	// between different HTTP requests.
+	DisableKeepAlives bool
+
+	// DisableCompression, if true, prevents the Transport from
+	// requesting compression with an "Accept-Encoding: gzip"
+	// request header when the Request contains no existing
+	// Accept-Encoding value. If the Transport requests gzip on
+	// its own and gets a gzipped response, it's transparently
+	// decoded in the Response.Body. However, if the user
+	// explicitly requested gzip it is not automatically
+	// uncompressed.
+	DisableCompression bool
+
+	// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
+	// (keep-alive) to keep per-host.  If zero,
+	// DefaultMaxIdleConnsPerHost is used.
+	MaxIdleConnsPerHost int
+
+	// ResponseHeaderTimeout, if non-zero, specifies the amount of
+	// time to wait for a server's response headers after fully
+	// writing the request (including its body, if any). This
+	// time does not include the time to read the response body.
+	ResponseHeaderTimeout time.Duration
+
+	// TODO: tunable on global max cached connections
+	// TODO: tunable on timeout on cached connections
+}
+
+// transportRequest is a wrapper around a *Request that adds
+// optional extra headers to write.
+type transportRequest struct {
+	*http.Request             // original request, not to be mutated
+	extra         http.Header // extra headers to write, or nil
+}
+
+func (tr *transportRequest) extraHeaders() http.Header {
+	if tr.extra == nil {
+		tr.extra = make(http.Header)
+	}
+	return tr.extra
+}
+
+func closeBody(req *http.Request) {
+	if req.Body != nil {
+		req.Body.Close()
+	}
+}
+
+func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") }
+
+// RoundTrip implements the RoundTripper interface.
+//
+// For higher-level HTTP client support (such as handling of cookies
+// and redirects), see Get, Post, and the Client type.
+func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
+	if req.URL == nil {
+		closeBody(req)
+		return nil, errors.New("http: nil Request.URL")
+	}
+	if req.Header == nil {
+		closeBody(req)
+		return nil, errors.New("http: nil Request.Header")
+	}
+	if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
+		closeBody(req)
+		return nil, errors.New("http: bad scheme")
+	}
+	if req.URL.Host == "" {
+		closeBody(req)
+		return nil, errors.New("http: no Host in request URL")
+	}
+	treq := &transportRequest{Request: req}
+	cm := t.connectMethodForRequest(treq)
+
+	// Get the cached or newly-created connection to either the
+	// host (for http or https), the http proxy, or the http proxy
+	// pre-CONNECTed to https server.  In any case, we'll be ready
+	// to send it requests.
+	pconn, err := t.getConn(req, cm)
+	if err != nil {
+		t.setReqCanceler(req, nil)
+		closeBody(req)
+		return nil, err
+	}
+
+	return pconn.roundTrip(treq)
+}
+
+// RegisterProtocol registers a new protocol with scheme.
+// The Transport will pass requests using the given scheme to rt.
+// It is rt's responsibility to simulate HTTP request semantics.
+//
+// RegisterProtocol can be used by other packages to provide
+// implementations of protocol schemes like "ftp" or "file".
+func (t *Transport) RegisterProtocol(scheme string, rt http.RoundTripper) {
+	if scheme == "http" || scheme == "https" {
+		panic("protocol " + scheme + " already registered")
+	}
+	t.altMu.Lock()
+	defer t.altMu.Unlock()
+	if t.altProto == nil {
+		t.altProto = make(map[string]http.RoundTripper)
+	}
+	if _, exists := t.altProto[scheme]; exists {
+		panic("protocol " + scheme + " already registered")
+	}
+	t.altProto[scheme] = rt
+}
+
+// CloseIdleConnections closes any connections which were previously
+// connected from previous requests but are now sitting idle in
+// a "keep-alive" state. It does not interrupt any connections currently
+// in use.
+func (t *Transport) CloseIdleConnections() {
+	t.idleMu.Lock()
+	m := t.idleConn
+	t.idleConn = nil
+	t.idleConnCh = nil
+	t.wantIdle = true
+	t.idleMu.Unlock()
+	for _, conns := range m {
+		for _, pconn := range conns {
+			pconn.close()
+		}
+	}
+}
+
+// CancelRequest cancels an in-flight request by closing its connection.
+// CancelRequest should only be called after RoundTrip has returned.
+func (t *Transport) CancelRequest(req *http.Request) {
+	t.reqMu.Lock()
+	if cancel, ok := t.reqCanceler[req]; ok {
+		delete(t.reqCanceler, req)
+		t.reqMu.Unlock()
+		if cancel != nil {
+			cancel()
+		}
+	} else {
+		if t.reqCanceled == nil {
+			t.reqCanceled = make(map[*http.Request]bool)
+		}
+		t.reqCanceled[req] = true
+		t.reqMu.Unlock()
+	}
+}
+
+//
+// Private implementation past this point.
+//
+
+// envOnce looks up an environment variable (optionally by multiple
+// names) once. It mitigates expensive lookups on some platforms
+// (e.g. Windows).
+type envOnce struct {
+	names []string
+	once  sync.Once
+	val   string
+}
+
+func (e *envOnce) Get() string {
+	e.once.Do(e.init)
+	return e.val
+}
+
+func (e *envOnce) init() {
+	for _, n := range e.names {
+		e.val = os.Getenv(n)
+		if e.val != "" {
+			return
+		}
+	}
+}
+
+// reset is used by tests
+func (e *envOnce) reset() {
+	e.once = sync.Once{}
+	e.val = ""
+}
+
+func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod) {
+	cm.targetScheme = treq.URL.Scheme
+	cm.targetAddr = canonicalAddr(treq.URL)
+	return cm
+}
+
+// putIdleConn adds pconn to the list of idle persistent connections awaiting
+// a new request.
+// If pconn is no longer needed or not in a good state, putIdleConn
+// returns false.
+func (t *Transport) putIdleConn(pconn *persistConn) bool {
+	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
+		pconn.close()
+		return false
+	}
+	if pconn.isBroken() {
+		return false
+	}
+	key := pconn.cacheKey
+	max := t.MaxIdleConnsPerHost
+	if max == 0 {
+		max = DefaultMaxIdleConnsPerHost
+	}
+	t.idleMu.Lock()
+
+	waitingDialer := t.idleConnCh[key]
+	select {
+	case waitingDialer <- pconn:
+		// We're done with this pconn and somebody else is
+		// currently waiting for a conn of this type (they're
+		// actively dialing, but this conn is ready
+		// first). Chrome calls this socket late binding.  See
+		// https://insouciant.org/tech/connection-management-in-chromium/
+		t.idleMu.Unlock()
+		return true
+	default:
+		if waitingDialer != nil {
+			// They had populated this, but their dial won
+			// first, so we can clean up this map entry.
+			delete(t.idleConnCh, key)
+		}
+	}
+	if t.wantIdle {
+		t.idleMu.Unlock()
+		pconn.close()
+		return false
+	}
+	if t.idleConn == nil {
+		t.idleConn = make(map[connectMethodKey][]*persistConn)
+	}
+	if len(t.idleConn[key]) >= max {
+		t.idleMu.Unlock()
+		pconn.close()
+		return false
+	}
+	for _, exist := range t.idleConn[key] {
+		if exist == pconn {
+			log.Fatalf("dup idle pconn %p in freelist", pconn)
+		}
+	}
+	t.idleConn[key] = append(t.idleConn[key], pconn)
+	t.idleMu.Unlock()
+	return true
+}
+
+// getIdleConnCh returns a channel to receive and return idle
+// persistent connection for the given connectMethod.
+// It may return nil, if persistent connections are not being used.
+func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn {
+	if t.DisableKeepAlives {
+		return nil
+	}
+	key := cm.key()
+	t.idleMu.Lock()
+	defer t.idleMu.Unlock()
+	t.wantIdle = false
+	if t.idleConnCh == nil {
+		t.idleConnCh = make(map[connectMethodKey]chan *persistConn)
+	}
+	ch, ok := t.idleConnCh[key]
+	if !ok {
+		ch = make(chan *persistConn)
+		t.idleConnCh[key] = ch
+	}
+	return ch
+}
+
+func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) {
+	key := cm.key()
+	t.idleMu.Lock()
+	defer t.idleMu.Unlock()
+	if t.idleConn == nil {
+		return nil
+	}
+	for {
+		pconns, ok := t.idleConn[key]
+		if !ok {
+			return nil
+		}
+		if len(pconns) == 1 {
+			pconn = pconns[0]
+			delete(t.idleConn, key)
+		} else {
+			// 2 or more cached connections; pop last
+			// TODO: queue?
+			pconn = pconns[len(pconns)-1]
+			t.idleConn[key] = pconns[:len(pconns)-1]
+		}
+		if !pconn.isBroken() {
+			return
+		}
+	}
+}
+
+func (t *Transport) setReqCanceler(r *http.Request, fn func()) {
+	t.reqMu.Lock()
+	defer t.reqMu.Unlock()
+	if t.reqCanceler == nil {
+		t.reqCanceler = make(map[*http.Request]func())
+	}
+	if fn != nil {
+		t.reqCanceler[r] = fn
+	} else {
+		delete(t.reqCanceler, r)
+	}
+}
+
+// replaceReqCanceler replaces an existing cancel function. If there is no cancel function
+// for the request, we don't set the function and return false.
+// Since CancelRequest will clear the canceler, we can use the return value to detect if
+// the request was canceled since the last setReqCancel call.
+func (t *Transport) replaceReqCanceler(r *http.Request, fn func()) bool {
+	t.reqMu.Lock()
+	defer t.reqMu.Unlock()
+	_, ok := t.reqCanceler[r]
+	if !ok {
+		return false
+	}
+	if fn != nil {
+		t.reqCanceler[r] = fn
+	} else {
+		delete(t.reqCanceler, r)
+	}
+	return true
+}
+
+func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
+	if t.Dial != nil {
+		return t.Dial(network, addr)
+	}
+	return net.Dial(network, addr)
+}
+
+// Testing hooks:
+var prePendingDial, postPendingDial func()
+
+// getConn dials and creates a new persistConn to the target as
+// specified in the connectMethod.  This includes doing a proxy CONNECT
+// and/or setting up TLS.  If this doesn't return an error, the persistConn
+// is ready to write requests to.
+func (t *Transport) getConn(req *http.Request, cm connectMethod) (*persistConn, error) {
+	if pc := t.getIdleConn(cm); pc != nil {
+		// set request canceler to some non-nil function so we
+		// can detect whether it was cleared between now and when
+		// we enter roundTrip
+		t.setReqCanceler(req, func() {})
+
+		t.reqMu.Lock()
+		if _, ok := t.reqCanceled[req]; ok {
+			// request canceled before we knew about it
+			delete(t.reqCanceled, req)
+			pc.cancelRequest()
+		}
+		t.reqMu.Unlock()
+
+		return pc, nil
+	}
+
+	type dialRes struct {
+		pc  *persistConn
+		err error
+	}
+	dialc := make(chan dialRes)
+
+	// Copy these hooks so we don't race on the postPendingDial in
+	// the goroutine we launch. Issue 11136.
+	prePendingDial := prePendingDial
+	postPendingDial := postPendingDial
+
+	handlePendingDial := func() {
+		if prePendingDial != nil {
+			prePendingDial()
+		}
+		go func() {
+			if v := <-dialc; v.err == nil {
+				t.putIdleConn(v.pc)
+			}
+			if postPendingDial != nil {
+				postPendingDial()
+			}
+		}()
+	}
+
+	cancelc := make(chan struct{})
+	t.setReqCanceler(req, func() { close(cancelc) })
+
+	t.reqMu.Lock()
+	if _, ok := t.reqCanceled[req]; ok {
+		// request canceled before we knew about it
+		if cancel, ok := t.reqCanceler[req]; ok {
+			delete(t.reqCanceler, req)
+			delete(t.reqCanceled, req)
+			if cancel != nil {
+				cancel()
+			}
+		}
+	}
+	t.reqMu.Unlock()
+
+	go func() {
+		pc, err := t.dialConn(cm)
+		dialc <- dialRes{pc, err}
+	}()
+
+	idleConnCh := t.getIdleConnCh(cm)
+	select {
+	case v := <-dialc:
+		// Our dial finished.
+		return v.pc, v.err
+	case pc := <-idleConnCh:
+		// Another request finished first and its net.Conn
+		// became available before our dial. Or somebody
+		// else's dial that they didn't use.
+		// But our dial is still going, so give it away
+		// when it finishes:
+		handlePendingDial()
+		return pc, nil
+	case <-cancelc:
+		handlePendingDial()
+		return nil, errors.New("net/http: request canceled while waiting for connection")
+	}
+}
+
+func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) {
+	pconn := &persistConn{
+		t:          t,
+		cacheKey:   cm.key(),
+		reqch:      make(chan requestAndChan, 1),
+		writech:    make(chan writeRequest, 1),
+		closech:    make(chan struct{}),
+		writeErrCh: make(chan error, 1),
+	}
+	tlsDial := t.DialTLS != nil && cm.targetScheme == "https"
+	if tlsDial {
+		var err error
+		pconn.conn, err = t.DialTLS("tcp", cm.addr())
+		if err != nil {
+			return nil, err
+		}
+		if tc, ok := pconn.conn.(*tls.Conn); ok {
+			cs := tc.ConnectionState()
+			pconn.tlsState = &cs
+		}
+	} else {
+		conn, err := t.dial("tcp", cm.addr())
+		if err != nil {
+			return nil, err
+		}
+		pconn.conn = conn
+	}
+
+	if cm.targetScheme == "https" && !tlsDial {
+		// Initiate TLS and check remote host name against certificate.
+		cfg := t.TLSClientConfig
+		if cfg == nil || cfg.ServerName == "" {
+			host := cm.tlsHost()
+			if cfg == nil {
+				cfg = &tls.Config{ServerName: host}
+			} else {
+				clone := *cfg // shallow clone
+				clone.ServerName = host
+				cfg = &clone
+			}
+		}
+		plainConn := pconn.conn
+		tlsConn := tls.Client(plainConn, cfg)
+		errc := make(chan error, 2)
+		var timer *time.Timer // for canceling TLS handshake
+		if d := t.TLSHandshakeTimeout; d != 0 {
+			timer = time.AfterFunc(d, func() {
+				errc <- tlsHandshakeTimeoutError{}
+			})
+		}
+		go func() {
+			err := tlsConn.Handshake()
+			if timer != nil {
+				timer.Stop()
+			}
+			errc <- err
+		}()
+		if err := <-errc; err != nil {
+			plainConn.Close()
+			return nil, err
+		}
+		if !cfg.InsecureSkipVerify {
+			if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
+				plainConn.Close()
+				return nil, err
+			}
+		}
+		cs := tlsConn.ConnectionState()
+		pconn.tlsState = &cs
+		pconn.conn = tlsConn
+	}
+
+	pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF})
+	pconn.bw = bufio.NewWriter(pconn.conn)
+	go pconn.readLoop()
+	go pconn.writeLoop()
+	return pconn, nil
+}
+
+// connectMethod is the map key (in its String form) for keeping persistent
+// TCP connections alive for subsequent HTTP requests.
+//
+// A connect method may be of the following types:
+//
+// Cache key form                Description
+// -----------------             -------------------------
+// |http|foo.com                 http directly to server, no proxy
+// |https|foo.com                https directly to server, no proxy
+// http://proxy.com|https|foo.com  http to proxy, then CONNECT to foo.com
+// http://proxy.com|http           http to proxy, http to anywhere after that
+//
+// Note: no support to https to the proxy yet.
+//
+type connectMethod struct {
+	targetScheme string // "http" or "https"
+	targetAddr   string // Not used if proxy + http targetScheme (4th example in table)
+}
+
+func (cm *connectMethod) key() connectMethodKey {
+	return connectMethodKey{
+		scheme: cm.targetScheme,
+		addr:   cm.targetAddr,
+	}
+}
+
+// addr returns the first hop "host:port" to which we need to TCP connect.
+func (cm *connectMethod) addr() string {
+	return cm.targetAddr
+}
+
+// tlsHost returns the host name to match against the peer's
+// TLS certificate.
+func (cm *connectMethod) tlsHost() string {
+	h := cm.targetAddr
+	if hasPort(h) {
+		h = h[:strings.LastIndex(h, ":")]
+	}
+	return h
+}
+
+// connectMethodKey is the map key version of connectMethod, with a
+// stringified proxy URL (or the empty string) instead of a pointer to
+// a URL.
+type connectMethodKey struct {
+	scheme, addr string
+}
+
+func (k connectMethodKey) String() string {
+	// Only used by tests.
+	return fmt.Sprintf("%s|%s|%s", k.scheme, k.addr)
+}
+
+// persistConn wraps a connection, usually a persistent one
+// (but may be used for non-keep-alive requests as well)
+type persistConn struct {
+	t        *Transport
+	cacheKey connectMethodKey
+	conn     net.Conn
+	tlsState *tls.ConnectionState
+	br       *bufio.Reader       // from conn
+	sawEOF   bool                // whether we've seen EOF from conn; owned by readLoop
+	bw       *bufio.Writer       // to conn
+	reqch    chan requestAndChan // written by roundTrip; read by readLoop
+	writech  chan writeRequest   // written by roundTrip; read by writeLoop
+	closech  chan struct{}       // closed when conn closed
+	// writeErrCh passes the request write error (usually nil)
+	// from the writeLoop goroutine to the readLoop which passes
+	// it off to the res.Body reader, which then uses it to decide
+	// whether or not a connection can be reused. Issue 7569.
+	writeErrCh chan error
+
+	lk                   sync.Mutex // guards following fields
+	numExpectedResponses int
+	closed               bool // whether conn has been closed
+	broken               bool // an error has happened on this connection; marked broken so it's not reused.
+	canceled             bool // whether this conn was broken due a CancelRequest
+	// mutateHeaderFunc is an optional func to modify extra
+	// headers on each outbound request before it's written. (the
+	// original Request given to RoundTrip is not modified)
+	mutateHeaderFunc func(http.Header)
+}
+
+// isBroken reports whether this connection is in a known broken state.
+func (pc *persistConn) isBroken() bool {
+	pc.lk.Lock()
+	b := pc.broken
+	pc.lk.Unlock()
+	return b
+}
+
+// isCanceled reports whether this connection was closed due to CancelRequest.
+func (pc *persistConn) isCanceled() bool {
+	pc.lk.Lock()
+	defer pc.lk.Unlock()
+	return pc.canceled
+}
+
+func (pc *persistConn) cancelRequest() {
+	pc.lk.Lock()
+	defer pc.lk.Unlock()
+	pc.canceled = true
+	pc.closeLocked()
+}
+
+func (pc *persistConn) readLoop() {
+	// eofc is used to block http.Handler goroutines reading from Response.Body
+	// at EOF until this goroutines has (potentially) added the connection
+	// back to the idle pool.
+	eofc := make(chan struct{})
+	defer close(eofc) // unblock reader on errors
+
+	// Read this once, before loop starts. (to avoid races in tests)
+	testHookMu.Lock()
+	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
+	testHookMu.Unlock()
+
+	alive := true
+	for alive {
+		pb, err := pc.br.Peek(1)
+
+		pc.lk.Lock()
+		if pc.numExpectedResponses == 0 {
+			if !pc.closed {
+				pc.closeLocked()
+				if len(pb) > 0 {
+					log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
+						string(pb), err)
+				}
+			}
+			pc.lk.Unlock()
+			return
+		}
+		pc.lk.Unlock()
+
+		rc := <-pc.reqch
+
+		var resp *http.Response
+		if err == nil {
+			resp, err = http.ReadResponse(pc.br, rc.req)
+			if err == nil && resp.StatusCode == 100 {
+				// Skip any 100-continue for now.
+				// TODO(bradfitz): if rc.req had "Expect: 100-continue",
+				// actually block the request body write and signal the
+				// writeLoop now to begin sending it. (Issue 2184) For now we
+				// eat it, since we're never expecting one.
+				resp, err = http.ReadResponse(pc.br, rc.req)
+			}
+		}
+
+		if resp != nil {
+			resp.TLS = pc.tlsState
+		}
+
+		hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0
+
+		if err != nil {
+			pc.close()
+		} else {
+			resp.Body = &bodyEOFSignal{body: resp.Body}
+		}
+
+		if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
+			// Don't do keep-alive on error if either party requested a close
+			// or we get an unexpected informational (1xx) response.
+			// StatusCode 100 is already handled above.
+			alive = false
+		}
+
+		var waitForBodyRead chan bool // channel is nil when there's no body
+		if hasBody {
+			waitForBodyRead = make(chan bool, 2)
+			resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
+				waitForBodyRead <- false
+				return nil
+			}
+			resp.Body.(*bodyEOFSignal).fn = func(err error) error {
+				isEOF := err == io.EOF
+				waitForBodyRead <- isEOF
+				if isEOF {
+					<-eofc // see comment at top
+				} else if err != nil && pc.isCanceled() {
+					return errRequestCanceled
+				}
+				return err
+			}
+		}
+
+		pc.lk.Lock()
+		pc.numExpectedResponses--
+		pc.lk.Unlock()
+
+		// The connection might be going away when we put the
+		// idleConn below. When that happens, we close the response channel to signal
+		// to roundTrip that the connection is gone. roundTrip waits for
+		// both closing and a response in a select, so it might choose
+		// the close channel, rather than the response.
+		// We send the response first so that roundTrip can check
+		// if there is a pending one with a non-blocking select
+		// on the response channel before erroring out.
+		rc.ch <- responseAndError{resp, err}
+
+		if hasBody {
+			// To avoid a race, wait for the just-returned
+			// response body to be fully consumed before peek on
+			// the underlying bufio reader.
+			select {
+			case bodyEOF := <-waitForBodyRead:
+				pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
+				alive = alive &&
+					bodyEOF &&
+					!pc.sawEOF &&
+					pc.wroteRequest() &&
+					pc.t.putIdleConn(pc)
+				if bodyEOF {
+					eofc <- struct{}{}
+				}
+			case <-pc.closech:
+				alive = false
+			}
+		} else {
+			pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
+			alive = alive &&
+				!pc.sawEOF &&
+				pc.wroteRequest() &&
+				pc.t.putIdleConn(pc)
+		}
+
+		if hook := testHookReadLoopBeforeNextRead; hook != nil {
+			hook()
+		}
+	}
+	pc.close()
+}
+
+func (pc *persistConn) writeLoop() {
+	for {
+		select {
+		case wr := <-pc.writech:
+			if pc.isBroken() {
+				wr.ch <- errors.New("http: can't write HTTP request on broken connection")
+				continue
+			}
+			err := wr.req.Request.Write(pc.bw)
+			if err == nil {
+				err = pc.bw.Flush()
+			}
+			if err != nil {
+				pc.markBroken()
+				closeBody(wr.req.Request)
+			}
+			pc.writeErrCh <- err // to the body reader, which might recycle us
+			wr.ch <- err         // to the roundTrip function
+		case <-pc.closech:
+			return
+		}
+	}
+}
+
+// wroteRequest is a check before recycling a connection that the previous write
+// (from writeLoop above) happened and was successful.
+func (pc *persistConn) wroteRequest() bool {
+	select {
+	case err := <-pc.writeErrCh:
+		// Common case: the write happened well before the response, so
+		// avoid creating a timer.
+		return err == nil
+	default:
+		// Rare case: the request was written in writeLoop above but
+		// before it could send to pc.writeErrCh, the reader read it
+		// all, processed it, and called us here. In this case, give the
+		// write goroutine a bit of time to finish its send.
+		//
+		// Less rare case: We also get here in the legitimate case of
+		// Issue 7569, where the writer is still writing (or stalled),
+		// but the server has already replied. In this case, we don't
+		// want to wait too long, and we want to return false so this
+		// connection isn't re-used.
+		select {
+		case err := <-pc.writeErrCh:
+			return err == nil
+		case <-time.After(50 * time.Millisecond):
+			return false
+		}
+	}
+}
+
+type responseAndError struct {
+	res *http.Response
+	err error
+}
+
+type requestAndChan struct {
+	req *http.Request
+	ch  chan responseAndError
+}
+
+// A writeRequest is sent by the readLoop's goroutine to the
+// writeLoop's goroutine to write a request while the read loop
+// concurrently waits on both the write response and the server's
+// reply.
+type writeRequest struct {
+	req *transportRequest
+	ch  chan<- error
+}
+
+type trHttpError struct {
+	err     string
+	timeout bool
+}
+
+func (e *trHttpError) Error() string   { return e.err }
+func (e *trHttpError) Timeout() bool   { return e.timeout }
+func (e *trHttpError) Temporary() bool { return true }
+
+var errTimeout error = &trHttpError{err: "net/http: timeout awaiting response headers", timeout: true}
+var errClosed error = &trHttpError{err: "net/http: transport closed before response was received"}
+var errRequestCanceled = errors.New("net/http: request canceled")
+
+// nil except for tests
+var (
+	testHookPersistConnClosedGotRes func()
+	testHookEnterRoundTrip          func()
+	testHookMu                      sync.Locker = fakeLocker{} // guards following
+	testHookReadLoopBeforeNextRead  func()
+)
+
+func (pc *persistConn) roundTrip(req *transportRequest) (resp *http.Response, err error) {
+	if hook := testHookEnterRoundTrip; hook != nil {
+		hook()
+	}
+	if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
+		pc.t.putIdleConn(pc)
+		return nil, errRequestCanceled
+	}
+	pc.lk.Lock()
+	pc.numExpectedResponses++
+	headerFn := pc.mutateHeaderFunc
+	pc.lk.Unlock()
+
+	if headerFn != nil {
+		headerFn(req.extraHeaders())
+	}
+
+	// Write the request concurrently with waiting for a response,
+	// in case the server decides to reply before reading our full
+	// request body.
+	writeErrCh := make(chan error, 1)
+	pc.writech <- writeRequest{req, writeErrCh}
+
+	resc := make(chan responseAndError, 1)
+	pc.reqch <- requestAndChan{req.Request, resc}
+
+	var re responseAndError
+	var respHeaderTimer <-chan time.Time
+WaitResponse:
+	for {
+		select {
+		case err := <-writeErrCh:
+			if err != nil {
+				re = responseAndError{nil, err}
+				pc.close()
+				break WaitResponse
+			}
+			if d := pc.t.ResponseHeaderTimeout; d > 0 {
+				timer := time.NewTimer(d)
+				defer timer.Stop() // prevent leaks
+				respHeaderTimer = timer.C
+			}
+		case <-pc.closech:
+			// The persist connection is dead. This shouldn't
+			// usually happen (only with Connection: close responses
+			// with no response bodies), but if it does happen it
+			// means either a) the remote server hung up on us
+			// prematurely, or b) the readLoop sent us a response &
+			// closed its closech at roughly the same time, and we
+			// selected this case first. If we got a response, readLoop makes sure
+			// to send it before it puts the conn and closes the channel.
+			// That way, we can fetch the response, if there is one,
+			// with a non-blocking receive.
+			select {
+			case re = <-resc:
+				if fn := testHookPersistConnClosedGotRes; fn != nil {
+					fn()
+				}
+			default:
+				re = responseAndError{err: errClosed}
+			}
+			break WaitResponse
+		case <-respHeaderTimer:
+			pc.close()
+			re = responseAndError{err: errTimeout}
+			break WaitResponse
+		case re = <-resc:
+			break WaitResponse
+		}
+	}
+
+	if re.err != nil {
+		pc.t.setReqCanceler(req.Request, nil)
+	}
+	return re.res, re.err
+}
+
+// markBroken marks a connection as broken (so it's not reused).
+// It differs from close in that it doesn't close the underlying
+// connection for use when it's still being read.
+func (pc *persistConn) markBroken() {
+	pc.lk.Lock()
+	defer pc.lk.Unlock()
+	pc.broken = true
+}
+
+func (pc *persistConn) close() {
+	pc.lk.Lock()
+	defer pc.lk.Unlock()
+	pc.closeLocked()
+}
+
+func (pc *persistConn) closeLocked() {
+	pc.broken = true
+	if !pc.closed {
+		pc.conn.Close()
+		pc.closed = true
+		close(pc.closech)
+	}
+	pc.mutateHeaderFunc = nil
+}
+
+var portMap = map[string]string{
+	"http":  "80",
+	"https": "443",
+}
+
+// canonicalAddr returns url.Host but always with a ":port" suffix
+func canonicalAddr(url *url.URL) string {
+	addr := url.Host
+	if !hasPort(addr) {
+		return addr + ":" + portMap[url.Scheme]
+	}
+	return addr
+}
+
+// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
+// once, right before its final (error-producing) Read or Close call
+// returns. fn should return the new error to return from Read or Close.
+//
+// If earlyCloseFn is non-nil and Close is called before io.EOF is
+// seen, earlyCloseFn is called instead of fn, and its return value is
+// the return value from Close.
+type bodyEOFSignal struct {
+	body         io.ReadCloser
+	mu           sync.Mutex        // guards following 4 fields
+	closed       bool              // whether Close has been called
+	rerr         error             // sticky Read error
+	fn           func(error) error // err will be nil on Read io.EOF
+	earlyCloseFn func() error      // optional alt Close func used if io.EOF not seen
+}
+
+func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
+	es.mu.Lock()
+	closed, rerr := es.closed, es.rerr
+	es.mu.Unlock()
+	if closed {
+		return 0, errors.New("http: read on closed response body")
+	}
+	if rerr != nil {
+		return 0, rerr
+	}
+
+	n, err = es.body.Read(p)
+	if err != nil {
+		es.mu.Lock()
+		defer es.mu.Unlock()
+		if es.rerr == nil {
+			es.rerr = err
+		}
+		err = es.condfn(err)
+	}
+	return
+}
+
+func (es *bodyEOFSignal) Close() error {
+	es.mu.Lock()
+	defer es.mu.Unlock()
+	if es.closed {
+		return nil
+	}
+	es.closed = true
+	if es.earlyCloseFn != nil && es.rerr != io.EOF {
+		return es.earlyCloseFn()
+	}
+	err := es.body.Close()
+	return es.condfn(err)
+}
+
+// caller must hold es.mu.
+func (es *bodyEOFSignal) condfn(err error) error {
+	if es.fn == nil {
+		return err
+	}
+	err = es.fn(err)
+	es.fn = nil
+	return err
+}
+
+type readerAndCloser struct {
+	io.Reader
+	io.Closer
+}
+
+type tlsHandshakeTimeoutError struct{}
+
+func (tlsHandshakeTimeoutError) Timeout() bool   { return true }
+func (tlsHandshakeTimeoutError) Temporary() bool { return true }
+func (tlsHandshakeTimeoutError) Error() string   { return "net/http: TLS handshake timeout" }
+
+type noteEOFReader struct {
+	r      io.Reader
+	sawEOF *bool
+}
+
+func (nr noteEOFReader) Read(p []byte) (n int, err error) {
+	n, err = nr.r.Read(p)
+	if err == io.EOF {
+		*nr.sawEOF = true
+	}
+	return
+}
+
+// fakeLocker is a sync.Locker which does nothing. It's used to guard
+// test-only fields when not under test, to avoid runtime atomic
+// overhead.
+type fakeLocker struct{}
+
+func (fakeLocker) Lock()   {}
+func (fakeLocker) Unlock() {}