package nhttp

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	_http "net/http"
	"net/url"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/http/httpguts"
)

// connReader wraps reads from conn.rwc. Its Read method enforces a byte
// budget (remain), and startBackgroundRead can park a one-byte read in a
// separate goroutine; a byte obtained that way is kept in byteBuf and
// handed back by the next Read.
type connReader struct {
	conn *conn

	mu      sync.Mutex // guards following
	hasByte bool       // byteBuf holds one byte not yet returned by Read
	byteBuf [1]byte    // destination of the background read
	cond    *sync.Cond // created lazily by lock; broadcast when a read finishes
	inRead  bool       // a read on conn.rwc is currently in flight
	aborted bool       // set true before conn.rwc deadline is set to past
	remain  int64      // bytes remaining
}

// aLongTimeAgo is a non-zero time, far in the past, used for
// immediate cancellation of network operations.
var aLongTimeAgo = time.Unix(1, 0)

// lock acquires cr.mu and, on first use, creates cr.cond bound to that
// mutex, so a zero-value connReader needs no separate initialization.
func (cr *connReader) lock() {
	cr.mu.Lock()
	if cr.cond == nil {
		cr.cond = sync.NewCond(&cr.mu)
	}
}

// unlock releases cr.mu.
func (cr *connReader) unlock() { cr.mu.Unlock() }

// abortPendingRead interrupts the in-flight read, if there is one, and
// blocks until it has finished. It is a no-op when no read is in flight.
func (cr *connReader) abortPendingRead() {
	cr.lock()
	defer cr.unlock()
	if !cr.inRead {
		return
	}
	// Mark the abort first so backgroundRead can recognize the resulting
	// timeout error as expected, then move the deadline into the past.
	cr.aborted = true
	cr.conn.rwc.SetReadDeadline(aLongTimeAgo)
	// Wait (releasing cr.mu while blocked) until the reader clears inRead.
	for cr.inRead {
		cr.cond.Wait()
	}
	// Clear the deadline again so later reads are not affected.
	cr.conn.rwc.SetReadDeadline(time.Time{})
}

// startBackgroundRead starts a goroutine that blocks reading a single byte
// from the connection. It does nothing if a byte is already buffered, and
// panics if a read is already in flight.
func (cr *connReader) startBackgroundRead() {
	cr.lock()
	defer cr.unlock()
	if cr.inRead {
		panic("invalid concurrent Body.Read call")
	}
	if cr.hasByte {
		return
	}
	cr.inRead = true
	// Remove any read deadline before the goroutine starts its read.
	cr.conn.rwc.SetReadDeadline(time.Time{})
	go cr.backgroundRead()
}

// backgroundRead is the body of the goroutine started by
// startBackgroundRead. It performs one 1-byte read without holding cr.mu,
// then records the outcome under the lock and wakes any waiters.
func (cr *connReader) backgroundRead() {
	n, err := cr.conn.rwc.Read(cr.byteBuf[:])
	cr.lock()
	if n == 1 {
		cr.hasByte = true
		// We were past the end of the previous request's body already
		// (since we wouldn't be in a background read otherwise), so
		// this is a pipelined HTTP request. Prior to Go 1.11 we used to
		// send on the CloseNotify channel and cancel the context here,
		// but the behavior was documented as only "may", and we only
		// did that because that's how CloseNotify accidentally behaved
		// in very early Go releases prior to context support. Once we
		// added context support, people used a Handler's
		// Request.Context() and passed it along. Having that context
		// cancel on pipelined HTTP requests caused problems.
		// Fortunately, almost nothing uses HTTP/1.x pipelining.
		// Unfortunately, apt-get does, or sometimes does.
		// New Go 1.11 behavior: don't fire CloseNotify or cancel
		// contexts on pipelined requests. Shouldn't affect people, but
		// fixes cases like Issue 23921. This does mean that a client
		// closing their TCP connection after sending a pipelined
		// request won't cancel the context, but we'll catch that on any
		// write failure (in checkConnErrorWriter.Write).
		// If the server never writes, yes, there are still contrived
		// server & client behaviors where this fails to ever cancel the
		// context, but that's kinda why HTTP/1.x pipelining died
		// anyway.
	}
	if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
		// Ignore this error. It's the expected error from
		// another goroutine calling abortPendingRead.
	} else if err != nil {
		cr.handleReadError(err)
	}
	cr.aborted = false
	cr.inRead = false
	cr.unlock()
	// Wake abortPendingRead (or any other waiter) now that inRead is false.
	cr.cond.Broadcast()
}

