wfc-server/nhttp/request.go

509 lines
14 KiB
Go

package nhttp
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"math"
"net"
_http "net/http"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/http/httpguts"
)
// connReader is the reader used for conn's input (c.r). It reads from
// conn.rwc, enforces a byte budget (remain) that Read reports as io.EOF
// once exhausted, and supports a single-byte background read
// (startBackgroundRead/backgroundRead) that abortPendingRead can cancel.
type connReader struct {
	conn *conn

	mu      sync.Mutex // guards following
	hasByte bool       // byteBuf[0] holds a byte from backgroundRead not yet returned by Read
	byteBuf [1]byte
	cond    *sync.Cond // created lazily by lock; broadcast whenever inRead is cleared
	inRead  bool       // a Read on conn.rwc is currently in progress
	aborted bool       // set true before conn.rwc deadline is set to past
	remain  int64      // bytes remaining
}
// aLongTimeAgo is a non-zero time, far in the past, used for
// immediate cancellation of network operations. abortPendingRead sets it
// as the read deadline so that a blocked Read on conn.rwc fails with a
// timeout error. It must be non-zero because a zero time.Time passed to
// SetReadDeadline means "no deadline".
var aLongTimeAgo = time.Unix(1, 0)
// lock acquires cr.mu. On first use it also creates cr.cond, bound to
// cr.mu, so the condition variable is always available once locked.
func (cr *connReader) lock() {
	cr.mu.Lock()
	if cr.cond != nil {
		return
	}
	cr.cond = sync.NewCond(&cr.mu)
}
// unlock releases the mutex taken by lock.
func (cr *connReader) unlock() {
	cr.mu.Unlock()
}
// abortPendingRead interrupts a Read on conn.rwc that is currently in
// progress and blocks until that Read has returned. It marks the read as
// aborted, moves the read deadline into the past to force the Read to
// time out, waits for inRead to clear, then removes the deadline again.
// When no read is in progress it returns immediately.
func (cr *connReader) abortPendingRead() {
	cr.lock()
	defer cr.unlock()
	if cr.inRead {
		// aborted is set before the deadline fires so the reader can tell
		// the resulting timeout apart from a genuine network error.
		cr.aborted = true
		cr.conn.rwc.SetReadDeadline(aLongTimeAgo)
		for cr.inRead {
			cr.cond.Wait()
		}
		cr.conn.rwc.SetReadDeadline(time.Time{})
	}
}
// startBackgroundRead starts backgroundRead in a new goroutine, which
// reads at most one byte from the connection. It panics if a read is
// already in progress and does nothing if a byte is already buffered.
func (cr *connReader) startBackgroundRead() {
	cr.lock()
	defer cr.unlock()
	switch {
	case cr.inRead:
		panic("invalid concurrent Body.Read call")
	case cr.hasByte:
		return
	}
	cr.inRead = true
	// A zero deadline lets the background read block without timing out.
	cr.conn.rwc.SetReadDeadline(time.Time{})
	go cr.backgroundRead()
}
// backgroundRead runs in its own goroutine, started by startBackgroundRead.
// It performs a single one-byte Read on conn.rwc. A byte that arrives is
// kept in byteBuf (hasByte is set) so that the next Read returns it first.
// A timeout caused by abortPendingRead is ignored; any other error is
// passed to handleReadError. Finally it clears aborted and inRead and
// broadcasts on cond, waking a goroutine blocked in abortPendingRead.
func (cr *connReader) backgroundRead() {
	// The lock is not held during the blocking Read, so abortPendingRead
	// can acquire it and interrupt this read via the read deadline.
	n, err := cr.conn.rwc.Read(cr.byteBuf[:])
	cr.lock()
	if n == 1 {
		cr.hasByte = true
		// We were past the end of the previous request's body already
		// (since we wouldn't be in a background read otherwise), so
		// this is a pipelined HTTP request. Prior to Go 1.11 we used to
		// send on the CloseNotify channel and cancel the context here,
		// but the behavior was documented as only "may", and we only
		// did that because that's how CloseNotify accidentally behaved
		// in very early Go releases prior to context support. Once we
		// added context support, people used a Handler's
		// Request.Context() and passed it along. Having that context
		// cancel on pipelined HTTP requests caused problems.
		// Fortunately, almost nothing uses HTTP/1.x pipelining.
		// Unfortunately, apt-get does, or sometimes does.
		// New Go 1.11 behavior: don't fire CloseNotify or cancel
		// contexts on pipelined requests. Shouldn't affect people, but
		// fixes cases like Issue 23921. This does mean that a client
		// closing their TCP connection after sending a pipelined
		// request won't cancel the context, but we'll catch that on any
		// write failure (in checkConnErrorWriter.Write).
		// If the server never writes, yes, there are still contrived
		// server & client behaviors where this fails to ever cancel the
		// context, but that's kinda why HTTP/1.x pipelining died
		// anyway.
	}
	if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
		// Ignore this error. It's the expected error from
		// another goroutine calling abortPendingRead.
	} else if err != nil {
		cr.handleReadError(err)
	}
	// Reset state for the next read before waking abortPendingRead.
	cr.aborted = false
	cr.inRead = false
	cr.unlock()
	cr.cond.Broadcast()
}
// handleReadError reacts to a failed read on the connection by cancelling
// the connection's context and delivering the close notification for the
// current response. The error value itself is not inspected.
func (cr *connReader) handleReadError(error) {
	cr.conn.cancelCtx()
	cr.closeNotify()
}
// closeNotify sends true on the current response's closeNotifyCh, if there
// is a current response. The compare-and-swap on didCloseNotify lets only
// the first caller send, so this may be called from multiple goroutines.
func (cr *connReader) closeNotify() {
	res := cr.conn.curReq.Load()
	if res == nil {
		return
	}
	if !atomic.CompareAndSwapInt32(&res.didCloseNotify, 0, 1) {
		return // another caller already won the race and sent
	}
	res.closeNotifyCh <- true
}
// Read reads up to len(p) bytes from the connection, never more than the
// remaining read limit, and returns io.EOF once that limit is exhausted.
// A byte previously buffered by backgroundRead is returned first, as a
// one-byte read. Any error from the underlying Read is reported via
// handleReadError before being returned to the caller.
//
// Read must not overlap another read on conn.rwc (including a background
// read); like startBackgroundRead, it panics if one is in progress.
func (cr *connReader) Read(p []byte) (n int, err error) {
	cr.lock()
	if cr.inRead {
		cr.unlock()
		// Two simultaneous Reads on conn.rwc would interleave bytes between
		// callers and corrupt the inRead bookkeeping; this is a caller bug.
		panic("invalid concurrent Body.Read call")
	}
	if cr.hitReadLimit() {
		cr.unlock()
		return 0, io.EOF
	}
	if len(p) == 0 {
		cr.unlock()
		return 0, nil
	}
	if int64(len(p)) > cr.remain {
		p = p[:cr.remain]
	}
	if cr.hasByte {
		p[0] = cr.byteBuf[0]
		cr.hasByte = false
		// The buffered byte was consumed from the connection, so it counts
		// against the read limit like every other byte returned by Read.
		cr.remain--
		cr.unlock()
		return 1, nil
	}
	cr.inRead = true
	cr.unlock()
	// Read without holding the lock so abortPendingRead can interrupt it.
	n, err = cr.conn.rwc.Read(p)
	cr.lock()
	cr.inRead = false
	if err != nil {
		cr.handleReadError(err)
	}
	cr.remain -= int64(n)
	cr.unlock()
	cr.cond.Broadcast()
	return n, err
}
// setReadLimit sets how many more bytes Read may return before it starts
// reporting io.EOF.
func (cr *connReader) setReadLimit(remain int64) {
	cr.remain = remain
}

