diff --git a/src/lib/io/pipe.go b/src/lib/io/pipe.go index 5f9e7a488c0..1a443ddcec2 100644 --- a/src/lib/io/pipe.go +++ b/src/lib/io/pipe.go @@ -21,7 +21,9 @@ type pipeReturn struct { // Shared pipe structure. type pipe struct { rclosed bool; // Read end closed? + rerr os.Error; // Error supplied to CloseReader wclosed bool; // Write end closed? + werr os.Error; // Error supplied to CloseWriter wpend []byte; // Written data waiting to be read. wtot int; // Bytes consumed so far in current write. cr chan []byte; // Write sends data here... @@ -39,7 +41,7 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) { p.wpend = <-p.cr; } if p.wpend == nil { - return 0, nil; + return 0, p.werr; } p.wtot = 0; } @@ -70,7 +72,7 @@ func (p *pipe) Write(data []byte) (n int, err os.Error) { return 0, os.EINVAL; } if p.rclosed { - return 0, os.EPIPE; + return 0, p.rerr; } // Send data to reader. @@ -81,29 +83,34 @@ func (p *pipe) Write(data []byte) (n int, err os.Error) { return res.n, res.err; } -func (p *pipe) CloseReader() os.Error { +func (p *pipe) CloseReader(rerr os.Error) os.Error { if p == nil || p.rclosed { return os.EINVAL; } // Stop any future writes. p.rclosed = true; + if rerr == nil { + rerr = os.EPIPE; + } + p.rerr = rerr; // Stop the current write. if !p.wclosed { - p.cw <- pipeReturn{p.wtot, os.EPIPE}; + p.cw <- pipeReturn{p.wtot, rerr}; } return nil; } -func (p *pipe) CloseWriter() os.Error { +func (p *pipe) CloseWriter(werr os.Error) os.Error { if p == nil || p.wclosed { return os.EINVAL; } // Stop any future reads. p.wclosed = true; + p.werr = werr; // Stop the current read. if !p.rclosed { @@ -121,70 +128,98 @@ func (p *pipe) CloseWriter() os.Error { // 2. Clients cannot use interface conversions on the // read end to find the Write method, and vice versa. -// Read half of pipe. -type pipeRead struct { +// A PipeReader is the read half of a pipe. +type PipeReader struct { lock sync.Mutex; p *pipe; } -func (r *pipeRead) Read(data []byte) (n int, err os.Error) { +// Read implements the standard Read interface: +// it reads data from the pipe, blocking until a writer +// arrives or the write end is closed. +// If the write end is closed with an error, that error is +// returned as err; otherwise err is nil. +func (r *PipeReader) Read(data []byte) (n int, err os.Error) { r.lock.Lock(); defer r.lock.Unlock(); return r.p.Read(data); } -func (r *pipeRead) Close() os.Error { +// Close closes the reader; subsequent writes to the +// write half of the pipe will return the error os.EPIPE. +func (r *PipeReader) Close() os.Error { r.lock.Lock(); defer r.lock.Unlock(); - return r.p.CloseReader(); + return r.p.CloseReader(nil); } -func (r *pipeRead) finish() { +// CloseWithError closes the reader; subsequent writes +// to the write half of the pipe will return the error rerr. +func (r *PipeReader) CloseWithError(rerr os.Error) os.Error { + r.lock.Lock(); + defer r.lock.Unlock(); + + return r.p.CloseReader(rerr); +} + +func (r *PipeReader) finish() { r.Close(); } // Write half of pipe. -type pipeWrite struct { +type PipeWriter struct { lock sync.Mutex; p *pipe; } -func (w *pipeWrite) Write(data []byte) (n int, err os.Error) { +// Write implements the standard Write interface: +// it writes data to the pipe, blocking until readers +// have consumed all the data or the read end is closed. +// If the read end is closed with an error, that err is +// returned as err; otherwise err is os.EPIPE. +func (w *PipeWriter) Write(data []byte) (n int, err os.Error) { w.lock.Lock(); defer w.lock.Unlock(); return w.p.Write(data); } -func (w *pipeWrite) Close() os.Error { +// Close closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and a nil error. +func (w *PipeWriter) Close() os.Error { w.lock.Lock(); defer w.lock.Unlock(); - return w.p.CloseWriter(); + return w.p.CloseWriter(nil); } -func (w *pipeWrite) finish() { +// CloseWithError closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and the error werr. +func (w *PipeWriter) CloseWithError(werr os.Error) os.Error { + w.lock.Lock(); + defer w.lock.Unlock(); + + return w.p.CloseWriter(werr); +} + +func (w *PipeWriter) finish() { w.Close(); } // Pipe creates a synchronous in-memory pipe. -// Used to connect code expecting an io.Reader +// It can be used to connect code expecting an io.Reader // with code expecting an io.Writer. -// -// Reads on one end are matched by writes on the other. -// Writes don't complete until all the data has been -// written or the read end is closed. Reads return -// any available data or block until the next write -// or the write end is closed. -func Pipe() (io.ReadCloser, io.WriteCloser) { +// Reads on one end are matched with writes on the other, +// copying data directly between the two; there is no internal buffering. +func Pipe() (*PipeReader, *PipeWriter) { p := new(pipe); p.cr = make(chan []byte, 1); p.cw = make(chan pipeReturn, 1); - r := new(pipeRead); + r := new(PipeReader); r.p = p; - w := new(pipeWrite); + w := new(PipeWriter); w.p = p; return r, w; } diff --git a/src/lib/io/pipe_test.go b/src/lib/io/pipe_test.go index 3df66962854..277f445250c 100644 --- a/src/lib/io/pipe_test.go +++ b/src/lib/io/pipe_test.go @@ -5,6 +5,7 @@ package io import ( + "fmt"; "io"; "os"; "testing"; @@ -132,72 +133,93 @@ func TestPipe3(t *testing.T) { // Test read after/before writer close. -func delayClose(t *testing.T, cl Closer, ch chan int) { - time.Sleep(1000*1000); // 1 ms - if err := cl.Close(); err != nil { +type closer interface { + CloseWithError(os.Error) os.Error; + Close() os.Error; +} + +type pipeTest struct { + async bool; + err os.Error; + closeWithError bool; +} + +func (p pipeTest) String() string { + return fmt.Sprintf("async=%v err=%v closeWithError=%v", p.async, p.err, p.closeWithError); +} + +var pipeTests = []pipeTest { + pipeTest{ true, nil, false }, + pipeTest{ true, nil, true }, + pipeTest{ true, io.ErrShortWrite, true }, + pipeTest{ false, nil, false }, + pipeTest{ false, nil, true }, + pipeTest{ false, io.ErrShortWrite, true }, +} + +func delayClose(t *testing.T, cl closer, ch chan int, tt pipeTest) { + time.Sleep(1e6); // 1 ms + var err os.Error; + if tt.closeWithError { + err = cl.CloseWithError(tt.err); + } else { + err = cl.Close(); + } + if err != nil { t.Errorf("delayClose: %v", err); } ch <- 0; } -func testPipeReadClose(t *testing.T, async bool) { - c := make(chan int, 1); - r, w := Pipe(); - if async { - go delayClose(t, w, c); - } else { - delayClose(t, w, c); - } - var buf = make([]byte, 64); - n, err := r.Read(buf); - <-c; - if err != nil { - t.Errorf("read from closed pipe: %v", err); - } - if n != 0 { - t.Errorf("read on closed pipe returned %d", n); - } - if err = r.Close(); err != nil { - t.Errorf("r.Close: %v", err); +func TestPipeReadClose(t *testing.T) { + for _, tt := range pipeTests { + c := make(chan int, 1); + r, w := Pipe(); + if tt.async { + go delayClose(t, w, c, tt); + } else { + delayClose(t, w, c, tt); + } + var buf = make([]byte, 64); + n, err := r.Read(buf); + <-c; + if err != tt.err { + t.Errorf("read from closed pipe: %v want %v", err, tt.err); + } + if n != 0 { + t.Errorf("read on closed pipe returned %d", n); + } + if err = r.Close(); err != nil { + t.Errorf("r.Close: %v", err); + } } } // Test write after/before reader close. -func testPipeWriteClose(t *testing.T, async bool) { - c := make(chan int, 1); - r, w := Pipe(); - if async { - go delayClose(t, r, c); - } else { - delayClose(t, r, c); - } - n, err := WriteString(w, "hello, world"); - <-c; - if err != os.EPIPE { - t.Errorf("write on closed pipe: %v", err); - } - if n != 0 { - t.Errorf("write on closed pipe returned %d", n); - } - if err = w.Close(); err != nil { - t.Errorf("w.Close: %v", err); +func TestPipeWriteClose(t *testing.T) { + for _, tt := range pipeTests { + c := make(chan int, 1); + r, w := Pipe(); + if tt.async { + go delayClose(t, r, c, tt); + } else { + delayClose(t, r, c, tt); + } + n, err := WriteString(w, "hello, world"); + <-c; + expect := tt.err; + if expect == nil { + expect = os.EPIPE; + } + if err != expect { + t.Errorf("write on closed pipe: %v want %v", err, expect); + } + if n != 0 { + t.Errorf("write on closed pipe returned %d", n); + } + if err = w.Close(); err != nil { + t.Errorf("w.Close: %v", err); + } } } - -func TestPipeReadCloseAsync(t *testing.T) { - testPipeReadClose(t, true); -} - -func TestPipeReadCloseSync(t *testing.T) { - testPipeReadClose(t, false); -} - -func TestPipeWriteCloseAsync(t *testing.T) { - testPipeWriteClose(t, true); -} - -func TestPipeWriteCloseSync(t *testing.T) { - testPipeWriteClose(t, false); -} -