diff --git a/src/pkg/io/pipe.go b/src/pkg/io/pipe.go index 909989ae6ad..fe706344680 100644 --- a/src/pkg/io/pipe.go +++ b/src/pkg/io/pipe.go @@ -2,8 +2,8 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Pipe adapter to connect code expecting an io.Read -// with code expecting an io.Write. +// Pipe adapter to connect code expecting an io.Reader +// with code expecting an io.Writer. package io @@ -12,54 +12,43 @@ import ( "sync" ) -type pipeReturn struct { - n int - err os.Error -} - // Shared pipe structure. type pipe struct { - rclosed bool // Read end closed? - rerr os.Error // Error supplied to CloseReader - wclosed bool // Write end closed? - werr os.Error // Error supplied to CloseWriter - wpend []byte // Written data waiting to be read. - wtot int // Bytes consumed so far in current write. - cr chan []byte // Write sends data here... - cw chan pipeReturn // ... and reads the n, err back from here. + rclosed bool // Read end closed? + rerr os.Error // Error supplied to CloseReader + wclosed bool // Write end closed? + werr os.Error // Error supplied to CloseWriter + wpend []byte // Written data waiting to be read. + wtot int // Bytes consumed so far in current write. + cw chan []byte // Write sends data here... + cr chan bool // ... and reads a done notification from here. } func (p *pipe) Read(data []byte) (n int, err os.Error) { - if p == nil || p.rclosed { + if p.rclosed { return 0, os.EINVAL } // Wait for next write block if necessary. if p.wpend == nil { - if !p.wclosed { - p.wpend = <-p.cr + if !closed(p.cw) { + p.wpend = <-p.cw } - if p.wclosed { + if closed(p.cw) { return 0, p.werr } p.wtot = 0 } // Read from current write block. - n = len(data) - if n > len(p.wpend) { - n = len(p.wpend) - } - for i := 0; i < n; i++ { - data[i] = p.wpend[i] - } + n = copy(data, p.wpend) p.wtot += n p.wpend = p.wpend[n:] // If write block is done, finish the write. if len(p.wpend) == 0 { p.wpend = nil - p.cw <- pipeReturn{p.wtot, nil} + p.cr <- true p.wtot = 0 } @@ -67,58 +56,52 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) { } func (p *pipe) Write(data []byte) (n int, err os.Error) { - if p == nil || p.wclosed { + if p.wclosed { return 0, os.EINVAL } - if p.rclosed { + if closed(p.cr) { return 0, p.rerr } - // Send data to reader. - p.cr <- data + // Send write to reader. + p.cw <- data // Wait for reader to finish copying it. - res := <-p.cw - return res.n, res.err + <-p.cr + if closed(p.cr) { + _, _ = <-p.cw // undo send if reader is gone + return 0, p.rerr + } + return len(data), nil } func (p *pipe) CloseReader(rerr os.Error) os.Error { - if p == nil || p.rclosed { + if p.rclosed { return os.EINVAL } - - // Stop any future writes. p.rclosed = true + + // Wake up writes. if rerr == nil { rerr = os.EPIPE } p.rerr = rerr - - // Stop the current write. - if !p.wclosed { - p.cw <- pipeReturn{p.wtot, rerr} - } - + close(p.cr) return nil } func (p *pipe) CloseWriter(werr os.Error) os.Error { + if p.wclosed { + return os.EINVAL + } + p.wclosed = true + + // Wake up reads. if werr == nil { werr = os.EOF } - if p == nil || p.wclosed { - return os.EINVAL - } - - // Stop any future reads. - p.wclosed = true p.werr = werr - - // Stop the current read. - if !p.rclosed { - p.cr <- nil - } - + close(p.cw) return nil } @@ -212,12 +195,9 @@ func (w *PipeWriter) finish() { w.Close() } // Reads on one end are matched with writes on the other, // copying data directly between the two; there is no internal buffering. func Pipe() (*PipeReader, *PipeWriter) { - p := new(pipe) - p.cr = make(chan []byte, 1) - p.cw = make(chan pipeReturn, 1) - r := new(PipeReader) - r.p = p - w := new(PipeWriter) - w.p = p - return r, w + p := &pipe{ + cw: make(chan []byte, 1), + cr: make(chan bool, 1), + } + return &PipeReader{p: p}, &PipeWriter{p: p} }