// setInfiniteReadLimit raises the read limit to the largest int64,
// effectively removing it.
func (cr *connReader) setInfiniteReadLimit() {
	cr.setReadLimit(math.MaxInt64)
}

// hitReadLimit reports whether the read limit has been used up.
func (cr *connReader) hitReadLimit() bool {
	return !(cr.remain > 0)
}
// chunkWriter writes to a response's conn buffer, and is the writer
// wrapped by the response.w buffered writer.
//
// chunkWriter also is responsible for finalizing the Header, including
// conditionally setting the Content-Type and setting a Content-Length
// in cases where the handler's final output is smaller than the buffer
// size. It also conditionally adds chunk headers, when in chunking mode.
//
// See the comment above (*response).Write for the entire write flow.
type chunkWriter struct {
	// res is the response that owns this writer ((*conn).readRequest
	// sets w.cw.res = w).
	res *response

	// header is either nil or a deep clone of res.handlerHeader
	// at the time of res.writeHeader, if res.writeHeader is
	// called and extra buffering is being done to calculate
	// Content-Type and/or Content-Length.
	header _http.Header

	// wroteHeader tells whether the header's been written to "the
	// wire" (or rather: w.conn.buf). this is unlike
	// (*response).wroteHeader, which tells only whether it was
	// logically written.
	wroteHeader bool

	// set by the writeHeader method:
	chunking bool // using chunked transfer encoding for reply body
}
// TimeFormat is the time format to use when generating times in HTTP
// headers. It is like time.RFC1123 but hard-codes GMT as the time
// zone. The time being formatted must be in UTC for Format to
// generate the correct format.
//
// Its length also determines the size of response.dateBuf.
//
// For parsing this time format, see net/http's ParseTime.
const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"
// A response represents the server side of an HTTP response.
// (*conn).readRequest creates one per request. Body output is buffered
// by w, which writes into cw (a chunkWriter whose res points back to
// this response).
type response struct {
	conn             *conn
	req              *_http.Request     // request for this response
	reqBody          io.ReadCloser      // req.Body as originally read
	cancelCtx        context.CancelFunc // when ServeHTTP exits
	wroteHeader      bool               // a non-1xx header has been (logically) written
	wroteContinue    bool               // 100 Continue response was written
	wants10KeepAlive bool               // HTTP/1.0 w/ Connection "keep-alive"
	wantsClose       bool               // HTTP request has Connection "close"

	// canWriteContinue (an AtomicBool) says whether or not a
	// 100 Continue header can be written to the connection.
	// writeContinueMu must be held while writing the header.
	// These two fields together synchronize the body reader
	// (the expectContinueReader, which wants to write 100 Continue)
	// against the main writer.
	canWriteContinue AtomicBool
	writeContinueMu  sync.Mutex

	w  *bufio.Writer // buffers output in chunks to chunkWriter
	cw chunkWriter

	// handlerHeader is the Header that Handlers get access to,
	// which may be retained and mutated even after WriteHeader.
	// handlerHeader is copied into cw.header at WriteHeader
	// time, and privately mutated thereafter.
	handlerHeader _http.Header
	calledHeader  bool // handler accessed handlerHeader via Header

	written       int64 // number of bytes written in body
	contentLength int64 // explicitly-declared Content-Length; or -1
	status        int   // status code passed to WriteHeader

	// close connection after this reply. set on request and
	// updated after response from handler if there's a
	// "Connection: keep-alive" response header and a
	// Content-Length.
	closeAfterReply bool

	// requestBodyLimitHit is set by requestTooLarge when
	// maxBytesReader hits its max size. It is checked in
	// WriteHeader, to make sure we don't consume the
	// remaining request body to try to advance to the next HTTP
	// request. Instead, when this is set, we stop reading
	// subsequent requests on this connection and stop reading
	// input from it.
	requestBodyLimitHit bool

	// trailers are the headers to be sent after the handler
	// finishes writing the body. This field is initialized from
	// the Trailer response header when the response header is
	// written.
	trailers []string

	handlerDone AtomicBool // set true when the handler exits

	// Buffers for Date, Content-Length, and status code
	dateBuf   [len(TimeFormat)]byte
	clenBuf   [10]byte
	statusBuf [3]byte

	// closeNotifyCh is the channel returned by CloseNotify.
	// (*conn).readRequest creates it with a buffer of 1, and
	// connReader.closeNotify sends on it at most once.
	// TODO(bradfitz): this is currently (for Go 1.8) always
	// non-nil. Make this lazily-created again as it used to be?
	closeNotifyCh  chan bool
	didCloseNotify int32 // atomic (only 0->1 winner should send)
}
// http1ServerSupportsRequest reports whether this HTTP/1.x server can
// handle req. Every HTTP/1.x request qualifies, as does the exact
// "PRI * HTTP/2.0" request, so Handlers can wire up their own HTTP/2
// upgrades. HTTP/0.x and all other HTTP/2+ requests (which aren't
// encoded in ASCII anyway) are rejected.
func http1ServerSupportsRequest(req *_http.Request) bool {
	switch req.ProtoMajor {
	case 1:
		return true
	case 2:
		return req.ProtoMinor == 0 && req.Method == "PRI" && req.RequestURI == "*"
	default:
		return false
	}
}
// errTooLarge is returned by (*conn).readRequest when parsing the request
// fails after the connection's initial read limit has been reached.
var errTooLarge = errors.New("nhttp: request too large")
// readRequest reads the next request from the connection, validates it,
// and returns a new response bound to it.
//
// The request is parsed under the server's initial read limit. If parsing
// fails after that limit was reached, errTooLarge is returned. The request
// is rejected in any of these cases:
//   - its protocol version is unsupported;
//   - it is HTTP/1.1+, not CONNECT, and lacks a Host header;
//   - it carries more than one Host header;
//   - its Host value, any header name, or any header value is malformed.
//
// On success the read limit is lifted and the Host header is removed from
// req.Header. The request also gets a cancellable context derived from
// ctx, and that context's cancel function is stored in the returned
// response.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
	c.r.setReadLimit(c.server.initialReadLimitSize())
	if c.lastMethod == "POST" {
		// RFC 7230 section 3 tolerance for old buggy clients.
		peek, _ := c.bufr.Peek(4) // ReadRequest will get err below
		c.bufr.Discard(numLeadingCRorLF(peek))
	}
	req, err := readRequest(c.bufr)
	if err != nil {
		if c.r.hitReadLimit() {
			return nil, errTooLarge
		}
		return nil, err
	}
	if !http1ServerSupportsRequest(req) {
		return nil, errors.New("unsupported protocol version")
	}
	c.lastMethod = req.Method
	c.r.setInfiniteReadLimit()

	hosts, haveHost := req.Header["Host"]
	if req.ProtoAtLeast(1, 1) && (!haveHost || len(hosts) == 0) && req.Method != "CONNECT" {
		return nil, errors.New("missing required Host header")
	}
	// RFC 7230 section 5.4: a request with more than one Host field must be
	// rejected. Without this check only a lone Host value is validated, so
	// duplicated Host headers would let malformed values through.
	if len(hosts) > 1 {
		return nil, errors.New("too many Host headers")
	}
	if len(hosts) == 1 && !httpguts.ValidHostHeader(hosts[0]) {
		return nil, errors.New("malformed Host header")
	}
	for k, vv := range req.Header {
		if !httpguts.ValidHeaderFieldName(k) {
			return nil, errors.New("invalid header name")
		}
		for _, v := range vv {
			if !httpguts.ValidHeaderFieldValue(v) {
				return nil, errors.New("invalid header value")
			}
		}
	}
	delete(req.Header, "Host")

	ctx, cancelCtx := context.WithCancel(ctx)
	req = req.WithContext(ctx)
	req.RemoteAddr = c.remoteAddr
	req.TLS = c.tlsState
	if body, ok := req.Body.(*body); ok {
		body.doEarlyClose = true
	}
	// No header or whole-request read deadline is applied here.

	w = &response{
		conn:          c,
		cancelCtx:     cancelCtx,
		req:           req,
		reqBody:       req.Body,
		handlerHeader: make(_http.Header),
		contentLength: -1,
		closeNotifyCh: make(chan bool, 1),
		// We populate these ahead of time so we're not
		// reading from req.Header after their Handler starts
		// and maybe mutates it (Issue 14940)
		wants10KeepAlive: wantsHttp10KeepAlive(req),
		wantsClose:       wantsClose(req),
	}
	w.cw.res = w
	w.w = newBufioWriterSize(&w.cw, 2048)
	return w, nil
}
// wantsHttp10KeepAlive reports whether r is an HTTP/1.0 request whose
// Connection header contains the "keep-alive" token.
func wantsHttp10KeepAlive(r *_http.Request) bool {
	isHTTP10 := r.ProtoMajor == 1 && r.ProtoMinor == 0
	return isHTTP10 && hasToken(r.Header.Get("Connection"), "keep-alive")
}
// wantsClose reports whether the client asked for the connection to be
// closed, either through r.Close or a "close" token in the Connection
// header.
func wantsClose(r *_http.Request) bool {
	return r.Close || hasToken(r.Header.Get("Connection"), "close")
}
// readRequest parses a single HTTP/1.x request (request line, headers and
// transfer framing) from b.
//
// An error while reading the first line is returned unchanged (e.g. io.EOF
// from a peer that closed the connection). Once the first line has been
// read, any later io.EOF is reported as io.ErrUnexpectedEOF, because the
// request was cut short.
func readRequest(b *bufio.Reader) (req *_http.Request, err error) {
	tp := newTextprotoReader(b)
	// Release tp on every return path, including a failed first read.
	defer putTextprotoReader(tp)

	req = new(_http.Request)

	// First line: GET /index.html HTTP/1.0
	var s string
	if s, err = tp.ReadLine(); err != nil {
		return nil, err
	}
	// Registered only after the first line, so a clean EOF before any
	// request data is not turned into io.ErrUnexpectedEOF.
	defer func() {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
	}()

	var ok bool
	req.Method, req.RequestURI, req.Proto, ok = parseRequestLine(s)
	if !ok {
		return nil, errors.New("malformed HTTP request")
	}
	rawurl := req.RequestURI
	if req.ProtoMajor, req.ProtoMinor, ok = _http.ParseHTTPVersion(req.Proto); !ok {
		return nil, errors.New("malformed HTTP version")
	}

	// CONNECT requests are used two different ways, and neither uses a full URL:
	// The standard use is to tunnel HTTPS through an HTTP proxy.
	// It looks like "CONNECT www.google.com:443 HTTP/1.1", and the parameter is
	// just the authority section of a URL. This information should go in req.URL.Host.
	//
	// The net/rpc package also uses CONNECT, but there the parameter is a path
	// that starts with a slash. It can be parsed with the regular URL parser,
	// and the path will end up in req.URL.Path, where it needs to be in order for
	// RPC to work.
	justAuthority := req.Method == "CONNECT" && !strings.HasPrefix(rawurl, "/")
	if justAuthority {
		//goland:noinspection ALL
		rawurl = "http://" + rawurl
	}
	if req.URL, err = url.ParseRequestURI(rawurl); err != nil {
		return nil, err
	}
	if justAuthority {
		// Strip the bogus "http://" back off.
		req.URL.Scheme = ""
	}

	// Subsequent lines: Key: value.
	mimeHeader, err := tp.ReadMIMEHeader()
	if err != nil {
		return nil, err
	}
	req.Header = _http.Header(mimeHeader)

	// RFC 7230, section 5.3: Must treat
	//	GET /index.html HTTP/1.1
	//	Host: www.google.com
	// and
	//	GET http://www.google.com/index.html HTTP/1.1
	//	Host: doesntmatter
	// the same. In the second case, any Host line is ignored.
	req.Host = req.URL.Host
	if req.Host == "" {
		req.Host = req.Header.Get("Host")
	}

	fixPragmaCacheControl(req.Header)
	req.Close = shouldClose(req.ProtoMajor, req.ProtoMinor, req.Header, false)

	err = readTransfer(req, b)
	if err != nil {
		return nil, err
	}
	return req, nil
}
// parseRequestLine splits a request line such as "GET /foo HTTP/1.1" at
// its first two spaces into method, request-URI and protocol. ok is false
// (and all parts are empty) when the line has fewer than two spaces; any
// further spaces remain part of proto.
func parseRequestLine(line string) (method, requestURI, proto string, ok bool) {
	first := strings.IndexByte(line, ' ')
	if first < 0 {
		return "", "", "", false
	}
	rest := line[first+1:]
	second := strings.IndexByte(rest, ' ')
	if second < 0 {
		return "", "", "", false
	}
	return line[:first], rest[:second], rest[second+1:], true
}
// expectsContinue reports whether r's Expect header contains the
// "100-continue" token.
func expectsContinue(r *_http.Request) bool {
	expect := r.Header.Get("Expect")
	return hasToken(expect, "100-continue")
}
// requestBodyRemains reports whether future calls to Read on rc might
// yield more data. It unwraps *expectContinueReader layers until it
// reaches NoBody or a *body. Any other reader type is a programming
// error and panics.
func requestBodyRemains(rc io.ReadCloser) bool {
	for {
		if rc == NoBody {
			return false
		}
		switch v := rc.(type) {
		case *expectContinueReader:
			rc = v.readCloser // look through the wrapper
		case *body:
			return v.bodyRemains()
		default:
			panic("unexpected type " + fmt.Sprintf("%T", rc))
		}
	}
}
// registerOnHitEOF unwraps any *expectContinueReader layers around rc and
// passes fn to the underlying *body's registerOnHitEOF (presumably invoked
// when that body reaches EOF). Any other reader type panics.
func registerOnHitEOF(rc io.ReadCloser, fn func()) {
	for {
		switch v := rc.(type) {
		case *expectContinueReader:
			rc = v.readCloser
		case *body:
			v.registerOnHitEOF(fn)
			return
		default:
			panic("unexpected type " + fmt.Sprintf("%T", rc))
		}
	}
}