1
0
mirror of https://github.com/golang/go synced 2024-11-06 04:16:11 -07:00
go/src/io/pipe.go
Kirill Smelkov d296c3235d io: fix PipeWriter.Close to wake up Writes
Since commit cc62bed0 (CL 994043) the pipe deadlock when doing
Read+Close or Write+Close on same end was fixed, alas with test for
Read+Close case only.

Then commit 6d6f3381 (CL 4252057) made a thinko: in the writer path
p.werr is checked for != nil and then err is set but there is no break
from waiting loop unlike break is there in similar condition for reader.
Together with having only Read+Close case tested that made it to leave
reintroduced Write+Close deadlock unnoticed.

Fix it.

Implicitly this also fixes net.Pipe to conform to semantic of net.Conn
interface where Close is documented to unblock any blocked Read or Write
operations.

No test added to net/ since net.Pipe tests are "Assuming that the
underlying io.Pipe implementation is solid and we're just testing the
net wrapping". The test added in this patch should be enough to cover
the breakage.

Fixes #18401
Updates #18170

Change-Id: I9e9460b3fd7d220bbe60b726accf86f352aed8d4
Reviewed-on: https://go-review.googlesource.com/34637
Run-TryBot: Russ Cox <rsc@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Russ Cox <rsc@golang.org>
2016-12-21 15:08:26 +00:00

199 lines
4.7 KiB
Go

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Pipe adapter to connect code expecting an io.Reader
// with code expecting an io.Writer.
package io
import (
"errors"
"sync"
)
// ErrClosedPipe is the error used for read or write operations on a closed pipe.
var ErrClosedPipe = errors.New("io: read/write on closed pipe")
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
rl sync.Mutex // gates readers one at a time
wl sync.Mutex // gates writers one at a time
l sync.Mutex // protects remaining fields
data []byte // data remaining in pending write
rwait sync.Cond // waiting reader
wwait sync.Cond // waiting writer
rerr error // if reader closed, error to give writes
werr error // if writer closed, error to give reads
}
func (p *pipe) read(b []byte) (n int, err error) {
// One reader at a time.
p.rl.Lock()
defer p.rl.Unlock()
p.l.Lock()
defer p.l.Unlock()
for {
if p.rerr != nil {
return 0, ErrClosedPipe
}
if p.data != nil {
break
}
if p.werr != nil {
return 0, p.werr
}
p.rwait.Wait()
}
n = copy(b, p.data)
p.data = p.data[n:]
if len(p.data) == 0 {
p.data = nil
p.wwait.Signal()
}
return
}
var zero [0]byte
func (p *pipe) write(b []byte) (n int, err error) {
// pipe uses nil to mean not available
if b == nil {
b = zero[:]
}
// One writer at a time.
p.wl.Lock()
defer p.wl.Unlock()
p.l.Lock()
defer p.l.Unlock()
if p.werr != nil {
err = ErrClosedPipe
return
}
p.data = b
p.rwait.Signal()
for {
if p.data == nil {
break
}
if p.rerr != nil {
err = p.rerr
break
}
if p.werr != nil {
err = ErrClosedPipe
break
}
p.wwait.Wait()
}
n = len(b) - len(p.data)
p.data = nil // in case of rerr or werr
return
}
func (p *pipe) rclose(err error) {
if err == nil {
err = ErrClosedPipe
}
p.l.Lock()
defer p.l.Unlock()
p.rerr = err
p.rwait.Signal()
p.wwait.Signal()
}
func (p *pipe) wclose(err error) {
if err == nil {
err = EOF
}
p.l.Lock()
defer p.l.Unlock()
p.werr = err
p.rwait.Signal()
p.wwait.Signal()
}
// A PipeReader is the read half of a pipe.
type PipeReader struct {
p *pipe
}
// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is EOF.
func (r *PipeReader) Read(data []byte) (n int, err error) {
return r.p.read(data)
}
// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error ErrClosedPipe.
func (r *PipeReader) Close() error {
return r.CloseWithError(nil)
}
// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
func (r *PipeReader) CloseWithError(err error) error {
r.p.rclose(err)
return nil
}
// A PipeWriter is the write half of a pipe.
type PipeWriter struct {
p *pipe
}
// Write implements the standard Write interface:
// it writes data to the pipe, blocking until one or more readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is ErrClosedPipe.
func (w *PipeWriter) Write(data []byte) (n int, err error) {
return w.p.write(data)
}
// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and EOF.
func (w *PipeWriter) Close() error {
return w.CloseWithError(nil)
}
// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error err,
// or EOF if err is nil.
//
// CloseWithError always returns nil.
func (w *PipeWriter) CloseWithError(err error) error {
w.p.wclose(err)
return nil
}
// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
//
// Reads and Writes on the pipe are matched one to one
// except when multiple Reads are needed to consume a single Write.
// That is, each Write to the PipeWriter blocks until it has satisfied
// one or more Reads from the PipeReader that fully consume
// the written data.
// The data is copied directly from the Write to the corresponding
// Read (or Reads); there is no internal buffering.
//
// It is safe to call Read and Write in parallel with each other or with Close.
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func Pipe() (*PipeReader, *PipeWriter) {
p := new(pipe)
p.rwait.L = &p.l
p.wwait.L = &p.l
r := &PipeReader{p}
w := &PipeWriter{p}
return r, w
}