mirror of
https://github.com/golang/go
synced 2024-10-05 15:51:22 -06:00
c2ec9583a0
R=gri DELTA=1359 (138 added, 32 deleted, 1189 changed) OCL=35408 CL=35420
228 lines
5.0 KiB
Go
228 lines
5.0 KiB
Go
// Copyright 2009 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// Pipe adapter to connect code expecting an io.Read
|
|
// with code expecting an io.Write.
|
|
|
|
package io
|
|
|
|
import (
|
|
"os";
|
|
"sync";
|
|
)
|
|
|
|
type pipeReturn struct {
|
|
n int;
|
|
err os.Error;
|
|
}
|
|
|
|
// Shared pipe structure.
|
|
type pipe struct {
|
|
rclosed bool; // Read end closed?
|
|
rerr os.Error; // Error supplied to CloseReader
|
|
wclosed bool; // Write end closed?
|
|
werr os.Error; // Error supplied to CloseWriter
|
|
wpend []byte; // Written data waiting to be read.
|
|
wtot int; // Bytes consumed so far in current write.
|
|
cr chan []byte; // Write sends data here...
|
|
cw chan pipeReturn; // ... and reads the n, err back from here.
|
|
}
|
|
|
|
func (p *pipe) Read(data []byte) (n int, err os.Error) {
|
|
if p == nil || p.rclosed {
|
|
return 0, os.EINVAL;
|
|
}
|
|
|
|
// Wait for next write block if necessary.
|
|
if p.wpend == nil {
|
|
if !p.wclosed {
|
|
p.wpend = <-p.cr;
|
|
}
|
|
if p.wpend == nil {
|
|
return 0, p.werr;
|
|
}
|
|
p.wtot = 0;
|
|
}
|
|
|
|
// Read from current write block.
|
|
n = len(data);
|
|
if n > len(p.wpend) {
|
|
n = len(p.wpend);
|
|
}
|
|
for i := 0; i < n; i++ {
|
|
data[i] = p.wpend[i];
|
|
}
|
|
p.wtot += n;
|
|
p.wpend = p.wpend[n : len(p.wpend)];
|
|
|
|
// If write block is done, finish the write.
|
|
if len(p.wpend) == 0 {
|
|
p.wpend = nil;
|
|
p.cw <- pipeReturn{p.wtot, nil};
|
|
p.wtot = 0;
|
|
}
|
|
|
|
return n, nil;
|
|
}
|
|
|
|
func (p *pipe) Write(data []byte) (n int, err os.Error) {
|
|
if p == nil || p.wclosed {
|
|
return 0, os.EINVAL;
|
|
}
|
|
if p.rclosed {
|
|
return 0, p.rerr;
|
|
}
|
|
|
|
// Send data to reader.
|
|
p.cr <- data;
|
|
|
|
// Wait for reader to finish copying it.
|
|
res := <-p.cw;
|
|
return res.n, res.err;
|
|
}
|
|
|
|
func (p *pipe) CloseReader(rerr os.Error) os.Error {
|
|
if p == nil || p.rclosed {
|
|
return os.EINVAL;
|
|
}
|
|
|
|
// Stop any future writes.
|
|
p.rclosed = true;
|
|
if rerr == nil {
|
|
rerr = os.EPIPE;
|
|
}
|
|
p.rerr = rerr;
|
|
|
|
// Stop the current write.
|
|
if !p.wclosed {
|
|
p.cw <- pipeReturn{p.wtot, rerr};
|
|
}
|
|
|
|
return nil;
|
|
}
|
|
|
|
func (p *pipe) CloseWriter(werr os.Error) os.Error {
|
|
if werr == nil {
|
|
werr = os.EOF;
|
|
}
|
|
if p == nil || p.wclosed {
|
|
return os.EINVAL;
|
|
}
|
|
|
|
// Stop any future reads.
|
|
p.wclosed = true;
|
|
p.werr = werr;
|
|
|
|
// Stop the current read.
|
|
if !p.rclosed {
|
|
p.cr <- nil;
|
|
}
|
|
|
|
return nil;
|
|
}
|
|
|
|
// Read/write halves of the pipe.
|
|
// They are separate structures for two reasons:
|
|
// 1. If one end becomes garbage without being Closed,
|
|
// its finisher can Close so that the other end
|
|
// does not hang indefinitely.
|
|
// 2. Clients cannot use interface conversions on the
|
|
// read end to find the Write method, and vice versa.
|
|
|
|
// A PipeReader is the read half of a pipe.
|
|
type PipeReader struct {
|
|
lock sync.Mutex;
|
|
p *pipe;
|
|
}
|
|
|
|
// Read implements the standard Read interface:
|
|
// it reads data from the pipe, blocking until a writer
|
|
// arrives or the write end is closed.
|
|
// If the write end is closed with an error, that error is
|
|
// returned as err; otherwise err is nil.
|
|
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
|
|
r.lock.Lock();
|
|
defer r.lock.Unlock();
|
|
|
|
return r.p.Read(data);
|
|
}
|
|
|
|
// Close closes the reader; subsequent writes to the
|
|
// write half of the pipe will return the error os.EPIPE.
|
|
func (r *PipeReader) Close() os.Error {
|
|
r.lock.Lock();
|
|
defer r.lock.Unlock();
|
|
|
|
return r.p.CloseReader(nil);
|
|
}
|
|
|
|
// CloseWithError closes the reader; subsequent writes
|
|
// to the write half of the pipe will return the error rerr.
|
|
func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
|
|
r.lock.Lock();
|
|
defer r.lock.Unlock();
|
|
|
|
return r.p.CloseReader(rerr);
|
|
}
|
|
|
|
func (r *PipeReader) finish() {
|
|
r.Close();
|
|
}
|
|
|
|
// Write half of pipe.
|
|
type PipeWriter struct {
|
|
lock sync.Mutex;
|
|
p *pipe;
|
|
}
|
|
|
|
// Write implements the standard Write interface:
|
|
// it writes data to the pipe, blocking until readers
|
|
// have consumed all the data or the read end is closed.
|
|
// If the read end is closed with an error, that err is
|
|
// returned as err; otherwise err is os.EPIPE.
|
|
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
|
|
w.lock.Lock();
|
|
defer w.lock.Unlock();
|
|
|
|
return w.p.Write(data);
|
|
}
|
|
|
|
// Close closes the writer; subsequent reads from the
|
|
// read half of the pipe will return no bytes and a nil error.
|
|
func (w *PipeWriter) Close() os.Error {
|
|
w.lock.Lock();
|
|
defer w.lock.Unlock();
|
|
|
|
return w.p.CloseWriter(nil);
|
|
}
|
|
|
|
// CloseWithError closes the writer; subsequent reads from the
|
|
// read half of the pipe will return no bytes and the error werr.
|
|
func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
|
|
w.lock.Lock();
|
|
defer w.lock.Unlock();
|
|
|
|
return w.p.CloseWriter(werr);
|
|
}
|
|
|
|
func (w *PipeWriter) finish() {
|
|
w.Close();
|
|
}
|
|
|
|
// Pipe creates a synchronous in-memory pipe.
|
|
// It can be used to connect code expecting an io.Reader
|
|
// with code expecting an io.Writer.
|
|
// Reads on one end are matched with writes on the other,
|
|
// copying data directly between the two; there is no internal buffering.
|
|
func Pipe() (*PipeReader, *PipeWriter) {
|
|
p := new(pipe);
|
|
p.cr = make(chan []byte, 1);
|
|
p.cw = make(chan pipeReturn, 1);
|
|
r := new(PipeReader);
|
|
r.p = p;
|
|
w := new(PipeWriter);
|
|
w.p = p;
|
|
return r, w;
|
|
}
|