// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Pipe adapter to connect code expecting an io.Read // with code expecting an io.Write. package io import ( "io"; "os"; "sync"; ) type pipeReturn struct { n int; err os.Error; } // Shared pipe structure. type pipe struct { rclosed bool; // Read end closed? wclosed bool; // Write end closed? wpend []byte; // Written data waiting to be read. wtot int; // Bytes consumed so far in current write. cr chan []byte; // Write sends data here... cw chan pipeReturn; // ... and reads the n, err back from here. } func (p *pipe) Read(data []byte) (n int, err os.Error) { if p == nil || p.rclosed { return 0, os.EINVAL; } // Wait for next write block if necessary. if p.wpend == nil { if !p.wclosed { p.wpend = <-p.cr; } if p.wpend == nil { return 0, nil; } p.wtot = 0; } // Read from current write block. n = len(data); if n > len(p.wpend) { n = len(p.wpend); } for i := 0; i < n; i++ { data[i] = p.wpend[i]; } p.wtot += n; p.wpend = p.wpend[n:len(p.wpend)]; // If write block is done, finish the write. if len(p.wpend) == 0 { p.wpend = nil; p.cw <- pipeReturn{p.wtot, nil}; p.wtot = 0; } return n, nil; } func (p *pipe) Write(data []byte) (n int, err os.Error) { if p == nil || p.wclosed { return 0, os.EINVAL; } if p.rclosed { return 0, os.EPIPE; } // Send data to reader. p.cr <- data; // Wait for reader to finish copying it. res := <-p.cw; return res.n, res.err; } func (p *pipe) CloseReader() os.Error { if p == nil || p.rclosed { return os.EINVAL; } // Stop any future writes. p.rclosed = true; // Stop the current write. if !p.wclosed { p.cw <- pipeReturn{p.wtot, os.EPIPE}; } return nil; } func (p *pipe) CloseWriter() os.Error { if p == nil || p.wclosed { return os.EINVAL; } // Stop any future reads. p.wclosed = true; // Stop the current read. if !p.rclosed { p.cr <- nil; } return nil; } // Read/write halves of the pipe. // They are separate structures for two reasons: // 1. If one end becomes garbage without being Closed, // its finisher can Close so that the other end // does not hang indefinitely. // 2. Clients cannot use interface conversions on the // read end to find the Write method, and vice versa. // Read half of pipe. type pipeRead struct { lock sync.Mutex; p *pipe; } func (r *pipeRead) Read(data []byte) (n int, err os.Error) { r.lock.Lock(); defer r.lock.Unlock(); return r.p.Read(data); } func (r *pipeRead) Close() os.Error { r.lock.Lock(); defer r.lock.Unlock(); return r.p.CloseReader(); } func (r *pipeRead) finish() { r.Close(); } // Write half of pipe. type pipeWrite struct { lock sync.Mutex; p *pipe; } func (w *pipeWrite) Write(data []byte) (n int, err os.Error) { w.lock.Lock(); defer w.lock.Unlock(); return w.p.Write(data); } func (w *pipeWrite) Close() os.Error { w.lock.Lock(); defer w.lock.Unlock(); return w.p.CloseWriter(); } func (w *pipeWrite) finish() { w.Close(); } // Pipe creates a synchronous in-memory pipe. // Used to connect code expecting an io.Reader // with code expecting an io.Writer. // // Reads on one end are matched by writes on the other. // Writes don't complete until all the data has been // written or the read end is closed. Reads return // any available data or block until the next write // or the write end is closed. func Pipe() (io.ReadCloser, io.WriteCloser) { p := new(pipe); p.cr = make(chan []byte, 1); p.cw = make(chan pipeReturn, 1); r := new(pipeRead); r.p = p; w := new(pipeWrite); w.p = p; return r, w; }