Browse Source

remote: ensure testing HTTP requests are canceled

Due to https://github.com/golang/go/issues/11013 race conditions
in the remote testcases mean that HTTP requests aren't actually
canceled.  In client.go::httpDo() the goroutine that actually
runs the http.Do() may not execute until after that routine's
select{} loop, at which point the Context may have already been
canceled.  Unfortunately with Go < 1.5 there is a race in the
http.Transport package that drops cancellations on the floor.

This issue can be reproduced by using the remote_test.go hunks
that use a WaitGroup to wait for subnet.WatchLeases() to exit
after the Context is canceled, but not the client.go hunks.
Specifically, doTestWatch() calls cancel() before
watch.go::WatchLeases() calls sm.WatchLeases(), which in this
case is RemoteManager.WatchLeases(), which then calls
RemoteManager.httpGet() -> httpDo(), which then calls
http.Client.Do() which has the aformentioned cancellation race.

To fix the races more completely (though not as completely
as Go 1.5 will do) we have to copy most of http.Transport
into the tree and implement some cancellation fixups ourselves.
A few parts of the new Transport are notable:

1) CancelRequest() - if the request is not yet being tracked,
the cancelation is saved in another map for later

2) getConn() - the real work is done here; when adding a
new request to the internal table, check if it the client has
asked to cancel it, and actually cancel it.  Second if we
are going to use an idle connection for the request, and the
request has already been canceled, then cancel the idle
connection too.

Second, use a WaitGroup in doTestWatch() to assert that the
outstanding watch is actually canceled.
Dan Williams 9 years ago
parent
commit
b82b1cbe57
3 changed files with 1143 additions and 4 deletions
  1. 23 2
      remote/client.go
  2. 10 2
      remote/remote_test.go
  3. 1110 0
      remote/transport.go

+ 23 - 2
remote/client.go

@@ -19,8 +19,10 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"net"
 	"net/http"
 	"path"
