diff --git a/playground/socket/socket.go b/playground/socket/socket.go index 76527aeec4..87abf56371 100644 --- a/playground/socket/socket.go +++ b/playground/socket/socket.go @@ -31,7 +31,6 @@ import ( "runtime" "strconv" "strings" - "sync" "time" "unicode/utf8" @@ -129,6 +128,7 @@ func socketHandler(c *websocket.Conn) { } } }() + defer close(out) // Start and kill processes and handle errors. proc := make(map[string]*process) @@ -139,8 +139,7 @@ func socketHandler(c *websocket.Conn) { case "run": log.Println("running snippet from:", c.Request().RemoteAddr) proc[m.Id].Kill() - lOut := limiter(in, out) - proc[m.Id] = startProcess(m.Id, m.Body, lOut, m.Options) + proc[m.Id] = startProcess(m.Id, m.Body, out, m.Options) case "kill": proc[m.Id].Kill() } @@ -160,7 +159,6 @@ func socketHandler(c *websocket.Conn) { // process represents a running process. type process struct { - id string out chan<- *Message done chan struct{} // closed when wait completes run *exec.Cmd @@ -169,12 +167,19 @@ type process struct { // startProcess builds and runs the given program, sending its output // and end event as Messages on the provided channel. -func startProcess(id, body string, out chan<- *Message, opt *Options) *process { - p := &process{ - id: id, - out: out, - done: make(chan struct{}), - } +func startProcess(id, body string, dest chan<- *Message, opt *Options) *process { + var ( + done = make(chan struct{}) + out = make(chan *Message) + p = &process{out: out, done: done} + ) + go func() { + defer close(done) + for m := range buffer(limiter(out, p)) { + m.Id = id + dest <- m + } + }() var err error if path, args := shebang(body); path != "" { if RunScripts { @@ -189,13 +194,115 @@ func startProcess(id, body string, out chan<- *Message, opt *Options) *process { p.end(err) return nil } - go p.wait() + go func() { + p.end(p.run.Wait()) + }() return p } +// end sends an "end" message to the client, containing the process id and the +// given error value. It also removes the binary, if present. +func (p *process) end(err error) { + if p.bin != "" { + defer os.Remove(p.bin) + } + m := &Message{Kind: "end"} + if err != nil { + m.Body = err.Error() + } + p.out <- m + close(p.out) +} + +// A killer provides a mechanism to terminate a process. +// The Kill method returns only once the process has exited. +type killer interface { + Kill() +} + +// limiter returns a channel that wraps the given channel. +// It receives Messages from the given channel and sends them to the returned +// channel until it passes msgLimit messages, at which point it will kill the +// process and pass only the "end" message. +// When the given channel is closed, or when the "end" message is received, +// it closes the returned channel. +func limiter(in <-chan *Message, p killer) <-chan *Message { + out := make(chan *Message) + go func() { + defer close(out) + n := 0 + for m := range in { + switch { + case n < msgLimit || m.Kind == "end": + out <- m + if m.Kind == "end" { + return + } + case n == msgLimit: + // Kill in a goroutine as Kill will not return + // until the process' output has been + // processed, and we're doing that in this loop. + go p.Kill() + default: + continue // don't increment + } + n++ + } + }() + return out +} + +// buffer returns a channel that wraps the given channel. It receives messages +// from the given channel and sends them to the returned channel. +// Message bodies are gathered over the period msgDelay and coalesced into a +// single Message before they are passed on. +// When the given channel is closed, buffer flushes the remaining buffered +// messages and closes the returned channel. +func buffer(in <-chan *Message) <-chan *Message { + out := make(chan *Message) + go func() { + defer close(out) + buf := make(map[string][]byte) // [kind]buffer + flush := func() { + for kind, b := range buf { + if len(b) == 0 { + continue + } + out <- &Message{Kind: kind, Body: safeString(b)} + buf[kind] = b[:0] // recycle buffer + } + } + t := time.NewTimer(msgDelay) + var tc <-chan time.Time + for { + select { + case m, ok := <-in: + if !ok { + flush() + return + } + if m.Kind == "end" { + flush() + out <- m + return + } + buf[m.Kind] = append(buf[m.Kind], m.Body...) + if tc == nil { + tc = t.C + t.Reset(msgDelay) + } + case <-tc: + flush() + tc = nil + } + } + }() + return out +} + // Kill stops the process if it is running and waits for it to exit. func (p *process) Kill() { - if p == nil { + if p == nil || p.run == nil { return } p.run.Process.Kill() @@ -224,8 +331,8 @@ func (p *process) startProcess(path string, args []string, body string) error { Path: path, Args: args, Stdin: strings.NewReader(body), - Stdout: &messageWriter{id: p.id, kind: "stdout", out: p.out}, - Stderr: &messageWriter{id: p.id, kind: "stderr", out: p.out}, + Stdout: &messageWriter{kind: "stdout", out: p.out}, + Stderr: &messageWriter{kind: "stderr", out: p.out}, } if err := cmd.Start(); err != nil { return err @@ -261,7 +368,7 @@ func (p *process) start(body string, opt *Options) error { args := []string{"go", "build", "-tags", "OMIT"} if opt != nil && opt.Race { p.out <- &Message{ - Id: p.id, Kind: "stderr", + Kind: "stderr", Body: "Running with race detector.\n", } args = append(args, "-race") @@ -298,35 +405,14 @@ func (p *process) start(body string, opt *Options) error { return nil } -// wait waits for the running process to complete -// and sends its error state to the client. -func (p *process) wait() { - p.end(p.run.Wait()) - close(p.done) // unblock waiting Kill calls -} - -// end sends an "end" message to the client, containing the process id and the -// given error value. It also removes the binary. -func (p *process) end(err error) { - if p.bin != "" { - defer os.Remove(p.bin) - } - m := &Message{Id: p.id, Kind: "end"} - if err != nil { - m.Body = err.Error() - } - // Wait for any outstanding reads to finish (potential race here). - time.AfterFunc(4*msgDelay, func() { p.out <- m }) -} - // cmd builds an *exec.Cmd that writes its standard output and error to the // process' output channel. func (p *process) cmd(dir string, args ...string) *exec.Cmd { cmd := exec.Command(args[0], args[1:]...) cmd.Dir = dir cmd.Env = Environ() - cmd.Stdout = &messageWriter{id: p.id, kind: "stdout", out: p.out} - cmd.Stderr = &messageWriter{id: p.id, kind: "stderr", out: p.out} + cmd.Stdout = &messageWriter{kind: "stdout", out: p.out} + cmd.Stderr = &messageWriter{kind: "stderr", out: p.out} return cmd } @@ -387,33 +473,15 @@ func packageName(body string) (string, error) { // messageWriter is an io.Writer that converts all writes to Message sends on // the out channel with the specified id and kind. type messageWriter struct { - id, kind string - out chan<- *Message - - mu sync.Mutex - buf []byte - send *time.Timer + kind string + out chan<- *Message } func (w *messageWriter) Write(b []byte) (n int, err error) { - // Buffer writes that occur in a short period to send as one Message. - w.mu.Lock() - w.buf = append(w.buf, b...) - if w.send == nil { - w.send = time.AfterFunc(msgDelay, w.sendNow) - } - w.mu.Unlock() + w.out <- &Message{Kind: w.kind, Body: safeString(b)} return len(b), nil } -func (w *messageWriter) sendNow() { - w.mu.Lock() - body := safeString(w.buf) - w.buf, w.send = nil, nil - w.mu.Unlock() - w.out <- &Message{Id: w.id, Kind: w.kind, Body: body} -} - // safeString returns b as a valid UTF-8 string. func safeString(b []byte) string { if utf8.Valid(b) { @@ -428,30 +496,6 @@ func safeString(b []byte) string { return buf.String() } -// limiter returns a channel that wraps dest. Messages sent to the channel are -// sent to dest. After msgLimit Messages have been passed on, a "kill" Message -// is sent to the kill channel, and only "end" messages are passed. -func limiter(kill chan<- *Message, dest chan<- *Message) chan<- *Message { - ch := make(chan *Message) - go func() { - n := 0 - for m := range ch { - switch { - case n < msgLimit || m.Kind == "end": - dest <- m - if m.Kind == "end" { - return - } - case n == msgLimit: - // process produced too much output. Kill it. - kill <- &Message{Id: m.Id, Kind: "kill"} - } - n++ - } - }() - return ch -} - var tmpdir string func init() {