From 87ac7c77c0e7ca1da242982ef4bc92940e5d681a Mon Sep 17 00:00:00 2001 From: Evan Shaw Date: Wed, 27 Apr 2011 12:34:34 -0700 Subject: [PATCH] http/fcgi: New package R=golang-dev, bradfitzgo, bradfitzwork, nigeltao, rog CC=golang-dev https://golang.org/cl/4271078 --- src/pkg/Makefile | 1 + src/pkg/http/fcgi/Makefile | 12 ++ src/pkg/http/fcgi/child.go | 328 +++++++++++++++++++++++++++++++++ src/pkg/http/fcgi/fcgi.go | 271 +++++++++++++++++++++++++++ src/pkg/http/fcgi/fcgi_test.go | 114 ++++++++++++ 5 files changed, 726 insertions(+) create mode 100644 src/pkg/http/fcgi/Makefile create mode 100644 src/pkg/http/fcgi/child.go create mode 100644 src/pkg/http/fcgi/fcgi.go create mode 100644 src/pkg/http/fcgi/fcgi_test.go diff --git a/src/pkg/Makefile b/src/pkg/Makefile index 44d4473fcbf..b046064a6fa 100644 --- a/src/pkg/Makefile +++ b/src/pkg/Makefile @@ -100,6 +100,7 @@ DIRS=\ html\ http\ http/cgi\ + http/fcgi\ http/pprof\ http/httptest\ image\ diff --git a/src/pkg/http/fcgi/Makefile b/src/pkg/http/fcgi/Makefile new file mode 100644 index 00000000000..bc01cdea9e1 --- /dev/null +++ b/src/pkg/http/fcgi/Makefile @@ -0,0 +1,12 @@ +# Copyright 2011 The Go Authors. All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +include ../../../Make.inc + +TARG=http/fcgi +GOFILES=\ + child.go\ + fcgi.go\ + +include ../../../Make.pkg diff --git a/src/pkg/http/fcgi/child.go b/src/pkg/http/fcgi/child.go new file mode 100644 index 00000000000..114052bee9e --- /dev/null +++ b/src/pkg/http/fcgi/child.go @@ -0,0 +1,328 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package fcgi + +// This file implements FastCGI from the perspective of a child process. + +import ( + "fmt" + "http" + "io" + "net" + "os" + "strconv" + "strings" + "time" +) + +// request holds the state for an in-progress request. As soon as it's complete, +// it's converted to an http.Request. +type request struct { + pw *io.PipeWriter + reqId uint16 + params map[string]string + buf [1024]byte + rawParams []byte + keepConn bool +} + +func newRequest(reqId uint16, flags uint8) *request { + r := &request{ + reqId: reqId, + params: map[string]string{}, + keepConn: flags&flagKeepConn != 0, + } + r.rawParams = r.buf[:0] + return r +} + +// TODO(eds): copied from http/cgi +var skipHeader = map[string]bool{ + "HTTP_HOST": true, + "HTTP_REFERER": true, + "HTTP_USER_AGENT": true, +} + +// httpRequest converts r to an http.Request. +// TODO(eds): this is very similar to http/cgi's requestFromEnvironment +func (r *request) httpRequest(body io.ReadCloser) (*http.Request, os.Error) { + req := &http.Request{ + Method: r.params["REQUEST_METHOD"], + RawURL: r.params["REQUEST_URI"], + Body: body, + Header: http.Header{}, + Trailer: http.Header{}, + Proto: r.params["SERVER_PROTOCOL"], + } + + var ok bool + req.ProtoMajor, req.ProtoMinor, ok = http.ParseHTTPVersion(req.Proto) + if !ok { + return nil, os.NewError("fcgi: invalid HTTP version") + } + + req.Host = r.params["HTTP_HOST"] + req.Referer = r.params["HTTP_REFERER"] + req.UserAgent = r.params["HTTP_USER_AGENT"] + + if lenstr := r.params["CONTENT_LENGTH"]; lenstr != "" { + clen, err := strconv.Atoi64(r.params["CONTENT_LENGTH"]) + if err != nil { + return nil, os.NewError("fcgi: bad CONTENT_LENGTH parameter: " + lenstr) + } + req.ContentLength = clen + } + + if req.Host != "" { + req.RawURL = "http://" + req.Host + r.params["REQUEST_URI"] + url, err := http.ParseURL(req.RawURL) + if err != nil { + return nil, os.NewError("fcgi: failed to parse host and REQUEST_URI into a URL: " + req.RawURL) + } + req.URL = url + } + if req.URL == nil { + req.RawURL = r.params["REQUEST_URI"] + url, err := http.ParseURL(req.RawURL) + if err != nil { + return nil, os.NewError("fcgi: failed to parse REQUEST_URI into a URL: " + req.RawURL) + } + req.URL = url + } + + for key, val := range r.params { + if strings.HasPrefix(key, "HTTP_") && !skipHeader[key] { + req.Header.Add(strings.Replace(key[5:], "_", "-", -1), val) + } + } + return req, nil +} + +// parseParams reads an encoded []byte into Params. +func (r *request) parseParams() { + text := r.rawParams + r.rawParams = nil + for len(text) > 0 { + keyLen, n := readSize(text) + if n == 0 { + return + } + text = text[n:] + valLen, n := readSize(text) + if n == 0 { + return + } + text = text[n:] + key := readString(text, keyLen) + text = text[keyLen:] + val := readString(text, valLen) + text = text[valLen:] + r.params[key] = val + } +} + +// response implements http.ResponseWriter. +type response struct { + req *request + header http.Header + w *bufWriter + wroteHeader bool +} + +func newResponse(c *child, req *request) *response { + return &response{ + req: req, + header: http.Header{}, + w: newWriter(c.conn, typeStdout, req.reqId), + } +} + +func (r *response) Header() http.Header { + return r.header +} + +func (r *response) Write(data []byte) (int, os.Error) { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + return r.w.Write(data) +} + +func (r *response) WriteHeader(code int) { + if r.wroteHeader { + return + } + r.wroteHeader = true + if code == http.StatusNotModified { + // Must not have body. + r.header.Del("Content-Type") + r.header.Del("Content-Length") + r.header.Del("Transfer-Encoding") + } else if r.header.Get("Content-Type") == "" { + r.header.Set("Content-Type", "text/html; charset=utf-8") + } + + if r.header.Get("Date") == "" { + r.header.Set("Date", time.UTC().Format(http.TimeFormat)) + } + + fmt.Fprintf(r.w, "Status: %d %s\r\n", code, http.StatusText(code)) + // TODO(eds): this is duplicated in http and http/cgi + for k, vv := range r.header { + for _, v := range vv { + v = strings.Replace(v, "\n", "", -1) + v = strings.Replace(v, "\r", "", -1) + v = strings.TrimSpace(v) + fmt.Fprintf(r.w, "%s: %s\r\n", k, v) + } + } + r.w.WriteString("\r\n") +} + +func (r *response) Flush() { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + r.w.Flush() +} + +func (r *response) Close() os.Error { + r.Flush() + return r.w.Close() +} + +type child struct { + conn *conn + handler http.Handler +} + +func newChild(rwc net.Conn, handler http.Handler) *child { + return &child{newConn(rwc), handler} +} + +func (c *child) serve() { + requests := map[uint16]*request{} + defer c.conn.Close() + var rec record + var br beginRequest + for { + if err := rec.read(c.conn.rwc); err != nil { + return + } + + req, ok := requests[rec.h.Id] + if !ok && rec.h.Type != typeBeginRequest && rec.h.Type != typeGetValues { + // The spec says to ignore unknown request IDs. + continue + } + if ok && rec.h.Type == typeBeginRequest { + // The server is trying to begin a request with the same ID + // as an in-progress request. This is an error. + return + } + + switch rec.h.Type { + case typeBeginRequest: + if err := br.read(rec.content()); err != nil { + return + } + if br.role != roleResponder { + c.conn.writeEndRequest(rec.h.Id, 0, statusUnknownRole) + break + } + requests[rec.h.Id] = newRequest(rec.h.Id, br.flags) + case typeParams: + // NOTE(eds): Technically a key-value pair can straddle the boundary + // between two packets. We buffer until we've received all parameters. + if len(rec.content()) > 0 { + req.rawParams = append(req.rawParams, rec.content()...) + break + } + req.parseParams() + case typeStdin: + content := rec.content() + if req.pw == nil { + var body io.ReadCloser + if len(content) > 0 { + // body could be an io.LimitReader, but it shouldn't matter + // as long as both sides are behaving. + body, req.pw = io.Pipe() + } + go c.serveRequest(req, body) + } + if len(content) > 0 { + // TODO(eds): This blocks until the handler reads from the pipe. + // If the handler takes a long time, it might be a problem. + req.pw.Write(content) + } else if req.pw != nil { + req.pw.Close() + } + case typeGetValues: + values := map[string]string{"FCGI_MPXS_CONNS": "1"} + c.conn.writePairs(0, typeGetValuesResult, values) + case typeData: + // If the filter role is implemented, read the data stream here. + case typeAbortRequest: + requests[rec.h.Id] = nil, false + c.conn.writeEndRequest(rec.h.Id, 0, statusRequestComplete) + if !req.keepConn { + // connection will close upon return + return + } + default: + b := make([]byte, 8) + b[0] = rec.h.Type + c.conn.writeRecord(typeUnknownType, 0, b) + } + } +} + +func (c *child) serveRequest(req *request, body io.ReadCloser) { + r := newResponse(c, req) + httpReq, err := req.httpRequest(body) + if err != nil { + // there was an error reading the request + r.WriteHeader(http.StatusInternalServerError) + c.conn.writeRecord(typeStderr, req.reqId, []byte(err.String())) + } else { + c.handler.ServeHTTP(r, httpReq) + } + if body != nil { + body.Close() + } + r.Close() + c.conn.writeEndRequest(req.reqId, 0, statusRequestComplete) + if !req.keepConn { + c.conn.Close() + } +} + +// Serve accepts incoming FastCGI connections on the listener l, creating a new +// service thread for each. The service threads read requests and then call handler +// to reply to them. +// If l is nil, Serve accepts connections on stdin. +// If handler is nil, http.DefaultServeMux is used. +func Serve(l net.Listener, handler http.Handler) os.Error { + if l == nil { + var err os.Error + l, err = net.FileListener(os.Stdin) + if err != nil { + return err + } + defer l.Close() + } + if handler == nil { + handler = http.DefaultServeMux + } + for { + rw, err := l.Accept() + if err != nil { + return err + } + c := newChild(rw, handler) + go c.serve() + } + panic("unreachable") +} diff --git a/src/pkg/http/fcgi/fcgi.go b/src/pkg/http/fcgi/fcgi.go new file mode 100644 index 00000000000..8e2e1cd3cb3 --- /dev/null +++ b/src/pkg/http/fcgi/fcgi.go @@ -0,0 +1,271 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package fcgi implements the FastCGI protocol. +// Currently only the responder role is supported. +// The protocol is defined at http://www.fastcgi.com/drupal/node/6?q=node/22 +package fcgi + +// This file defines the raw protocol and some utilities used by the child and +// the host. + +import ( + "bufio" + "bytes" + "encoding/binary" + "io" + "os" + "sync" +) + +const ( + // Packet Types + typeBeginRequest = iota + 1 + typeAbortRequest + typeEndRequest + typeParams + typeStdin + typeStdout + typeStderr + typeData + typeGetValues + typeGetValuesResult + typeUnknownType +) + +// keep the connection between web-server and responder open after request +const flagKeepConn = 1 + +const ( + maxWrite = 65535 // maximum record body + maxPad = 255 +) + +const ( + roleResponder = iota + 1 // only Responders are implemented. + roleAuthorizer + roleFilter +) + +const ( + statusRequestComplete = iota + statusCantMultiplex + statusOverloaded + statusUnknownRole +) + +const headerLen = 8 + +type header struct { + Version uint8 + Type uint8 + Id uint16 + ContentLength uint16 + PaddingLength uint8 + Reserved uint8 +} + +type beginRequest struct { + role uint16 + flags uint8 + reserved [5]uint8 +} + +func (br *beginRequest) read(content []byte) os.Error { + if len(content) != 8 { + return os.NewError("fcgi: invalid begin request record") + } + br.role = binary.BigEndian.Uint16(content) + br.flags = content[2] + return nil +} + +// for padding so we don't have to allocate all the time +// not synchronized because we don't care what the contents are +var pad [maxPad]byte + +func (h *header) init(recType uint8, reqId uint16, contentLength int) { + h.Version = 1 + h.Type = recType + h.Id = reqId + h.ContentLength = uint16(contentLength) + h.PaddingLength = uint8(-contentLength & 7) +} + +// conn sends records over rwc +type conn struct { + mutex sync.Mutex + rwc io.ReadWriteCloser + + // to avoid allocations + buf bytes.Buffer + h header +} + +func newConn(rwc io.ReadWriteCloser) *conn { + return &conn{rwc: rwc} +} + +func (c *conn) Close() os.Error { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.rwc.Close() +} + +type record struct { + h header + buf [maxWrite + maxPad]byte +} + +func (rec *record) read(r io.Reader) (err os.Error) { + if err = binary.Read(r, binary.BigEndian, &rec.h); err != nil { + return err + } + if rec.h.Version != 1 { + return os.NewError("fcgi: invalid header version") + } + n := int(rec.h.ContentLength) + int(rec.h.PaddingLength) + if _, err = io.ReadFull(r, rec.buf[:n]); err != nil { + return err + } + return nil +} + +func (r *record) content() []byte { + return r.buf[:r.h.ContentLength] +} + +// writeRecord writes and sends a single record. +func (c *conn) writeRecord(recType uint8, reqId uint16, b []byte) os.Error { + c.mutex.Lock() + defer c.mutex.Unlock() + c.buf.Reset() + c.h.init(recType, reqId, len(b)) + if err := binary.Write(&c.buf, binary.BigEndian, c.h); err != nil { + return err + } + if _, err := c.buf.Write(b); err != nil { + return err + } + if _, err := c.buf.Write(pad[:c.h.PaddingLength]); err != nil { + return err + } + _, err := c.rwc.Write(c.buf.Bytes()) + return err +} + +func (c *conn) writeBeginRequest(reqId uint16, role uint16, flags uint8) os.Error { + b := [8]byte{byte(role >> 8), byte(role), flags} + return c.writeRecord(typeBeginRequest, reqId, b[:]) +} + +func (c *conn) writeEndRequest(reqId uint16, appStatus int, protocolStatus uint8) os.Error { + b := make([]byte, 8) + binary.BigEndian.PutUint32(b, uint32(appStatus)) + b[4] = protocolStatus + return c.writeRecord(typeEndRequest, reqId, b) +} + +func (c *conn) writePairs(recType uint8, reqId uint16, pairs map[string]string) os.Error { + w := newWriter(c, recType, reqId) + b := make([]byte, 8) + for k, v := range pairs { + n := encodeSize(b, uint32(len(k))) + n += encodeSize(b[n:], uint32(len(k))) + if _, err := w.Write(b[:n]); err != nil { + return err + } + if _, err := w.WriteString(k); err != nil { + return err + } + if _, err := w.WriteString(v); err != nil { + return err + } + } + w.Close() + return nil +} + +func readSize(s []byte) (uint32, int) { + if len(s) == 0 { + return 0, 0 + } + size, n := uint32(s[0]), 1 + if size&(1<<7) != 0 { + if len(s) < 4 { + return 0, 0 + } + n = 4 + size = binary.BigEndian.Uint32(s) + size &^= 1 << 31 + } + return size, n +} + +func readString(s []byte, size uint32) string { + if size > uint32(len(s)) { + return "" + } + return string(s[:size]) +} + +func encodeSize(b []byte, size uint32) int { + if size > 127 { + size |= 1 << 31 + binary.BigEndian.PutUint32(b, size) + return 4 + } + b[0] = byte(size) + return 1 +} + +// bufWriter encapsulates bufio.Writer but also closes the underlying stream when +// Closed. +type bufWriter struct { + closer io.Closer + *bufio.Writer +} + +func (w *bufWriter) Close() os.Error { + if err := w.Writer.Flush(); err != nil { + w.closer.Close() + return err + } + return w.closer.Close() +} + +func newWriter(c *conn, recType uint8, reqId uint16) *bufWriter { + s := &streamWriter{c: c, recType: recType, reqId: reqId} + w, _ := bufio.NewWriterSize(s, maxWrite) + return &bufWriter{s, w} +} + +// streamWriter abstracts out the separation of a stream into discrete records. +// It only writes maxWrite bytes at a time. +type streamWriter struct { + c *conn + recType uint8 + reqId uint16 +} + +func (w *streamWriter) Write(p []byte) (int, os.Error) { + nn := 0 + for len(p) > 0 { + n := len(p) + if n > maxWrite { + n = maxWrite + } + if err := w.c.writeRecord(w.recType, w.reqId, p[:n]); err != nil { + return nn, err + } + nn += n + p = p[n:] + } + return nn, nil +} + +func (w *streamWriter) Close() os.Error { + // send empty record to close the stream + return w.c.writeRecord(w.recType, w.reqId, nil) +} diff --git a/src/pkg/http/fcgi/fcgi_test.go b/src/pkg/http/fcgi/fcgi_test.go new file mode 100644 index 00000000000..16a6243295e --- /dev/null +++ b/src/pkg/http/fcgi/fcgi_test.go @@ -0,0 +1,114 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package fcgi + +import ( + "bytes" + "io" + "os" + "testing" +) + +var sizeTests = []struct { + size uint32 + bytes []byte +}{ + {0, []byte{0x00}}, + {127, []byte{0x7F}}, + {128, []byte{0x80, 0x00, 0x00, 0x80}}, + {1000, []byte{0x80, 0x00, 0x03, 0xE8}}, + {33554431, []byte{0x81, 0xFF, 0xFF, 0xFF}}, +} + +func TestSize(t *testing.T) { + b := make([]byte, 4) + for i, test := range sizeTests { + n := encodeSize(b, test.size) + if !bytes.Equal(b[:n], test.bytes) { + t.Errorf("%d expected %x, encoded %x", i, test.bytes, b) + } + size, n := readSize(test.bytes) + if size != test.size { + t.Errorf("%d expected %d, read %d", i, test.size, size) + } + if len(test.bytes) != n { + t.Errorf("%d did not consume all the bytes", i) + } + } +} + +var streamTests = []struct { + desc string + recType uint8 + reqId uint16 + content []byte + raw []byte +}{ + {"single record", typeStdout, 1, nil, + []byte{1, typeStdout, 0, 1, 0, 0, 0, 0}, + }, + // this data will have to be split into two records + {"two records", typeStdin, 300, make([]byte, 66000), + bytes.Join([][]byte{ + // header for the first record + []byte{1, typeStdin, 0x01, 0x2C, 0xFF, 0xFF, 1, 0}, + make([]byte, 65536), + // header for the second + []byte{1, typeStdin, 0x01, 0x2C, 0x01, 0xD1, 7, 0}, + make([]byte, 472), + // header for the empty record + []byte{1, typeStdin, 0x01, 0x2C, 0, 0, 0, 0}, + }, + nil), + }, +} + +type nilCloser struct { + io.ReadWriter +} + +func (c *nilCloser) Close() os.Error { return nil } + +func TestStreams(t *testing.T) { + var rec record +outer: + for _, test := range streamTests { + buf := bytes.NewBuffer(test.raw) + var content []byte + for buf.Len() > 0 { + if err := rec.read(buf); err != nil { + t.Errorf("%s: error reading record: %v", test.desc, err) + continue outer + } + content = append(content, rec.content()...) + } + if rec.h.Type != test.recType { + t.Errorf("%s: got type %d expected %d", test.desc, rec.h.Type, test.recType) + continue + } + if rec.h.Id != test.reqId { + t.Errorf("%s: got request ID %d expected %d", test.desc, rec.h.Id, test.reqId) + continue + } + if !bytes.Equal(content, test.content) { + t.Errorf("%s: read wrong content", test.desc) + continue + } + buf.Reset() + c := newConn(&nilCloser{buf}) + w := newWriter(c, test.recType, test.reqId) + if _, err := w.Write(test.content); err != nil { + t.Errorf("%s: error writing record: %v", test.desc, err) + continue + } + if err := w.Close(); err != nil { + t.Errorf("%s: error closing stream: %v", test.desc, err) + continue + } + if !bytes.Equal(buf.Bytes(), test.raw) { + t.Errorf("%s: wrote wrong content", test.desc) + } + } +}