From cc62bed075ed2a7d1f65f2d65e55af1ffab7cdba Mon Sep 17 00:00:00 2001 From: Russ Cox Date: Tue, 27 Apr 2010 10:17:17 -0700 Subject: [PATCH] pipe: implementation #3; this time for sure! Added goroutine; got simpler. Fixes deadlock when doing Read+Close or Write+Close on same end. R=r, cw CC=golang-dev https://golang.org/cl/994043 --- src/pkg/io/pipe.go | 340 +++++++++++++++++++++++++--------------- src/pkg/io/pipe_test.go | 12 ++ 2 files changed, 227 insertions(+), 125 deletions(-) diff --git a/src/pkg/io/pipe.go b/src/pkg/io/pipe.go index fe706344680..79221bd4978 100644 --- a/src/pkg/io/pipe.go +++ b/src/pkg/io/pipe.go @@ -9,114 +9,190 @@ package io import ( "os" + "runtime" "sync" ) +type pipeResult struct { + n int + err os.Error +} + // Shared pipe structure. type pipe struct { - rclosed bool // Read end closed? - rerr os.Error // Error supplied to CloseReader - wclosed bool // Write end closed? - werr os.Error // Error supplied to CloseWriter - wpend []byte // Written data waiting to be read. - wtot int // Bytes consumed so far in current write. - cw chan []byte // Write sends data here... - cr chan bool // ... and reads a done notification from here. + // Reader sends on cr1, receives on cr2. + // Writer does the same on cw1, cw2. + r1, w1 chan []byte + r2, w2 chan pipeResult + + rclose chan os.Error // read close; error to return to writers + wclose chan os.Error // write close; error to return to readers + + done chan int // read or write half is done } -func (p *pipe) Read(data []byte) (n int, err os.Error) { - if p.rclosed { - return 0, os.EINVAL - } +func (p *pipe) run() { + var ( + rb []byte // pending Read + wb []byte // pending Write + wn int // amount written so far from wb + rerr os.Error // if read end is closed, error to send to writers + werr os.Error // if write end is closed, error to send to readers + r1 chan []byte // p.cr1 or nil depending on whether Read is ok + w1 chan []byte // p.cw1 or nil depending on whether Write is ok + ndone int + ) - // Wait for next write block if necessary. - if p.wpend == nil { - if !closed(p.cw) { - p.wpend = <-p.cw + // Read and Write are enabled at the start. + r1 = p.r1 + w1 = p.w1 + + for { + select { + case <-p.done: + if ndone++; ndone == 2 { + // both reader and writer are gone + return + } + continue + case rerr = <-p.rclose: + if w1 == nil { + // finish pending Write + p.w2 <- pipeResult{wn, rerr} + wn = 0 + w1 = p.w1 // allow another Write + } + if r1 == nil { + // Close of read side during Read. + // finish pending Read with os.EINVAL. + p.r2 <- pipeResult{0, os.EINVAL} + r1 = p.r1 // allow another Read + } + continue + case werr = <-p.wclose: + if r1 == nil { + // finish pending Read + p.r2 <- pipeResult{0, werr} + r1 = p.r1 // allow another Read + } + if w1 == nil { + // Close of write side during Write. + // finish pending Write with os.EINVAL. + p.w2 <- pipeResult{wn, os.EINVAL} + wn = 0 + w1 = p.w1 // allow another Write + } + continue + case rb = <-r1: + if werr != nil { + // write end is closed + p.r2 <- pipeResult{0, werr} + continue + } + r1 = nil // disable Read until this one is done + case wb = <-w1: + if rerr != nil { + // read end is closed + p.w2 <- pipeResult{0, rerr} + continue + } + w1 = nil // disable Write until this one is done } - if closed(p.cw) { - return 0, p.werr + + if r1 == nil && w1 == nil { + // Have rb and wb. Execute. + n := copy(rb, wb) + wn += n + wb = wb[n:] + + // Finish Read. + p.r2 <- pipeResult{n, nil} + r1 = p.r1 // allow another Read + + // Maybe finish Write. + if len(wb) == 0 { + p.w2 <- pipeResult{wn, nil} + wn = 0 + w1 = p.w1 // allow another Write + } } - p.wtot = 0 } - - // Read from current write block. - n = copy(data, p.wpend) - p.wtot += n - p.wpend = p.wpend[n:] - - // If write block is done, finish the write. - if len(p.wpend) == 0 { - p.wpend = nil - p.cr <- true - p.wtot = 0 - } - - return n, nil -} - -func (p *pipe) Write(data []byte) (n int, err os.Error) { - if p.wclosed { - return 0, os.EINVAL - } - if closed(p.cr) { - return 0, p.rerr - } - - // Send write to reader. - p.cw <- data - - // Wait for reader to finish copying it. - <-p.cr - if closed(p.cr) { - _, _ = <-p.cw // undo send if reader is gone - return 0, p.rerr - } - return len(data), nil -} - -func (p *pipe) CloseReader(rerr os.Error) os.Error { - if p.rclosed { - return os.EINVAL - } - p.rclosed = true - - // Wake up writes. - if rerr == nil { - rerr = os.EPIPE - } - p.rerr = rerr - close(p.cr) - return nil -} - -func (p *pipe) CloseWriter(werr os.Error) os.Error { - if p.wclosed { - return os.EINVAL - } - p.wclosed = true - - // Wake up reads. - if werr == nil { - werr = os.EOF - } - p.werr = werr - close(p.cw) - return nil } // Read/write halves of the pipe. // They are separate structures for two reasons: // 1. If one end becomes garbage without being Closed, -// its finisher can Close so that the other end +// its finalizer can Close so that the other end // does not hang indefinitely. // 2. Clients cannot use interface conversions on the // read end to find the Write method, and vice versa. +type pipeHalf struct { + c1 chan []byte + c2 chan pipeResult + cclose chan os.Error + done chan int + + lock sync.Mutex + closed bool + + io sync.Mutex + ioclosed bool +} + +func (p *pipeHalf) rw(data []byte) (n int, err os.Error) { + // Run i/o operation. + // Check ioclosed flag under lock to make sure we're still allowed to do i/o. + p.io.Lock() + defer p.io.Unlock() + if p.ioclosed { + return 0, os.EINVAL + } + p.c1 <- data + res := <-p.c2 + return res.n, res.err +} + +func (p *pipeHalf) close(err os.Error) os.Error { + // Close pipe half. + // Only first call to close does anything. + p.lock.Lock() + if p.closed { + p.lock.Unlock() + return os.EINVAL + } + p.closed = true + p.lock.Unlock() + + // First, send the close notification. + p.cclose <- err + + // Runner is now responding to rw operations + // with os.EINVAL. Cut off future rw operations + // by setting ioclosed flag. + p.io.Lock() + p.ioclosed = true + p.io.Unlock() + + // With ioclosed set, there will be no more rw operations + // working on the channels. + // Tell the runner we won't be bothering it anymore. + p.done <- 1 + + // Successfully torn down; can disable finalizer. + runtime.SetFinalizer(p, nil) + + return nil +} + +func (p *pipeHalf) finalizer() { + p.close(os.EINVAL) +} + + // A PipeReader is the read half of a pipe. type PipeReader struct { - lock sync.Mutex - p *pipe + pipeHalf } // Read implements the standard Read interface: @@ -125,36 +201,27 @@ type PipeReader struct { // If the write end is closed with an error, that error is // returned as err; otherwise err is nil. func (r *PipeReader) Read(data []byte) (n int, err os.Error) { - r.lock.Lock() - defer r.lock.Unlock() - - return r.p.Read(data) + return r.rw(data) } // Close closes the reader; subsequent writes to the // write half of the pipe will return the error os.EPIPE. func (r *PipeReader) Close() os.Error { - r.lock.Lock() - defer r.lock.Unlock() - - return r.p.CloseReader(nil) + return r.CloseWithError(nil) } // CloseWithError closes the reader; subsequent writes -// to the write half of the pipe will return the error rerr. -func (r *PipeReader) CloseWithError(rerr os.Error) os.Error { - r.lock.Lock() - defer r.lock.Unlock() - - return r.p.CloseReader(rerr) +// to the write half of the pipe will return the error err. +func (r *PipeReader) CloseWithError(err os.Error) os.Error { + if err == nil { + err = os.EPIPE + } + return r.close(err) } -func (r *PipeReader) finish() { r.Close() } - -// Write half of pipe. +// A PipeWriter is the write half of a pipe. type PipeWriter struct { - lock sync.Mutex - p *pipe + pipeHalf } // Write implements the standard Write interface: @@ -163,32 +230,24 @@ type PipeWriter struct { // If the read end is closed with an error, that err is // returned as err; otherwise err is os.EPIPE. func (w *PipeWriter) Write(data []byte) (n int, err os.Error) { - w.lock.Lock() - defer w.lock.Unlock() - - return w.p.Write(data) + return w.rw(data) } // Close closes the writer; subsequent reads from the -// read half of the pipe will return no bytes and a nil error. +// read half of the pipe will return no bytes and os.EOF. func (w *PipeWriter) Close() os.Error { - w.lock.Lock() - defer w.lock.Unlock() - - return w.p.CloseWriter(nil) + return w.CloseWithError(nil) } // CloseWithError closes the writer; subsequent reads from the -// read half of the pipe will return no bytes and the error werr. -func (w *PipeWriter) CloseWithError(werr os.Error) os.Error { - w.lock.Lock() - defer w.lock.Unlock() - - return w.p.CloseWriter(werr) +// read half of the pipe will return no bytes and the error err. +func (w *PipeWriter) CloseWithError(err os.Error) os.Error { + if err == nil { + err = os.EOF + } + return w.close(err) } -func (w *PipeWriter) finish() { w.Close() } - // Pipe creates a synchronous in-memory pipe. // It can be used to connect code expecting an io.Reader // with code expecting an io.Writer. @@ -196,8 +255,39 @@ func (w *PipeWriter) finish() { w.Close() } // copying data directly between the two; there is no internal buffering. func Pipe() (*PipeReader, *PipeWriter) { p := &pipe{ - cw: make(chan []byte, 1), - cr: make(chan bool, 1), + r1: make(chan []byte), + r2: make(chan pipeResult), + w1: make(chan []byte), + w2: make(chan pipeResult), + rclose: make(chan os.Error), + wclose: make(chan os.Error), + done: make(chan int), } - return &PipeReader{p: p}, &PipeWriter{p: p} + go p.run() + + // NOTE: Cannot use composite literal here: + // pipeHalf{c1: p.cr1, c2: p.cr2, cclose: p.crclose, cdone: p.cdone} + // because this implicitly copies the pipeHalf, which copies the inner mutex. + + r := new(PipeReader) + r.c1 = p.r1 + r.c2 = p.r2 + r.cclose = p.rclose + r.done = p.done + // TODO(rsc): Should be able to write + // runtime.SetFinalizer(r, (*PipeReader).finalizer) + // but 6g doesn't see the finalizer method. + runtime.SetFinalizer(&r.pipeHalf, (*pipeHalf).finalizer) + + w := new(PipeWriter) + w.c1 = p.w1 + w.c2 = p.w2 + w.cclose = p.wclose + w.done = p.done + // TODO(rsc): Should be able to write + // runtime.SetFinalizer(w, (*PipeWriter).finalizer) + // but 6g doesn't see the finalizer method. + runtime.SetFinalizer(&w.pipeHalf, (*pipeHalf).finalizer) + + return r, w } diff --git a/src/pkg/io/pipe_test.go b/src/pkg/io/pipe_test.go index 27eb061d48f..902d7a0a3fd 100644 --- a/src/pkg/io/pipe_test.go +++ b/src/pkg/io/pipe_test.go @@ -207,6 +207,18 @@ func TestPipeReadClose(t *testing.T) { } } +// Test close on Read side during Read. +func TestPipeReadClose2(t *testing.T) { + c := make(chan int, 1) + r, _ := Pipe() + go delayClose(t, r, c, pipeTest{}) + n, err := r.Read(make([]byte, 64)) + <-c + if n != 0 || err != os.EINVAL { + t.Errorf("read from closed pipe: %v, %v want %v, %v", n, err, 0, os.EINVAL) + } +} + // Test write after/before reader close. func TestPipeWriteClose(t *testing.T) {