+	"time"
 
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
@@ -31,7 +33,26 @@ import (
 // implements subnet.Manager by sending requests to the server
 type RemoteManager struct {
 	base      string // includes scheme, host, and port, and version
-	transport *http.Transport
+	transport *Transport
+}
+
+func NewTransport(info transport.TLSInfo) (*Transport, error) {
+	cfg, err := info.ClientConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	t := &Transport{
+		// timeouts taken from http.DefaultTransport
+		Dial: (&net.Dialer{
+			Timeout:   30 * time.Second,
+			KeepAlive: 30 * time.Second,
+		}).Dial,
+		TLSHandshakeTimeout: 10 * time.Second,
+		TLSClientConfig:     cfg,
+	}
+
+	return t, nil
 }
 
 func NewRemoteManager(listenAddr, cafile, certfile, keyfile string) (subnet.Manager, error) {
@@ -41,7 +62,7 @@ func NewRemoteManager(listenAddr, cafile, certfile, keyfile string) (subnet.Mana
 		KeyFile:  keyfile,
 	}
 
-	t, err := transport.NewTransport(tls)
+	t, err := NewTransport(tls)
 	if err != nil {
 		return nil, err
 	}

+ 10 - 2
remote/remote_test.go

@@ -125,10 +125,18 @@ func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string) {
 
 func doTestWatch(t *testing.T, sm subnet.Manager) {
 	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	defer func() {
+		cancel()
+		wg.Wait()
+	}()
 
 	events := make(chan []subnet.Event)
-	go subnet.WatchLeases(ctx, sm, "_", events)
+	go func() {
+		subnet.WatchLeases(ctx, sm, "_", events)
+		wg.Done()
+	}()
 
 	// skip over the initial snapshot
 	<-events

+ 1110 - 0
remote/transport.go

@@ -0,0 +1,1110 @@
+// Copyright 2011 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// HTTP client implementation. See RFC 2616.
+//
+// This is the low-level Transport implementation of RoundTripper.
+// The high-level interface is in client.go.
+
+package remote
+
+import (
+	"bufio"
+	"crypto/tls"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"strings"
+	"sync"
+	"time"
+)
+
+// DefaultMaxIdleConnsPerHost is the default value of Transport's
+// MaxIdleConnsPerHost.
+const DefaultMaxIdleConnsPerHost = 2
+
+// Transport is an implementation of RoundTripper that supports HTTP,
+// HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT).
+// Transport can also cache connections for future re-use.
+type Transport struct {
+	idleMu     sync.Mutex
+	wantIdle   bool // user has requested to close all idle conns
+	idleConn   map[connectMethodKey][]*persistConn
+	idleConnCh map[connectMethodKey]chan *persistConn
+
+	reqMu       sync.Mutex
+	reqCanceler map[*http.Request]func()
+	reqCanceled map[*http.Request]bool
+
+	altMu    sync.RWMutex
+	altProto map[string]http.RoundTripper // nil or map of URI scheme => RoundTripper
+
+	// Dial specifies the dial function for creating unencrypted
+	// TCP connections.
+	// If Dial is nil, net.Dial is used.
+	Dial func(network, addr string) (net.Conn, error)
+
+	// DialTLS specifies an optional dial function for creating
+	// TLS connections for non-proxied HTTPS requests.
+	//
+	// If DialTLS is nil, Dial and TLSClientConfig are used.
+	//
+	// If DialTLS is set, the Dial hook is not used for HTTPS
+	// requests and the TLSClientConfig and TLSHandshakeTimeout
+	// are ignored. The returned net.Conn is assumed to already be
+	// past the TLS handshake.
+	DialTLS func(network, addr string) (net.Conn, error)
+
+	// TLSClientConfig specifies the TLS configuration to use with
+	// tls.Client. If nil, the default configuration is used.
+	TLSClientConfig *tls.Config
+
+	// TLSHandshakeTimeout specifies the maximum amount of time waiting to
+	// wait for a TLS handshake. Zero means no timeout.
+	TLSHandshakeTimeout time.Duration
+
+	// DisableKeepAlives, if true, prevents re-use of TCP connections
+	// between different HTTP requests.
+	DisableKeepAlives bool
+
+	// DisableCompression, if true, prevents the Transport from
+	// requesting compression with an "Accept-Encoding: gzip"
+	// request header when the Request contains no existing
+	// Accept-Encoding value. If the Transport requests gzip on
+	// its own and gets a gzipped response, it's transparently
+	// decoded in the Response.Body. However, if the user
+	// explicitly requested gzip it is not automatically
+	// uncompressed.
+	DisableCompression bool
+
+	// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
+	// (keep-alive) to keep per-host.  If zero,
+	// DefaultMaxIdleConnsPerHost is used.
+	MaxIdleConnsPerHost int
+
+	// ResponseHeaderTimeout, if non-zero, specifies the amount of
+	// time to wait for a server's response headers after fully
+	// writing the request (including its body, if any). This
+	// time does not include the time to read the response body.
+	ResponseHeaderTimeout time.Duration
+
+	// TODO: tunable on global max cached connections
+	// TODO: tunable on timeout on cached connections
+}
+
+// transportRequest is a wrapper around a *Request that adds
+// optional extra headers to write.
+type transportRequest struct {
+	*http.Request             // original request, not to be mutated
+	extra         http.Header // extra headers to write, or nil
+}
+
+func (tr *transportRequest) extraHeaders() http.Header {
+	if tr.extra == nil {
+		tr.extra = make(http.Header)
+	}
+	return tr.extra
+}
+
+func closeBody(req *http.Request) {
+	if req.Body != nil {
+		req.Body.Close()
+	}
+}
+
+func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") }
+
+// RoundTrip implements the RoundTripper interface.
+//
+// For higher-level HTTP client support (such as handling of cookies
+// and redirects), see Get, Post, and the Client type.
+func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
+	if req.URL == nil {
+		closeBody(req)
+		return nil, errors.New("http: nil Request.URL")
+	}
+	if req.Header == nil {
+		closeBody(req)
+		return nil, errors.New("http: nil Request.Header")
+	}
+	if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
+		closeBody(req)
+		return nil, errors.New("http: bad scheme")
+	}
+	if req.URL.Host == "" {
+		closeBody(req)
+		return nil, errors.New("http: no Host in request URL")
+	}
+	treq := &transportRequest{Request: req}
+	cm := t.connectMethodForRequest(treq)
+
+	// Get the cached or newly-created connection to either the
+	// host (for http or https), the http proxy, or the http proxy
+	// pre-CONNECTed to https server.  In any case, we'll be ready
+	// to send it requests.
+	pconn, err := t.getConn(req, cm)
+	if err != nil {
+		t.setReqCanceler(req, nil)
+		closeBody(req)
+		return nil, err
+	}
+
+	return pconn.roundTrip(treq)
+}
+
+// RegisterProtocol registers a new protocol with scheme.
+// The Transport will pass requests using the given scheme to rt.
+// It is rt's responsibility to simulate HTTP request semantics.
+//
+// RegisterProtocol can be used by other packages to provide
+// implementations of protocol schemes like "ftp" or "file".
+func (t *Transport) RegisterProtocol(scheme string, rt http.RoundTripper) {
+	if scheme == "http" || scheme == "https" {
+		panic("protocol " + scheme + " already registered")
+	}
+	t.altMu.Lock()
+	defer t.altMu.Unlock()
+	if t.altProto == nil {
+		t.altProto = make(map[string]http.RoundTripper)
+	}
+	if _, exists := t.altProto[scheme]; exists {
+		panic("protocol " + scheme + " already registered")
+	}
+	t.altProto[scheme] = rt
+}
+
+// CloseIdleConnections closes any connections which were previously
+// connected from previous requests but are now sitting idle in
+// a "keep-alive" state. It does not interrupt any connections currently
+// in use.
+func (t *Transport) CloseIdleConnections() {
+	t.idleMu.Lock()
+	m := t.idleConn
+	t.idleConn = nil
+	t.idleConnCh = nil
+	t.wantIdle = true
+	t.idleMu.Unlock()
+	for _, conns := range m {
+		for _, pconn := range conns {
+			pconn.close()
+		}
+	}
+}
+
+// CancelRequest cancels an in-flight request by closing its connection.
+// CancelRequest should only be called after RoundTrip has returned.
+func (t *Transport) CancelRequest(req *http.Request) {
+	t.reqMu.Lock()
+	if cancel, ok := t.reqCanceler[req]; ok {
+		delete(t.reqCanceler, req)
+		t.reqMu.Unlock()
+		if cancel != nil {
+			cancel()
+		}
+	} else {
+		if t.reqCanceled == nil {
+			t.reqCanceled = make(map[*http.Request]bool)
+		}
+		t.reqCanceled[req] = true
+		t.reqMu.Unlock()
+	}
+}
+
+//
+// Private implementation past this point.
+//
+
+// envOnce looks up an environment variable (optionally by multiple
+// names) once. It mitigates expensive lookups on some platforms
+// (e.g. Windows).
+type envOnce struct {
+	names []string
+	once  sync.Once
+	val   string
+}
+
+func (e *envOnce) Get() string {
+	e.once.Do(e.init)
+	return e.val
+}
+
+func (e *envOnce) init() {
+	for _, n := range e.names {
+		e.val = os.Getenv(n)
+		if e.val != "" {
+			return
+		}
+	}
+}
+
+// reset is used by tests
+func (e *envOnce) reset() {
+	e.once = sync.Once{}
+	e.val = ""
+}
+
+func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod) {
+	cm.targetScheme = treq.URL.Scheme
+	cm.targetAddr = canonicalAddr(treq.URL)
+	return cm
+}
+
+// putIdleConn adds pconn to the list of idle persistent connections awaiting
+// a new request.
+// If pconn is no longer needed or not in a good state, putIdleConn
+// returns false.
+func (t *Transport) putIdleConn(pconn *persistConn) bool {
+	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
+		pconn.close()
+		return false
+	}
+	if pconn.isBroken() {
+		return false
+	}
+	key := pconn.cacheKey
+	max := t.MaxIdleConnsPerHost
+	if max == 0 {
+		max = DefaultMaxIdleConnsPerHost
+	}
+	t.idleMu.Lock()
+
+	waitingDialer := t.idleConnCh[key]
+	select {
+	case waitingDialer <- pconn:
+		// We're done with this pconn and somebody else is
+		// currently waiting for a conn of this type (they're
+		// actively dialing, but this conn is ready
+		// first). Chrome calls this socket late binding.  See
+		// https://insouciant.org/tech/connection-management-in-chromium/
+		t.idleMu.Unlock()
+		return true
+	default:
+		if waitingDialer != nil {
+			// They had populated this, but their dial won
+			// first, so we can clean up this map entry.
+			delete(t.idleConnCh, key)
+		}
+	}
+	if t.wantIdle {
+		t.idleMu.Unlock()
+		pconn.close()
+		return false
+	}
+	if t.idleConn == nil {
+		t.idleConn = make(map[connectMethodKey][]*persistConn)
+	}
+	if len(t.idleConn[key]) >= max {
+		t.idleMu.Unlock()
+		pconn.close()
+		return false
+	}
+	for _, exist := range t.idleConn[key] {
+		if exist == pconn {
+			log.Fatalf("dup idle pconn %p in freelist", pconn)
+		}
+	}
+	t.idleConn[key] = append(t.idleConn[key], pconn)
+	t.idleMu.Unlock()
+	return true
+}
+
+// getIdleConnCh returns a channel to receive and return idle
+// persistent connection for the given connectMethod.
+// It may return nil, if persistent connections are not being used.
+func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn {
+	if t.DisableKeepAlives {
+		return nil
+	}
+	key := cm.key()
+	t.idleMu.Lock()
+	defer t.idleMu.Unlock()
+	t.wantIdle = false
+	if t.idleConnCh == nil {
+		t.idleConnCh = make(map[connectMethodKey]chan *persistConn)
+	}
+	ch, ok := t.idleConnCh[key]
+	if !ok {
+		ch = make(chan *persistConn)
+		t.idleConnCh[key] = ch
+	}
+	return ch
+}
+
+func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) {
+	key := cm.key()
+	t.idleMu.Lock()
+	defer t.idleMu.Unlock()
+	if t.idleConn == nil {
+		return nil
+	}
+	for {
+		pconns, ok := t.idleConn[key]
+		if !ok {
+			return nil
+		}
+		if len(pconns) == 1 {
+			pconn = pconns[0]
+			delete(t.idleConn, key)
+		} else {
+			// 2 or more cached connections; pop last
+			// TODO: queue?
+			pconn = pconns[len(pconns)-1]
+			t.idleConn[key] = pconns[:len(pconns)-1]
+		}
+		if !pconn.isBroken() {
+			return
+		}
+	}
+}
+
+func (t *Transport) setReqCanceler(r *http.Request, fn func()) {
+	t.reqMu.Lock()
+	defer t.reqMu.Unlock()
+	if t.reqCanceler == nil {
+		t.reqCanceler = make(map[*http.Request]func())
+	}
+	if fn != nil {
+		t.reqCanceler[r] = fn
+	} else {
+		delete(t.reqCanceler, r)
+	}
+}
+
+// replaceReqCanceler replaces an existing cancel function. If there is no cancel function
+// for the request, we don't set the function and return false.
+// Since CancelRequest will clear the canceler, we can use the return value to detect if
+// the request was canceled since the last setReqCancel call.
+func (t *Transport) replaceReqCanceler(r *http.Request, fn func()) bool {
+	t.reqMu.Lock()
+	defer t.reqMu.Unlock()
+	_, ok := t.reqCanceler[r]
+	if !ok {
+		return false
+	}
+	if fn != nil {
+		t.reqCanceler[r] = fn
+	} else {
+		delete(t.reqCanceler, r)
+	}
+	return true
+}
+
+func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
+	if t.Dial != nil {
+		return t.Dial(network, addr)
+	}
+	return net.Dial(network, addr)
+}
+
+// Testing hooks:
+var prePendingDial, postPendingDial func()
+
+// getConn dials and creates a new persistConn to the target as
+// specified in the connectMethod.  This includes doing a proxy CONNECT
+// and/or setting up TLS.  If this doesn't return an error, the persistConn
+// is ready to write requests to.
+func (t *Transport) getConn(req *http.Request, cm connectMethod) (*persistConn, error) {
+	if pc := t.getIdleConn(cm); pc != nil {
+		// set request canceler to some non-nil function so we
+		// can detect whether it was cleared between now and when
+		// we enter roundTrip
+		t.setReqCanceler(req, func() {})
+
+		t.reqMu.Lock()
+		if _, ok := t.reqCanceled[req]; ok {
+			// request canceled before we knew about it
+			delete(t.reqCanceled, req)
+			pc.cancelRequest()
+		}
+		t.reqMu.Unlock()
+
+		return pc, nil
+	}
+
+	type dialRes struct {
+		pc  *persistConn
+		err error
+	}
+	dialc := make(chan dialRes)
+
+	// Copy these hooks so we don't race on the postPendingDial in
+	// the goroutine we launch. Issue 11136.
+	prePendingDial := prePendingDial
+	postPendingDial := postPendingDial
+
+	handlePendingDial := func() {
+		if prePendingDial != nil {
+			prePendingDial()
+		}
+		go func() {
+			if v := <-dialc; v.err == nil {
+				t.putIdleConn(v.pc)
+			}
+			if postPendingDial != nil {
+				postPendingDial()
+			}
+		}()
+	}
+
+	cancelc := make(chan struct{})
+	t.setReqCanceler(req, func() { close(cancelc) })
+
+	t.reqMu.Lock()
+	if _, ok := t.reqCanceled[req]; ok {
+		// request canceled before we knew about it
+		if cancel, ok := t.reqCanceler[req]; ok {
+			delete(t.reqCanceler, req)
+			delete(t.reqCanceled, req)
+			if cancel != nil {
+				cancel()
+			}
+		}
+	}
+	t.reqMu.Unlock()
+
+	go func() {
+		pc, err := t.dialConn(cm)
+		dialc <- dialRes{pc, err}
+	}()
+
+	idleConnCh := t.getIdleConnCh(cm)
+	select {
+	case v := <-dialc:
+		// Our dial finished.
+		return v.pc, v.err
+	case pc := <-idleConnCh:
+		// Another request finished first and its net.Conn
+		// became available before our dial. Or somebody
+		// else's dial that they didn't use.
+		// But our dial is still going, so give it away
+		// when it finishes:
+		handlePendingDial()
+		return pc, nil
+	case <-cancelc:
+		handlePendingDial()
+		return nil, errors.New("net/http: request canceled while waiting for connection")
+	}
+}
+
+func (t *Transport) dialConn(cm connectMethod) (*persistConn, error) {
+	pconn := &persistConn{
+		t:          t,
+		cacheKey:   cm.key(),
+		reqch:      make(chan requestAndChan, 1),
+		writech:    make(chan writeRequest, 1),
+		closech:    make(chan struct{}),
+		writeErrCh: make(chan error, 1),
+	}
+	tlsDial := t.DialTLS != nil && cm.targetScheme == "https"
+	if tlsDial {
+		var err error
+		pconn.conn, err = t.DialTLS("tcp", cm.addr())
+		if err != nil {
+			return nil, err
+		}
+		if tc, ok := pconn.conn.(*tls.Conn); ok {
+			cs := tc.ConnectionState()
+			pconn.tlsState = &cs
+		}
+	} else {
+		conn, err := t.dial("tcp", cm.addr())
+		if err != nil {
+			return nil, err
+		}
+		pconn.conn = conn
+	}
+
+	if cm.targetScheme == "https" && !tlsDial {
+		// Initiate TLS and check remote host name against certificate.
+		cfg := t.TLSClientConfig
+		if cfg == nil || cfg.ServerName == "" {
+			host := cm.tlsHost()
+			if cfg == nil {
+				cfg = &tls.Config{ServerName: host}
+			} else {
+				clone := *cfg // shallow clone
+				clone.ServerName = host
+				cfg = &clone
+			}
+		}
+		plainConn := pconn.conn
+		tlsConn := tls.Client(plainConn, cfg)
+		errc := make(chan error, 2)
+		var timer *time.Timer // for canceling TLS handshake
+		if d := t.TLSHandshakeTimeout; d != 0 {
+			timer = time.AfterFunc(d, func() {
+				errc <- tlsHandshakeTimeoutError{}
+			})
+		}
+		go func() {
+			err := tlsConn.Handshake()
+			if timer != nil {
+				timer.Stop()
+			}
+			errc <- err
+		}()
+		if err := <-errc; err != nil {
+			plainConn.Close()
+			return nil, err
+		}
+		if !cfg.InsecureSkipVerify {
+			if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
+				plainConn.Close()
+				return nil, err
+			}
+		}
+		cs := tlsConn.ConnectionState()
+		pconn.tlsState = &cs
+		pconn.conn = tlsConn
+	}
+
+	pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF})
+	pconn.bw = bufio.NewWriter(pconn.conn)
+	go pconn.readLoop()
+	go pconn.writeLoop()
+	return pconn, nil
+}
+
+// connectMethod is the map key (in its String form) for keeping persistent
+// TCP connections alive for subsequent HTTP requests.
+//
+// A connect method may be of the following types:
+//
+// Cache key form                Description
+// -----------------             -------------------------
+// |http|foo.com                 http directly to server, no proxy
+// |https|foo.com                https directly to server, no proxy
+// http://proxy.com|https|foo.com  http to proxy, then CONNECT to foo.com
+// http://proxy.com|http           http to proxy, http to anywhere after that
+//
+// Note: no support to https to the proxy yet.
+//
+type connectMethod struct {
+	targetScheme string // "http" or "https"
+	targetAddr   string // Not used if proxy + http targetScheme (4th example in table)
+}
+
+func (cm *connectMethod) key() connectMethodKey {
+	return connectMethodKey{
+		scheme: cm.targetScheme,
+		addr:   cm.targetAddr,
+	}
+}
+
+// addr returns the first hop "host:port" to which we need to TCP connect.
+func (cm *connectMethod) addr() string {
+	return cm.targetAddr
+}
+
+// tlsHost returns the host name to match against the peer's
+// TLS certificate.
+func (cm *connectMethod) tlsHost() string {
+	h := cm.targetAddr
+	if hasPort(h) {
+		h = h[:strings.LastIndex(h, ":")]
+	}
+	return h
+}
+
+// connectMethodKey is the map key version of connectMethod, with a
+// stringified proxy URL (or the empty string) instead of a pointer to
+// a URL.
+type connectMethodKey struct {
+	scheme, addr string
+}
+
+func (k connectMethodKey) String() string {
+	// Only used by tests.
+	return fmt.Sprintf("%s|%s|%s", k.scheme, k.addr)
+}
+
+// persistConn wraps a connection, usually a persistent one
+// (but may be used for non-keep-alive requests as well)
+type persistConn struct {
+	t        *Transport
+	cacheKey connectMethodKey
+	conn     net.Conn
+	tlsState *tls.ConnectionState
+	br       *bufio.Reader       // from conn
+	sawEOF   bool                // whether we've seen EOF from conn; owned by readLoop
+	bw       *bufio.Writer       // to conn
+	reqch    chan requestAndChan // written by roundTrip; read by readLoop
+	writech  chan writeRequest   // written by roundTrip; read by writeLoop
+	closech  chan struct{}       // closed when conn closed
+	// writeErrCh passes the request write error (usually nil)
+	// from the writeLoop goroutine to the readLoop which passes
+	// it off to the res.Body reader, which then uses it to decide
+	// whether or not a connection can be reused. Issue 7569.
+	writeErrCh chan error
+
+	lk                   sync.Mutex // guards following fields
+	numExpectedResponses int
+	closed               bool // whether conn has been closed
+	broken               bool // an error has happened on this connection; marked broken so it's not reused.
+	canceled             bool // whether this conn was broken due a CancelRequest
+	// mutateHeaderFunc is an optional func to modify extra
+	// headers on each outbound request before it's written. (the
+	// original Request given to RoundTrip is not modified)
+	mutateHeaderFunc func(http.Header)
+}
+
+// isBroken reports whether this connection is in a known broken state.
+func (pc *persistConn) isBroken() bool {
+	pc.lk.Lock()
+	b := pc.broken
+	pc.lk.Unlock()
+	return b
+}
+
+// isCanceled reports whether this connection was closed due to CancelRequest.
+func (pc *persistConn) isCanceled() bool {
+	pc.lk.Lock()
+	defer pc.lk.Unlock()
+	return pc.canceled
+}
+
+func (pc *persistConn) cancelRequest() {
+	pc.lk.Lock()
+	defer pc.lk.Unlock()
+	pc.canceled = true
+	pc.closeLocked()
+}
+
+func (pc *persistConn) readLoop() {
+	// eofc is used to block http.Handler goroutines reading from Response.Body
+	// at EOF until this goroutines has (potentially) added the connection
+	// back to the idle pool.
+	eofc := make(chan struct{})
+	defer close(eofc) // unblock reader on errors
+
+	// Read this once, before loop starts. (to avoid races in tests)
+	testHookMu.Lock()
+	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
+	testHookMu.Unlock()
+
+	alive := true
+	for alive {
+		pb, err := pc.br.Peek(1)
+
+		pc.lk.Lock()
+		if pc.numExpectedResponses == 0 {
+			if !pc.closed {
+				pc.closeLocked()
+				if len(pb) > 0 {
+					log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
+						string(pb), err)
+				}
+			}
+			pc.lk.Unlock()
+			return
+		}
+		pc.lk.Unlock()
+
+		rc := <-pc.reqch
+
+		var resp *http.Response
+		if err == nil {
+			resp, err = http.ReadResponse(pc.br, rc.req)
+			if err == nil && resp.StatusCode == 100 {
+				// Skip any 100-continue for now.
+				// TODO(bradfitz): if rc.req had "Expect: 100-continue",
+				// actually block the request body write and signal the
+				// writeLoop now to begin sending it. (Issue 2184) For now we
+				// eat it, since we're never expecting one.
+				resp, err = http.ReadResponse(pc.br, rc.req)
+			}
+		}
+
+		if resp != nil {
+			resp.TLS = pc.tlsState
+		}
+
+		hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0
+
+		if err != nil {
+			pc.close()
+		} else {
+			resp.Body = &bodyEOFSignal{body: resp.Body}
+		}
+
+		if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 {
+			// Don't do keep-alive on error if either party requested a close
+			// or we get an unexpected informational (1xx) response.
+			// StatusCode 100 is already handled above.
+			alive = false
+		}
+
+		var waitForBodyRead chan bool // channel is nil when there's no body
+		if hasBody {
+			waitForBodyRead = make(chan bool, 2)
+			resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
+				waitForBodyRead <- false
+				return nil
+			}
+			resp.Body.(*bodyEOFSignal).fn = func(err error) error {
+				isEOF := err == io.EOF
+				waitForBodyRead <- isEOF
+				if isEOF {
+					<-eofc // see comment at top
+				} else if err != nil && pc.isCanceled() {
+					return errRequestCanceled
+				}
+				return err
+			}
+		}
+
+		pc.lk.Lock()
+		pc.numExpectedResponses--
+		pc.lk.Unlock()
+
+		// The connection might be going away when we put the
+		// idleConn below. When that happens, we close the response channel to signal
+		// to roundTrip that the connection is gone. roundTrip waits for
+		// both closing and a response in a select, so it might choose
+		// the close channel, rather than the response.
+		// We send the response first so that roundTrip can check
+		// if there is a pending one with a non-blocking select
+		// on the response channel before erroring out.
+		rc.ch <- responseAndError{resp, err}
+
+		if hasBody {
+			// To avoid a race, wait for the just-returned
+			// response body to be fully consumed before peek on
+			// the underlying bufio reader.
+			select {
+			case bodyEOF := <-waitForBodyRead:
+				pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
+				alive = alive &&
+					bodyEOF &&
+					!pc.sawEOF &&
+					pc.wroteRequest() &&
+					pc.t.putIdleConn(pc)
+				if bodyEOF {
+					eofc <- struct{}{}
+				}
+			case <-pc.closech:
+				alive = false
+			}
+		} else {
+			pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
+			alive = alive &&
+				!pc.sawEOF &&
+				pc.wroteRequest() &&
+				pc.t.putIdleConn(pc)
+		}
+
+		if hook := testHookReadLoopBeforeNextRead; hook != nil {
+			hook()
+		}
+	}
+	pc.close()
+}
+
+func (pc *persistConn) writeLoop() {
+	for {
+		select {
+		case wr := <-pc.writech:
+			if pc.isBroken() {
+				wr.ch <- errors.New("http: can't write HTTP request on broken connection")
+				continue
+			}
+			err := wr.req.Request.Write(pc.bw)
+			if err == nil {
+				err = pc.bw.Flush()
+			}
+			if err != nil {
+				pc.markBroken()
+				closeBody(wr.req.Request)
+			}
+			pc.writeErrCh <- err // to the body reader, which might recycle us
+			wr.ch <- err         // to the roundTrip function
+		case <-pc.closech:
+			return
+		}
+	}
+}
+
+// wroteRequest is a check before recycling a connection that the previous write
+// (from writeLoop above) happened and was successful.
+func (pc *persistConn) wroteRequest() bool {
+	select {
+	case err := <-pc.writeErrCh:
+		// Common case: the write happened well before the response, so
+		// avoid creating a timer.
+		return err == nil
+	default:
+		// Rare case: the request was written in writeLoop above but
+		// before it could send to pc.writeErrCh, the reader read it
+		// all, processed it, and called us here. In this case, give the
+		// write goroutine a bit of time to finish its send.
+		//
+		// Less rare case: We also get here in the legitimate case of
+		// Issue 7569, where the writer is still writing (or stalled),
+		// but the server has already replied. In this case, we don't
+		// want to wait too long, and we want to return false so this
+		// connection isn't re-used.
+		select {
+		case err := <-pc.writeErrCh:
+			return err == nil
+		case <-time.After(50 * time.Millisecond):
+			return false
+		}
+	}
+}
+
+type responseAndError struct {
+	res *http.Response
+	err error
+}
+
+type requestAndChan struct {
+	req *http.Request
+	ch  chan responseAndError
+}
+
+// A writeRequest is sent by the readLoop's goroutine to the
+// writeLoop's goroutine to write a request while the read loop
+// concurrently waits on both the write response and the server's
+// reply.
+type writeRequest struct {
+	req *transportRequest
+	ch  chan<- error
+}
+
+type trHttpError struct {
+	err     string
+	timeout bool
+}
+
+func (e *trHttpError) Error() string   { return e.err }
+func (e *trHttpError) Timeout() bool   { return e.timeout }
+func (e *trHttpError) Temporary() bool { return true }
+
+var errTimeout error = &trHttpError{err: "net/http: timeout awaiting response headers", timeout: true}
+var errClosed error = &trHttpError{err: "net/http: transport closed before response was received"}
+var errRequestCanceled = errors.New("net/http: request canceled")
+
+// nil except for tests
+var (
+	testHookPersistConnClosedGotRes func()
+	testHookEnterRoundTrip          func()
+	testHookMu                      sync.Locker = fakeLocker{} // guards following
+	testHookReadLoopBeforeNextRead  func()
+)
+
+func (pc *persistConn) roundTrip(req *transportRequest) (resp *http.Response, err error) {
+	if hook := testHookEnterRoundTrip; hook != nil {
+		hook()
+	}
+	if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
+		pc.t.putIdleConn(pc)
+		return nil, errRequestCanceled
+	}
+	pc.lk.Lock()
+	pc.numExpectedResponses++
+	headerFn := pc.mutateHeaderFunc
+	pc.lk.Unlock()
+
+	if headerFn != nil {
+		headerFn(req.extraHeaders())
+	}
+
+	// Write the request concurrently with waiting for a response,
+	// in case the server decides to reply before reading our full
+	// request body.
+	writeErrCh := make(chan error, 1)
+	pc.writech <- writeRequest{req, writeErrCh}
+
+	resc := make(chan responseAndError, 1)
+	pc.reqch <- requestAndChan{req.Request, resc}
+
+	var re responseAndError
+	var respHeaderTimer <-chan time.Time
+WaitResponse:
+	for {
+		select {
+		case err := <-writeErrCh:
+			if err != nil {
+				re = responseAndError{nil, err}
+				pc.close()
+				break WaitResponse
+			}
+			if d := pc.t.ResponseHeaderTimeout; d > 0 {
+				timer := time.NewTimer(d)
+				defer timer.Stop() // prevent leaks
+				respHeaderTimer = timer.C
+			}
+		case <-pc.closech:
+			// The persist connection is dead. This shouldn't
+			// usually happen (only with Connection: close responses
+			// with no response bodies), but if it does happen it
+			// means either a) the remote server hung up on us
+			// prematurely, or b) the readLoop sent us a response &
+			// closed its closech at roughly the same time, and we
+			// selected this case first. If we got a response, readLoop makes sure
+			// to send it before it puts the conn and closes the channel.
+			// That way, we can fetch the response, if there is one,
+			// with a non-blocking receive.
+			select {
+			case re = <-resc:
+				if fn := testHookPersistConnClosedGotRes; fn != nil {
+					fn()
+				}
+			default:
+				re = responseAndError{err: errClosed}
+			}
+			break WaitResponse
+		case <-respHeaderTimer:
+			pc.close()
+			re = responseAndError{err: errTimeout}
+			break WaitResponse
+		case re = <-resc:
+			break WaitResponse
+		}
+	}
+
+	if re.err != nil {
+		pc.t.setReqCanceler(req.Request, nil)
+	}
+	return re.res, re.err
+}
+
+// markBroken marks a connection as broken (so it's not reused).
+// It differs from close in that it doesn't close the underlying
+// connection for use when it's still being read.
+func (pc *persistConn) markBroken() {
+	pc.lk.Lock()
+	defer pc.lk.Unlock()
+	pc.broken = true
+}
+
+func (pc *persistConn) close() {
+	pc.lk.Lock()
+	defer pc.lk.Unlock()
+	pc.closeLocked()
+}
+
+func (pc *persistConn) closeLocked() {
+	pc.broken = true
+	if !pc.closed {
+		pc.conn.Close()
+		pc.closed = true
+		close(pc.closech)
+	}
+	pc.mutateHeaderFunc = nil
+}
+
+var portMap = map[string]string{
+	"http":  "80",
+	"https": "443",
+}
+
+// canonicalAddr returns url.Host but always with a ":port" suffix
+func canonicalAddr(url *url.URL) string {
+	addr := url.Host
+	if !hasPort(addr) {
+		return addr + ":" + portMap[url.Scheme]
+	}
+	return addr
+}
+
+// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
+// once, right before its final (error-producing) Read or Close call
+// returns. fn should return the new error to return from Read or Close.
+//
+// If earlyCloseFn is non-nil and Close is called before io.EOF is
+// seen, earlyCloseFn is called instead of fn, and its return value is
+// the return value from Close.
+type bodyEOFSignal struct {
+	body         io.ReadCloser
+	mu           sync.Mutex        // guards following 4 fields
+	closed       bool              // whether Close has been called
+	rerr         error             // sticky Read error
+	fn           func(error) error // err will be nil on Read io.EOF
+	earlyCloseFn func() error      // optional alt Close func used if io.EOF not seen
+}
+
+func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
+	es.mu.Lock()
+	closed, rerr := es.closed, es.rerr
+	es.mu.Unlock()
+	if closed {
+		return 0, errors.New("http: read on closed response body")
+	}
+	if rerr != nil {
+		return 0, rerr
+	}
+
+	n, err = es.body.Read(p)
+	if err != nil {
+		es.mu.Lock()
+		defer es.mu.Unlock()
+		if es.rerr == nil {
+			es.rerr = err
+		}
+		err = es.condfn(err)
+	}
+	return
+}
+
+func (es *bodyEOFSignal) Close() error {
+	es.mu.Lock()
+	defer es.mu.Unlock()
+	if es.closed {
+		return nil
+	}
+	es.closed = true
+	if es.earlyCloseFn != nil && es.rerr != io.EOF {
+		return es.earlyCloseFn()
+	}
+	err := es.body.Close()
+	return es.condfn(err)
+}
+
+// caller must hold es.mu.
+func (es *bodyEOFSignal) condfn(err error) error {
+	if es.fn == nil {
+		return err
+	}
+	err = es.fn(err)
+	es.fn = nil
+	return err
+}
+
+type readerAndCloser struct {
+	io.Reader
+	io.Closer
+}
+
+type tlsHandshakeTimeoutError struct{}
+
+func (tlsHandshakeTimeoutError) Timeout() bool   { return true }
+func (tlsHandshakeTimeoutError) Temporary() bool { return true }
+func (tlsHandshakeTimeoutError) Error() string   { return "net/http: TLS handshake timeout" }
+
+type noteEOFReader struct {
+	r      io.Reader
+	sawEOF *bool
+}
+
+func (nr noteEOFReader) Read(p []byte) (n int, err error) {
+	n, err = nr.r.Read(p)
+	if err == io.EOF {
+		*nr.sawEOF = true
+	}
+	return
+}
+
+// fakeLocker is a sync.Locker which does nothing. It's used to guard
+// test-only fields when not under test, to avoid runtime atomic
+// overhead.
+type fakeLocker struct{}
+
+func (fakeLocker) Lock()   {}
+func (fakeLocker) Unlock() {}