// handleReadError reacts to a failed connection read by canceling the
// connection's context and firing the close notification. The error value
// itself is not inspected. Both callers in this file invoke it while
// holding cr.mu.
func (cr *connReader) handleReadError(_ error) {
	cr.conn.cancelCtx()
	cr.closeNotify()
}

// may be called from multiple goroutines.
//
// closeNotify sends a single value on the close-notify channel of the
// response returned by conn.curReq.Load(), if there is one. The
// compare-and-swap on didCloseNotify ensures at most one send per response.
func (cr *connReader) closeNotify() {
	res := cr.conn.curReq.Load()
	if res != nil && atomic.CompareAndSwapInt32(&res.didCloseNotify, 0, 1) {
		res.closeNotifyCh <- true
	}
}

// Read implements io.Reader on top of conn.rwc.
//
// It returns io.EOF once the read limit is exhausted, returns (0, nil) for
// an empty p, and never reads more than the remaining limit. A byte left
// over from a background read is returned first, on its own; that byte is
// not charged against remain. Otherwise it reads from the connection with
// cr.mu released, and on a read error cancels the connection context via
// handleReadError.
//
// NOTE(review): unlike startBackgroundRead, Read does not check inRead
// before starting its own read; presumably callers always call
// abortPendingRead first. Confirm against the call sites.
func (cr *connReader) Read(p []byte) (n int, err error) {
	cr.lock()
	if cr.hitReadLimit() {
		cr.unlock()
		return 0, io.EOF
	}
	if len(p) == 0 {
		cr.unlock()
		return 0, nil
	}
	// remain is > 0 here, so truncating p keeps at least one byte.
	if int64(len(p)) > cr.remain {
		p = p[:cr.remain]
	}
	if cr.hasByte {
		p[0] = cr.byteBuf[0]
		cr.hasByte = false
		cr.unlock()
		return 1, nil
	}
	cr.inRead = true
	// Do the blocking network read without holding the mutex.
	cr.unlock()
	n, err = cr.conn.rwc.Read(p)

	cr.lock()
	cr.inRead = false
	if err != nil {
		cr.handleReadError(err)
	}
	cr.remain -= int64(n)
	cr.unlock()

	// Wake anyone waiting in abortPendingRead for inRead to clear.
	cr.cond.Broadcast()
	return n, err
}

// setReadLimit sets the number of bytes Read may still return.
// It does not take cr.mu.
func (cr *connReader) setReadLimit(remain int64) { cr.remain = remain }

// setInfiniteReadLimit effectively removes the read limit by setting it to
// math.MaxInt64. It does not take cr.mu.
func (cr *connReader) setInfiniteReadLimit() { cr.remain = math.MaxInt64 }

// hitReadLimit reports whether the read limit has been used up.
func (cr *connReader) hitReadLimit() bool { return cr.remain <= 0 }

// chunkWriter writes to a response's conn buffer, and is the writer
// wrapped by the response.w buffered writer.
//
// chunkWriter also is responsible for finalizing the Header, including
// conditionally setting the Content-Type and setting a Content-Length
// in cases where the handler's final output is smaller than the buffer
// size. It also conditionally adds chunk headers, when in chunking mode.
//
// See the comment above (*response).Write for the entire write flow.
type chunkWriter struct {
	// res is the response this writer belongs to.
	res *response

	// header is either nil or a deep clone of res.handlerHeader
	// at the time of res.writeHeader, if res.writeHeader is
	// called and extra buffering is being done to calculate
	// Content-Type and/or Content-Length.
	header _http.Header

	// wroteHeader tells whether the header's been written to "the
	// wire" (or rather: w.conn.buf). This is unlike
	// (*response).wroteHeader, which tells only whether it was
	// logically written.
	wroteHeader bool

	// set by the writeHeader method:
	chunking bool // using chunked transfer encoding for reply body
}

// TimeFormat is the time format to use when generating times in HTTP
// headers. It is like time.RFC1123 but hard-codes GMT as the time
// zone. The time being formatted must be in UTC for Format to
// generate the correct format.
//
// For parsing this time format, see ParseTime.
const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"

