2009-02-16 17:32:30 -07:00
|
|
|
// Copyright 2009 The Go Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style
|
|
|
|
// license that can be found in the LICENSE file.
|
|
|
|
|
2010-02-01 18:43:15 -07:00
|
|
|
// Pipe adapter to connect code expecting an io.Reader
|
|
|
|
// with code expecting an io.Writer.
|
2009-02-16 17:32:30 -07:00
|
|
|
|
|
|
|
package io
|
|
|
|
|
|
|
|
import (
|
2009-12-15 16:35:38 -07:00
|
|
|
"os"
|
2010-04-27 11:17:17 -06:00
|
|
|
"runtime"
|
2009-12-15 16:35:38 -07:00
|
|
|
"sync"
|
2009-02-16 17:32:30 -07:00
|
|
|
)
|
|
|
|
|
2010-04-27 11:17:17 -06:00
|
|
|
// pipeResult is the runner's reply to a single Read or Write request:
// the number of bytes transferred by that operation and any error.
type pipeResult struct {
	n   int
	err os.Error
}
|
|
|
|
|
2009-02-16 17:32:30 -07:00
|
|
|
// Shared pipe structure.
// All pipe state is owned by the single runner goroutine (see run);
// the two halves talk to it only through these channels.
type pipe struct {
	// Reader sends on r1, receives on r2.
	// Writer does the same on w1, w2.
	r1, w1 chan []byte
	r2, w2 chan pipeResult

	rclose chan os.Error // read close; error to return to writers
	wclose chan os.Error // write close; error to return to readers

	done chan int // read or write half is done
}
|
2009-02-16 17:32:30 -07:00
|
|
|
|
2010-04-27 11:17:17 -06:00
|
|
|
// run is the runner goroutine that owns all pipe state.
// It matches a pending Read (buffer received on r1) with a pending
// Write (buffer received on w1), copies the data, and sends the
// results back on r2/w2. Close notifications arrive on rclose/wclose,
// and each half sends on done exactly once when it is finished;
// after both halves are done the runner drains any stuck i/o and exits.
func (p *pipe) run() {
	var (
		rb    []byte      // pending Read
		wb    []byte      // pending Write
		wn    int         // amount written so far from wb
		rerr  os.Error    // if read end is closed, error to send to writers
		werr  os.Error    // if write end is closed, error to send to readers
		r1    chan []byte // p.r1 or nil depending on whether Read is ok
		w1    chan []byte // p.w1 or nil depending on whether Write is ok
		ndone int         // count of done messages received (2 == both halves closed)
	)

	// Read and Write are enabled at the start.
	r1 = p.r1
	w1 = p.w1

	for {
		select {
		case <-p.done:
			if ndone++; ndone == 2 {
				// Both reader and writer are gone.
				// Close out any existing i/o: a nil r1/w1 means an
				// operation is mid-flight and still awaiting its result.
				if r1 == nil {
					p.r2 <- pipeResult{0, os.EINVAL}
				}
				if w1 == nil {
					p.w2 <- pipeResult{0, os.EINVAL}
				}
				return
			}
			continue
		case rerr = <-p.rclose:
			if w1 == nil {
				// finish pending Write
				p.w2 <- pipeResult{wn, rerr}
				wn = 0
				w1 = p.w1 // allow another Write
			}
			if r1 == nil {
				// Close of read side during Read.
				// finish pending Read with os.EINVAL.
				p.r2 <- pipeResult{0, os.EINVAL}
				r1 = p.r1 // allow another Read
			}
			continue
		case werr = <-p.wclose:
			if r1 == nil {
				// finish pending Read
				p.r2 <- pipeResult{0, werr}
				r1 = p.r1 // allow another Read
			}
			if w1 == nil {
				// Close of write side during Write.
				// finish pending Write with os.EINVAL.
				p.w2 <- pipeResult{wn, os.EINVAL}
				wn = 0
				w1 = p.w1 // allow another Write
			}
			continue
		case rb = <-r1:
			if werr != nil {
				// write end is closed
				p.r2 <- pipeResult{0, werr}
				continue
			}
			if rerr != nil {
				// Read end is already closed: the request raced with the
				// close notification. Answer it rather than leaving the
				// reader blocked on r2 forever.
				p.r2 <- pipeResult{0, os.EINVAL}
				continue
			}
			r1 = nil // disable Read until this one is done
		case wb = <-w1:
			if rerr != nil {
				// read end is closed
				p.w2 <- pipeResult{0, rerr}
				continue
			}
			if werr != nil {
				// Write end is already closed: same race as above,
				// on the write side.
				p.w2 <- pipeResult{0, os.EINVAL}
				continue
			}
			w1 = nil // disable Write until this one is done
		}

		if r1 == nil && w1 == nil {
			// Have rb and wb. Execute.
			n := copy(rb, wb)
			wn += n
			wb = wb[n:]

			// Finish Read.
			p.r2 <- pipeResult{n, nil}
			r1 = p.r1 // allow another Read

			// Maybe finish Write: only once the whole buffer is consumed.
			if len(wb) == 0 {
				p.w2 <- pipeResult{wn, nil}
				wn = 0
				w1 = p.w1 // allow another Write
			}
		}
	}
}
|
|
|
|
|
|
|
|
// Read/write halves of the pipe.
|
|
|
|
// They are separate structures for two reasons:
|
|
|
|
// 1. If one end becomes garbage without being Closed,
|
|
|
|
// its finalizer can Close so that the other end
|
|
|
|
// does not hang indefinitely.
|
|
|
|
// 2. Clients cannot use interface conversions on the
|
|
|
|
// read end to find the Write method, and vice versa.
|
2009-02-16 17:32:30 -07:00
|
|
|
|
2010-04-27 11:17:17 -06:00
|
|
|
type pipeHalf struct {
	c1     chan []byte     // sends the Read/Write buffer to the runner
	c2     chan pipeResult // receives the operation's result from the runner
	cclose chan os.Error   // sends the close error to the runner
	done   chan int        // signals the runner this half is finished

	// lock guards closed: ensures only the first close call does anything.
	lock   sync.Mutex
	closed bool

	// io guards ioclosed: once set, rw refuses new operations.
	io       sync.Mutex
	ioclosed bool
}
|
|
|
|
|
2010-04-27 11:17:17 -06:00
|
|
|
// rw performs one i/o operation (a Read or a Write, depending on which
// half this is): it hands data to the runner on c1 and blocks until the
// runner replies on c2 with the byte count and error.
func (p *pipeHalf) rw(data []byte) (n int, err os.Error) {
	// Run i/o operation.
	// Check ioclosed flag under lock to make sure we're still allowed to do i/o.
	p.io.Lock()
	if p.ioclosed {
		p.io.Unlock()
		return 0, os.EINVAL
	}
	p.io.Unlock()
	// NOTE(review): there is a window here between releasing p.io and the
	// send on c1 during which close may run; the runner compensates by
	// answering requests on an already-closed half — confirm against run().
	p.c1 <- data
	res := <-p.c2
	return res.n, res.err
}
|
|
|
|
|
2010-04-27 11:17:17 -06:00
|
|
|
// close tears down this half of the pipe, arranging for err to be
// returned by operations on the other half. The teardown order matters:
// notify the runner first, then cut off new rw calls, then tell the
// runner this half is done.
func (p *pipeHalf) close(err os.Error) os.Error {
	// Close pipe half.
	// Only first call to close does anything.
	p.lock.Lock()
	if p.closed {
		p.lock.Unlock()
		return os.EINVAL
	}
	p.closed = true
	p.lock.Unlock()

	// First, send the close notification.
	p.cclose <- err

	// Runner is now responding to rw operations
	// with os.EINVAL. Cut off future rw operations
	// by setting ioclosed flag.
	p.io.Lock()
	p.ioclosed = true
	p.io.Unlock()

	// With ioclosed set, there will be no more rw operations
	// working on the channels.
	// Tell the runner we won't be bothering it anymore.
	p.done <- 1

	// Successfully torn down; can disable finalizer.
	runtime.SetFinalizer(p, nil)
	return nil
}
|
|
|
|
|
2010-04-27 11:17:17 -06:00
|
|
|
// finalizer closes the half if it became garbage without being Closed,
// so the runner and the other end do not hang indefinitely.
func (p *pipeHalf) finalizer() {
	p.close(os.EINVAL)
}
|
|
|
|
|
2009-02-16 17:32:30 -07:00
|
|
|
|
2009-06-06 22:51:05 -06:00
|
|
|
// A PipeReader is the read half of a pipe.
type PipeReader struct {
	pipeHalf // c1/c2 are the pipe's r1/r2 channels
}
|
|
|
|
|
2009-06-06 22:51:05 -06:00
|
|
|
// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is nil.
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
	return r.rw(data)
}
|
|
|
|
|
2009-06-06 22:51:05 -06:00
|
|
|
// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error os.EPIPE.
func (r *PipeReader) Close() os.Error {
	return r.CloseWithError(nil)
}
|
|
|
|
|
|
|
|
// CloseWithError closes the reader; subsequent writes
|
2010-04-27 11:17:17 -06:00
|
|
|
// to the write half of the pipe will return the error err.
|
|
|
|
func (r *PipeReader) CloseWithError(err os.Error) os.Error {
|
|
|
|
if err == nil {
|
|
|
|
err = os.EPIPE
|
|
|
|
}
|
|
|
|
return r.close(err)
|
2009-02-16 17:32:30 -07:00
|
|
|
}
|
|
|
|
|
2010-04-27 11:17:17 -06:00
|
|
|
// A PipeWriter is the write half of a pipe.
type PipeWriter struct {
	pipeHalf // c1/c2 are the pipe's w1/w2 channels
}
|
|
|
|
|
2009-06-06 22:51:05 -06:00
|
|
|
// Write implements the standard Write interface:
// it writes data to the pipe, blocking until readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is os.EPIPE.
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
	return w.rw(data)
}
|
|
|
|
|
2009-06-06 22:51:05 -06:00
|
|
|
// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and os.EOF.
func (w *PipeWriter) Close() os.Error {
	return w.CloseWithError(nil)
}
|
|
|
|
|
|
|
|
// CloseWithError closes the writer; subsequent reads from the
|
2010-04-27 11:17:17 -06:00
|
|
|
// read half of the pipe will return no bytes and the error err.
|
|
|
|
func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
|
|
|
|
if err == nil {
|
|
|
|
err = os.EOF
|
|
|
|
}
|
|
|
|
return w.close(err)
|
2009-02-16 17:32:30 -07:00
|
|
|
}
|
|
|
|
|
2009-03-06 04:43:44 -07:00
|
|
|
// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
// Reads on one end are matched with writes on the other,
// copying data directly between the two; there is no internal buffering.
func Pipe() (*PipeReader, *PipeWriter) {
	p := &pipe{
		r1:     make(chan []byte),
		r2:     make(chan pipeResult),
		w1:     make(chan []byte),
		w2:     make(chan pipeResult),
		rclose: make(chan os.Error),
		wclose: make(chan os.Error),
		done:   make(chan int),
	}
	// The runner goroutine owns all pipe state; it exits after both
	// halves have sent on p.done.
	go p.run()

	// NOTE: Cannot use composite literal here:
	//	pipeHalf{c1: p.r1, c2: p.r2, cclose: p.rclose, done: p.done}
	// because this implicitly copies the pipeHalf, which copies the inner mutex.

	r := new(PipeReader)
	r.c1 = p.r1
	r.c2 = p.r2
	r.cclose = p.rclose
	r.done = p.done
	// TODO(rsc): Should be able to write
	//	runtime.SetFinalizer(r, (*PipeReader).finalizer)
	// but 6g doesn't see the finalizer method.
	runtime.SetFinalizer(&r.pipeHalf, (*pipeHalf).finalizer)

	w := new(PipeWriter)
	w.c1 = p.w1
	w.c2 = p.w2
	w.cclose = p.wclose
	w.done = p.done
	// TODO(rsc): Should be able to write
	//	runtime.SetFinalizer(w, (*PipeWriter).finalizer)
	// but 6g doesn't see the finalizer method.
	runtime.SetFinalizer(&w.pipeHalf, (*pipeHalf).finalizer)

	return r, w
}
|