1
0
mirror of https://github.com/golang/go synced 2024-11-18 14:54:40 -07:00

playground/socket: fix race condition and goroutine leak

Fixes golang/go#11507

Change-Id: Ifa0a34df02433b3f8a5fb60c4df4eca47750646e
Reviewed-on: https://go-review.googlesource.com/16468
Reviewed-by: jcd . <jcd@golang.org>
This commit is contained in:
Andrew Gerrand 2015-10-29 19:28:03 +11:00
parent 23f9896d9e
commit 14fd3daae1

View File

@ -31,7 +31,6 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"
@ -129,6 +128,7 @@ func socketHandler(c *websocket.Conn) {
}
}
}()
defer close(out)
// Start and kill processes and handle errors.
proc := make(map[string]*process)
@ -139,8 +139,7 @@ func socketHandler(c *websocket.Conn) {
case "run":
log.Println("running snippet from:", c.Request().RemoteAddr)
proc[m.Id].Kill()
lOut := limiter(in, out)
proc[m.Id] = startProcess(m.Id, m.Body, lOut, m.Options)
proc[m.Id] = startProcess(m.Id, m.Body, out, m.Options)
case "kill":
proc[m.Id].Kill()
}
@ -160,7 +159,6 @@ func socketHandler(c *websocket.Conn) {
// process represents a running process.
type process struct {
id string
out chan<- *Message
done chan struct{} // closed when wait completes
run *exec.Cmd
@ -169,12 +167,19 @@ type process struct {
// startProcess builds and runs the given program, sending its output
// and end event as Messages on the provided channel.
func startProcess(id, body string, out chan<- *Message, opt *Options) *process {
p := &process{
id: id,
out: out,
done: make(chan struct{}),
}
func startProcess(id, body string, dest chan<- *Message, opt *Options) *process {
var (
done = make(chan struct{})
out = make(chan *Message)
p = &process{out: out, done: done}
)
go func() {
defer close(done)
for m := range buffer(limiter(out, p)) {
m.Id = id
dest <- m
}
}()
var err error
if path, args := shebang(body); path != "" {
if RunScripts {
@ -189,13 +194,115 @@ func startProcess(id, body string, out chan<- *Message, opt *Options) *process {
p.end(err)
return nil
}
go p.wait()
go func() {
p.end(p.run.Wait())
}()
return p
}
// end sends an "end" message to the client, containing the process id and the
// given error value. It also removes the binary, if present.
func (p *process) end(err error) {
if p.bin != "" {
defer os.Remove(p.bin)
}
m := &Message{Kind: "end"}
if err != nil {
m.Body = err.Error()
}
p.out <- m
close(p.out)
}
// A killer provides a mechanism to terminate a process.
// The Kill method returns only once the process has exited.
type killer interface {
Kill()
}
// limiter returns a channel that wraps the given channel.
// It receives Messages from the given channel and sends them to the returned
// channel until it passes msgLimit messages, at which point it will kill the
// process and pass only the "end" message.
// When the given channel is closed, or when the "end" message is received,
// it closes the returned channel.
func limiter(in <-chan *Message, p killer) <-chan *Message {
out := make(chan *Message)
go func() {
defer close(out)
n := 0
for m := range in {
switch {
case n < msgLimit || m.Kind == "end":
out <- m
if m.Kind == "end" {
return
}
case n == msgLimit:
// Kill in a goroutine as Kill will not return
// until the process' output has been
// processed, and we're doing that in this loop.
go p.Kill()
default:
continue // don't increment
}
n++
}
}()
return out
}
// buffer returns a channel that wraps the given channel. It receives messages
// from the given channel and sends them to the returned channel.
// Message bodies are gathered over the period msgDelay and coalesced into a
// single Message before they are passed on.
// When the given channel is closed, buffer flushes the remaining buffered
// messages and closes the returned channel.
func buffer(in <-chan *Message) <-chan *Message {
out := make(chan *Message)
go func() {
defer close(out)
buf := make(map[string][]byte) // [kind]buffer
flush := func() {
for kind, b := range buf {
if len(b) == 0 {
continue
}
out <- &Message{Kind: kind, Body: safeString(b)}
buf[kind] = b[:0] // recycle buffer
}
}
t := time.NewTimer(msgDelay)
var tc <-chan time.Time
for {
select {
case m, ok := <-in:
if !ok {
flush()
return
}
if m.Kind == "end" {
flush()
out <- m
return
}
buf[m.Kind] = append(buf[m.Kind], m.Body...)
if tc == nil {
tc = t.C
t.Reset(msgDelay)
}
case <-tc:
flush()
tc = nil
}
}
}()
return out
}
// Kill stops the process if it is running and waits for it to exit.
func (p *process) Kill() {
if p == nil {
if p == nil || p.run == nil {
return
}
p.run.Process.Kill()
@ -224,8 +331,8 @@ func (p *process) startProcess(path string, args []string, body string) error {
Path: path,
Args: args,
Stdin: strings.NewReader(body),
Stdout: &messageWriter{id: p.id, kind: "stdout", out: p.out},
Stderr: &messageWriter{id: p.id, kind: "stderr", out: p.out},
Stdout: &messageWriter{kind: "stdout", out: p.out},
Stderr: &messageWriter{kind: "stderr", out: p.out},
}
if err := cmd.Start(); err != nil {
return err
@ -261,7 +368,7 @@ func (p *process) start(body string, opt *Options) error {
args := []string{"go", "build", "-tags", "OMIT"}
if opt != nil && opt.Race {
p.out <- &Message{
Id: p.id, Kind: "stderr",
Kind: "stderr",
Body: "Running with race detector.\n",
}
args = append(args, "-race")
@ -298,35 +405,14 @@ func (p *process) start(body string, opt *Options) error {
return nil
}
// wait waits for the running process to complete
// and sends its error state to the client.
func (p *process) wait() {
p.end(p.run.Wait())
close(p.done) // unblock waiting Kill calls
}
// end sends an "end" message to the client, containing the process id and the
// given error value. It also removes the binary.
func (p *process) end(err error) {
if p.bin != "" {
defer os.Remove(p.bin)
}
m := &Message{Id: p.id, Kind: "end"}
if err != nil {
m.Body = err.Error()
}
// Wait for any outstanding reads to finish (potential race here).
time.AfterFunc(4*msgDelay, func() { p.out <- m })
}
// cmd builds an *exec.Cmd that writes its standard output and error to the
// process' output channel.
func (p *process) cmd(dir string, args ...string) *exec.Cmd {
cmd := exec.Command(args[0], args[1:]...)
cmd.Dir = dir
cmd.Env = Environ()
cmd.Stdout = &messageWriter{id: p.id, kind: "stdout", out: p.out}
cmd.Stderr = &messageWriter{id: p.id, kind: "stderr", out: p.out}
cmd.Stdout = &messageWriter{kind: "stdout", out: p.out}
cmd.Stderr = &messageWriter{kind: "stderr", out: p.out}
return cmd
}
@ -387,33 +473,15 @@ func packageName(body string) (string, error) {
// messageWriter is an io.Writer that converts all writes to Message sends on
// the out channel with the specified id and kind.
type messageWriter struct {
id, kind string
out chan<- *Message
mu sync.Mutex
buf []byte
send *time.Timer
kind string
out chan<- *Message
}
func (w *messageWriter) Write(b []byte) (n int, err error) {
// Buffer writes that occur in a short period to send as one Message.
w.mu.Lock()
w.buf = append(w.buf, b...)
if w.send == nil {
w.send = time.AfterFunc(msgDelay, w.sendNow)
}
w.mu.Unlock()
w.out <- &Message{Kind: w.kind, Body: safeString(b)}
return len(b), nil
}
func (w *messageWriter) sendNow() {
w.mu.Lock()
body := safeString(w.buf)
w.buf, w.send = nil, nil
w.mu.Unlock()
w.out <- &Message{Id: w.id, Kind: w.kind, Body: body}
}
// safeString returns b as a valid UTF-8 string.
func safeString(b []byte) string {
if utf8.Valid(b) {
@ -428,30 +496,6 @@ func safeString(b []byte) string {
return buf.String()
}
// limiter returns a channel that wraps dest. Messages sent to the channel are
// sent to dest. After msgLimit Messages have been passed on, a "kill" Message
// is sent to the kill channel, and only "end" messages are passed.
func limiter(kill chan<- *Message, dest chan<- *Message) chan<- *Message {
ch := make(chan *Message)
go func() {
n := 0
for m := range ch {
switch {
case n < msgLimit || m.Kind == "end":
dest <- m
if m.Kind == "end" {
return
}
case n == msgLimit:
// process produced too much output. Kill it.
kill <- &Message{Id: m.Id, Kind: "kill"}
}
n++
}
}()
return ch
}
var tmpdir string
func init() {