// A response represents the server side of an HTTP response.
type response struct {
	conn             *conn
	req              *_http.Request // request for this response
	reqBody          io.ReadCloser
	cancelCtx        context.CancelFunc // when ServeHTTP exits
	wroteHeader      bool               // a non-1xx header has been (logically) written
	wroteContinue    bool               // 100 Continue response was written
	wants10KeepAlive bool               // HTTP/1.0 w/ Connection "keep-alive"
	wantsClose       bool               // HTTP request has Connection "close"

	// canWriteContinue is a boolean value accessed as an atomic int32
	// that says whether or not a 100 Continue header can be written
	// to the connection.
	// writeContinueMu must be held while writing the header.
	// These two fields together synchronize the body reader
	// (the expectContinueReader, which wants to write 100 Continue)
	// against the main writer.
	canWriteContinue atomicBool
	writeContinueMu  sync.Mutex

	w  *bufio.Writer // buffers output in chunks to chunkWriter
	cw chunkWriter

	// handlerHeader is the Header that Handlers get access to,
	// which may be retained and mutated even after WriteHeader.
	// handlerHeader is copied into cw.header at WriteHeader
	// time, and privately mutated thereafter.
	handlerHeader _http.Header
	calledHeader  bool // handler accessed handlerHeader via Header

	written       int64 // number of bytes written in body
	contentLength int64 // explicitly-declared Content-Length; or -1
	status        int   // status code passed to WriteHeader

	// closeAfterReply says to close the connection after this reply.
	// It is set on request and updated after response from handler
	// if there's a "Connection: keep-alive" response header and a
	// Content-Length.
	closeAfterReply bool

	// requestBodyLimitHit is set by requestTooLarge when
	// maxBytesReader hits its max size. It is checked in
	// WriteHeader, to make sure we don't consume the
	// remaining request body to try to advance to the next HTTP
	// request. Instead, when this is set, we stop reading
	// subsequent requests on this connection and stop reading
	// input from it.
	requestBodyLimitHit bool

	// trailers are the headers to be sent after the handler
	// finishes writing the body. This field is initialized from
	// the Trailer response header when the response header is
	// written.
	trailers []string

	handlerDone atomicBool // set true when the handler exits

	// Buffers for Date, Content-Length, and status code
	dateBuf   [len(TimeFormat)]byte
	clenBuf   [10]byte
	statusBuf [3]byte

	// closeNotifyCh is the channel returned by CloseNotify.
	// TODO(bradfitz): this is currently (for Go 1.8) always
	// non-nil. Make this lazily-created again as it used to be?
	closeNotifyCh  chan bool
	didCloseNotify int32 // atomic (only 0->1 winner should send)
}

// http1ServerSupportsRequest reports whether Go's HTTP/1.x server
// supports the given request.
func http1ServerSupportsRequest(req *_http.Request) bool {
	if req.ProtoMajor == 1 {
		return true
	}
	// Accept "PRI * HTTP/2.0" upgrade requests, so Handlers can
	// wire up their own HTTP/2 upgrades.
	if req.ProtoMajor == 2 && req.ProtoMinor == 0 &&
		req.Method == "PRI" && req.RequestURI == "*" {
		return true
	}
	// Reject HTTP/0.x, and all other HTTP/2+ requests (which
	// aren't encoded in ASCII anyway).
	return false
}

// errTooLarge is returned by (*conn).readRequest when the read limit was
// hit while parsing the request.
var errTooLarge = errors.New("nhttp: request too large")

// Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) { var ( wholeReqDeadline time.Time // or zero if none hdrDeadline time.Time // or zero if none ) c.r.setReadLimit(c.server.initialReadLimitSize()) if c.lastMethod == "POST" { // RFC 7230 section 3 tolerance for old buggy clients. peek, _ := c.bufr.Peek(4) // ReadRequest will get err below c.bufr.Discard(numLeadingCRorLF(peek)) } req, err := readRequest(c.bufr) if err != nil { if c.r.hitReadLimit() { return nil, errTooLarge } return nil, err } if !http1ServerSupportsRequest(req) { return nil, errors.New("unsupported protocol version") } c.lastMethod = req.Method c.r.setInfiniteReadLimit() hosts, haveHost := req.Header["Host"] if req.ProtoAtLeast(1, 1) && (!haveHost || len(hosts) == 0) && req.Method != "CONNECT" { return nil, errors.New("missing required Host header") } if len(hosts) == 1 && !httpguts.ValidHostHeader(hosts[0]) { return nil, errors.New("malformed Host header") } for k, vv := range req.Header { if !httpguts.ValidHeaderFieldName(k) { return nil, errors.New("invalid header name") } for _, v := range vv { if !httpguts.ValidHeaderFieldValue(v) { return nil, errors.New("invalid header value") } } } delete(req.Header, "Host") ctx, cancelCtx := context.WithCancel(ctx) req = req.WithContext(ctx) req.RemoteAddr = c.remoteAddr req.TLS = c.tlsState if body, ok := req.Body.(*body); ok { body.doEarlyClose = true } // Adjust the read deadline if necessary. 
if !hdrDeadline.Equal(wholeReqDeadline) { c.rwc.SetReadDeadline(wholeReqDeadline) } w = &response{ conn: c, cancelCtx: cancelCtx, req: req, reqBody: req.Body, handlerHeader: make(_http.Header), contentLength: -1, closeNotifyCh: make(chan bool, 1), // We populate these ahead of time so we're not // reading from req.Header after their Handler starts // and maybe mutates it (Issue 14940) wants10KeepAlive: wantsHttp10KeepAlive(req), wantsClose: wantsClose(req), } w.cw.res = w w.w = newBufioWriterSize(&w.cw, 2048) return w, nil } func wantsHttp10KeepAlive(r *_http.Request) bool { if r.ProtoMajor != 1 || r.ProtoMinor != 0 { return false } return hasToken(r.Header.Get("Connection"), "keep-alive") } func wantsClose(r *_http.Request) bool { if r.Close { return true } return hasToken(r.Header.Get("Connection"), "close") } func readRequest(b *bufio.Reader) (req *_http.Request, err error) { tp := newTextprotoReader(b) req = new(_http.Request) // First line: GET /index.html HTTP/1.0 var s string if s, err = tp.ReadLine(); err != nil { return nil, err } defer func() { putTextprotoReader(tp) if err == io.EOF { err = io.ErrUnexpectedEOF } }() var ok bool req.Method, req.RequestURI, req.Proto, ok = parseRequestLine(s) if !ok { return nil, errors.New("malformed HTTP request") } rawurl := req.RequestURI if req.ProtoMajor, req.ProtoMinor, ok = _http.ParseHTTPVersion(req.Proto); !ok { return nil, errors.New("malformed HTTP version") } // CONNECT requests are used two different ways, and neither uses a full URL: // The standard use is to tunnel HTTPS through an HTTP proxy. // It looks like "CONNECT www.google.com:443 HTTP/1.1", and the parameter is // just the authority section of a URL. This information should go in req.URL.Host. // // The net/rpc package also uses CONNECT, but there the parameter is a path // that starts with a slash. It can be parsed with the regular URL parser, // and the path will end up in req.URL.Path, where it needs to be in order for // RPC to work. 
justAuthority := req.Method == "CONNECT" && !strings.HasPrefix(rawurl, "/") if justAuthority { //goland:noinspection ALL rawurl = "http://" + rawurl } if req.URL, err = url.ParseRequestURI(rawurl); err != nil { return nil, err } if justAuthority { // Strip the bogus "nhttp://" back off. req.URL.Scheme = "" } // Subsequent lines: Key: value. mimeHeader, err := tp.ReadMIMEHeader() if err != nil { return nil, err } req.Header = _http.Header(mimeHeader) // RFC 7230, section 5.3: Must treat // GET /index.html HTTP/1.1 // Host: www.google.com // and // GET http://www.google.com/index.html HTTP/1.1 // Host: doesntmatter // the same. In the second case, any Host line is ignored. req.Host = req.URL.Host if req.Host == "" { req.Host = req.Header.Get("Host") } fixPragmaCacheControl(req.Header) req.Close = shouldClose(req.ProtoMajor, req.ProtoMinor, req.Header, false) err = readTransfer(req, b) if err != nil { return nil, err } return req, nil } // parseRequestLine parses "GET /foo HTTP/1.1" into its three parts. func parseRequestLine(line string) (method, requestURI, proto string, ok bool) { method, rest, ok1 := strings.Cut(line, " ") requestURI, proto, ok2 := strings.Cut(rest, " ") if !ok1 || !ok2 { return "", "", "", false } return method, requestURI, proto, true } func expectsContinue(r *_http.Request) bool { return hasToken(r.Header.Get("Expect"), "100-continue") } // requestBodyRemains reports whether future calls to Read // on rc might yield more data. func requestBodyRemains(rc io.ReadCloser) bool { if rc == NoBody { return false } switch v := rc.(type) { case *expectContinueReader: return requestBodyRemains(v.readCloser) case *body: return v.bodyRemains() default: panic("unexpected type " + fmt.Sprintf("%T", rc)) } } func registerOnHitEOF(rc io.ReadCloser, fn func()) { switch v := rc.(type) { case *expectContinueReader: registerOnHitEOF(v.readCloser, fn) case *body: v.registerOnHitEOF(fn) default: panic("unexpected type " + fmt.Sprintf("%T", rc)) } }