1
0
mirror of https://github.com/golang/go synced 2024-11-23 01:40:03 -07:00

old/netchan: delete as part of move to go.exp/old/netchan

R=golang-dev, minux.ma
CC=golang-dev
https://golang.org/cl/7450050
This commit is contained in:
Rob Pike 2013-03-02 11:45:22 -08:00
parent 0acda4e87d
commit aa36afed3b
4 changed files with 0 additions and 1472 deletions

View File

@ -1,338 +0,0 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netchan
import (
"encoding/gob"
"errors"
"io"
"reflect"
"sync"
"time"
)
// The direction of a connection from the client's perspective.
type Dir int
const (
Recv Dir = iota
Send
)
func (dir Dir) String() string {
switch dir {
case Recv:
return "Recv"
case Send:
return "Send"
}
return "???"
}
// Payload types
const (
payRequest = iota // request structure follows
payError // error structure follows
payData // user payload follows
payAck // acknowledgement; no payload
payClosed // channel is now closed
payAckSend // payload has been delivered.
)
// A header is sent as a prefix to every transmission. It will be followed by
// a request structure, an error structure, or an arbitrary user payload structure.
type header struct {
Id int
PayloadType int
SeqNum int64
}
// Sent with a header once per channel from importer to exporter to report
// that it wants to bind to a channel with the specified direction for count
// messages, with space for size buffered values. If count is -1, it means unlimited.
type request struct {
Name string
Count int64
Size int
Dir Dir
}
// Sent with a header to report an error.
type error_ struct {
Error string
}
// Used to unify management of acknowledgements for import and export.
type unackedCounter interface {
unackedCount() int64
ack() int64
seq() int64
}
// A channel and its direction.
type chanDir struct {
ch reflect.Value
dir Dir
}
// clientSet contains the objects and methods needed for tracking
// clients of an exporter and draining outstanding messages.
type clientSet struct {
mu sync.Mutex // protects access to channel and client maps
names map[string]*chanDir
clients map[unackedCounter]bool
}
// Mutex-protected encoder and decoder pair.
type encDec struct {
decLock sync.Mutex
dec *gob.Decoder
encLock sync.Mutex
enc *gob.Encoder
}
func newEncDec(conn io.ReadWriter) *encDec {
return &encDec{
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(conn),
}
}
// Decode an item from the connection.
func (ed *encDec) decode(value reflect.Value) error {
ed.decLock.Lock()
err := ed.dec.DecodeValue(value)
if err != nil {
// TODO: tear down connection?
}
ed.decLock.Unlock()
return err
}
// Encode a header and payload onto the connection.
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error {
ed.encLock.Lock()
hdr.PayloadType = payloadType
err := ed.enc.Encode(hdr)
if err == nil {
if payload != nil {
err = ed.enc.Encode(payload)
}
}
if err != nil {
// TODO: tear down connection if there is an error?
}
ed.encLock.Unlock()
return err
}
// See the comment for Exporter.Drain.
func (cs *clientSet) drain(timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for {
pending := false
cs.mu.Lock()
// Any messages waiting for a client?
for _, chDir := range cs.names {
if chDir.ch.Len() > 0 {
pending = true
}
}
// Any unacknowledged messages?
for client := range cs.clients {
n := client.unackedCount()
if n > 0 { // Check for > rather than != just to be safe.
pending = true
break
}
}
cs.mu.Unlock()
if !pending {
break
}
if timeout > 0 && time.Now().After(deadline) {
return errors.New("timeout")
}
time.Sleep(100 * time.Millisecond)
}
return nil
}
// See the comment for Exporter.Sync.
func (cs *clientSet) sync(timeout time.Duration) error {
deadline := time.Now().Add(timeout)
// seq remembers the clients and their seqNum at point of entry.
seq := make(map[unackedCounter]int64)
cs.mu.Lock()
for client := range cs.clients {
seq[client] = client.seq()
}
cs.mu.Unlock()
for {
pending := false
cs.mu.Lock()
// Any unacknowledged messages? Look only at clients that existed
// when we started and are still in this client set.
for client := range seq {
if _, ok := cs.clients[client]; ok {
if client.ack() < seq[client] {
pending = true
break
}
}
}
cs.mu.Unlock()
if !pending {
break
}
if timeout > 0 && time.Now().After(deadline) {
return errors.New("timeout")
}
time.Sleep(100 * time.Millisecond)
}
return nil
}
// A netChan represents a channel imported or exported
// on a single connection. Flow is controlled by the receiving
// side by sending payAckSend messages when values
// are delivered into the local channel.
type netChan struct {
*chanDir
name string
id int
size int // buffer size of channel.
closed bool
// sender-specific state
ackCh chan bool // buffered with space for all the acks we need
space int // available space.
// receiver-specific state
sendCh chan reflect.Value // buffered channel of values received from other end.
ed *encDec // so that we can send acks.
count int64 // number of values still to receive.
}
// Create a new netChan with the given name (only used for
// messages), id, direction, buffer size, and count.
// The connection to the other side is represented by ed.
func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
if c.dir == Send {
c.ackCh = make(chan bool, size)
c.space = size
}
return c
}
// Close the channel.
func (nch *netChan) close() {
if nch.closed {
return
}
if nch.dir == Recv {
if nch.sendCh != nil {
// If the sender goroutine is active, close the channel to it.
// It will close nch.ch when it can.
close(nch.sendCh)
} else {
nch.ch.Close()
}
} else {
nch.ch.Close()
close(nch.ackCh)
}
nch.closed = true
}
// Send message from remote side to local receiver.
func (nch *netChan) send(val reflect.Value) {
if nch.dir != Recv {
panic("send on wrong direction of channel")
}
if nch.sendCh == nil {
// If possible, do local send directly and ack immediately.
if nch.ch.TrySend(val) {
nch.sendAck()
return
}
// Start sender goroutine to manage delayed delivery of values.
nch.sendCh = make(chan reflect.Value, nch.size)
go nch.sender()
}
select {
case nch.sendCh <- val:
// ok
default:
// TODO: should this be more resilient?
panic("netchan: remote sender sent more values than allowed")
}
}
// sendAck sends an acknowledgment that a message has left
// the channel's buffer. If the messages remaining to be sent
// will fit in the channel's buffer, then we don't
// need to send an ack.
func (nch *netChan) sendAck() {
if nch.count < 0 || nch.count > int64(nch.size) {
nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
}
if nch.count > 0 {
nch.count--
}
}
// The sender process forwards items from the sending queue
// to the destination channel, acknowledging each item.
func (nch *netChan) sender() {
if nch.dir != Recv {
panic("sender on wrong direction of channel")
}
// When Exporter.Hangup is called, the underlying channel is closed,
// and so we may get a "too many operations on closed channel" error
// if there are outstanding messages in sendCh.
// Make sure that this doesn't panic the whole program.
defer func() {
if r := recover(); r != nil {
// TODO check that r is "too many operations", otherwise re-panic.
}
}()
for v := range nch.sendCh {
nch.ch.Send(v)
nch.sendAck()
}
nch.ch.Close()
}
// Receive value from local side for sending to remote side.
func (nch *netChan) recv() (val reflect.Value, ok bool) {
if nch.dir != Send {
panic("recv on wrong direction of channel")
}
if nch.space == 0 {
// Wait for buffer space.
<-nch.ackCh
nch.space++
}
nch.space--
return nch.ch.Recv()
}
// acked is called when the remote side indicates that
// a value has been delivered.
func (nch *netChan) acked() {
if nch.dir != Send {
panic("recv on wrong direction of channel")
}
select {
case nch.ackCh <- true:
// ok
default:
// TODO: should this be more resilient?
panic("netchan: remote receiver sent too many acks")
}
}

View File

@ -1,400 +0,0 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/*
Package netchan implements type-safe networked channels:
it allows the two ends of a channel to appear on different
computers connected by a network. It does this by transporting
data sent to a channel on one machine so it can be recovered
by a receive of a channel of the same type on the other.
An exporter publishes a set of channels by name. An importer
connects to the exporting machine and imports the channels
by name. After importing the channels, the two machines can
use the channels in the usual way.
Networked channels are not synchronized; they always behave
as if they are buffered channels of at least one element.
*/
package netchan
// BUG: can't use range clause to receive when using ImportNValues to limit the count.
import (
"errors"
"io"
"log"
"net"
"reflect"
"strconv"
"sync"
"time"
)
// Export
// expLog is a logging convenience function. The first argument must be a string.
func expLog(args ...interface{}) {
args[0] = "netchan export: " + args[0].(string)
log.Print(args...)
}
// An Exporter allows a set of channels to be published on a single
// network port. A single machine may have multiple Exporters
// but they must use different ports.
type Exporter struct {
*clientSet
}
type expClient struct {
*encDec
exp *Exporter
chans map[int]*netChan // channels in use by client
mu sync.Mutex // protects remaining fields
errored bool // client has been sent an error
seqNum int64 // sequences messages sent to client; has value of highest sent
ackNum int64 // highest sequence number acknowledged
seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
}
func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
client := new(expClient)
client.exp = exp
client.encDec = newEncDec(conn)
client.seqNum = 0
client.ackNum = 0
client.chans = make(map[int]*netChan)
return client
}
func (client *expClient) sendError(hdr *header, err string) {
error := &error_{err}
expLog("sending error to client:", error.Error)
client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
client.mu.Lock()
client.errored = true
client.mu.Unlock()
}
func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
exp := client.exp
exp.mu.Lock()
ech, ok := exp.names[name]
exp.mu.Unlock()
if !ok {
client.sendError(hdr, "no such channel: "+name)
return nil
}
if ech.dir != dir {
client.sendError(hdr, "wrong direction for channel: "+name)
return nil
}
nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count)
client.chans[hdr.Id] = nch
return nch
}
func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
nch := client.chans[hdr.Id]
if nch == nil {
return nil
}
if nch.dir != dir {
client.sendError(hdr, "wrong direction for channel: "+nch.name)
}
return nch
}
// The function run manages sends and receives for a single client. For each
// (client Recv) request, this will launch a serveRecv goroutine to deliver
// the data for that channel, while (client Send) requests are handled as
// data arrives from the client.
func (client *expClient) run() {
hdr := new(header)
hdrValue := reflect.ValueOf(hdr)
req := new(request)
reqValue := reflect.ValueOf(req)
error := new(error_)
for {
*hdr = header{}
if err := client.decode(hdrValue); err != nil {
if err != io.EOF {
expLog("error decoding client header:", err)
}
break
}
switch hdr.PayloadType {
case payRequest:
*req = request{}
if err := client.decode(reqValue); err != nil {
expLog("error decoding client request:", err)
break
}
if req.Size < 1 {
panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
}
switch req.Dir {
case Recv:
// look up channel before calling serveRecv to
// avoid a lock around client.chans.
if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
go client.serveRecv(nch, *hdr, req.Count)
}
case Send:
client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
// The actual sends will have payload type payData.
// TODO: manage the count?
default:
error.Error = "request: can't handle channel direction"
expLog(error.Error, req.Dir)
client.encode(hdr, payError, error)
}
case payData:
client.serveSend(*hdr)
case payClosed:
client.serveClosed(*hdr)
case payAck:
client.mu.Lock()
if client.ackNum != hdr.SeqNum-1 {
// Since the sequence number is incremented and the message is sent
// in a single instance of locking client.mu, the messages are guaranteed
// to be sent in order. Therefore receipt of acknowledgement N means
// all messages <=N have been seen by the recipient. We check anyway.
expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
}
if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count.
client.ackNum = hdr.SeqNum
}
client.mu.Unlock()
case payAckSend:
if nch := client.getChan(hdr, Send); nch != nil {
nch.acked()
}
default:
log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
}
}
client.exp.delClient(client)
}
// Send all the data on a single channel to a client asking for a Recv.
// The header is passed by value to avoid issues of overwriting.
func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
for {
val, ok := nch.recv()
if !ok {
if err := client.encode(&hdr, payClosed, nil); err != nil {
expLog("error encoding server closed message:", err)
}
break
}
// We hold the lock during transmission to guarantee messages are
// sent in sequence number order. Also, we increment first so the
// value of client.SeqNum is the value of the highest used sequence
// number, not one beyond.
client.mu.Lock()
client.seqNum++
hdr.SeqNum = client.seqNum
client.seqLock.Lock() // guarantee ordering of messages
client.mu.Unlock()
err := client.encode(&hdr, payData, val.Interface())
client.seqLock.Unlock()
if err != nil {
expLog("error encoding client response:", err)
client.sendError(&hdr, err.Error())
break
}
// Negative count means run forever.
if count >= 0 {
if count--; count <= 0 {
break
}
}
}
}
// Receive and deliver locally one item from a client asking for a Send
// The header is passed by value to avoid issues of overwriting.
func (client *expClient) serveSend(hdr header) {
nch := client.getChan(&hdr, Recv)
if nch == nil {
return
}
// Create a new value for each received item.
val := reflect.New(nch.ch.Type().Elem()).Elem()
if err := client.decode(val); err != nil {
expLog("value decode:", err, "; type ", nch.ch.Type())
return
}
nch.send(val)
}
// Report that client has closed the channel that is sending to us.
// The header is passed by value to avoid issues of overwriting.
func (client *expClient) serveClosed(hdr header) {
nch := client.getChan(&hdr, Recv)
if nch == nil {
return
}
nch.close()
}
func (client *expClient) unackedCount() int64 {
client.mu.Lock()
n := client.seqNum - client.ackNum
client.mu.Unlock()
return n
}
func (client *expClient) seq() int64 {
client.mu.Lock()
n := client.seqNum
client.mu.Unlock()
return n
}
func (client *expClient) ack() int64 {
client.mu.Lock()
n := client.seqNum
client.mu.Unlock()
return n
}
// Serve waits for incoming connections on the listener
// and serves the Exporter's channels on each.
// It blocks until the listener is closed.
func (exp *Exporter) Serve(listener net.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
expLog("listen:", err)
break
}
go exp.ServeConn(conn)
}
}
// ServeConn exports the Exporter's channels on conn.
// It blocks until the connection is terminated.
func (exp *Exporter) ServeConn(conn io.ReadWriter) {
exp.addClient(conn).run()
}
// NewExporter creates a new Exporter that exports a set of channels.
func NewExporter() *Exporter {
e := &Exporter{
clientSet: &clientSet{
names: make(map[string]*chanDir),
clients: make(map[unackedCounter]bool),
},
}
return e
}
// ListenAndServe exports the exporter's channels through the
// given network and local address defined as in net.Listen.
func (exp *Exporter) ListenAndServe(network, localaddr string) error {
listener, err := net.Listen(network, localaddr)
if err != nil {
return err
}
go exp.Serve(listener)
return nil
}
// addClient creates a new expClient and records its existence
func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
client := newClient(exp, conn)
exp.mu.Lock()
exp.clients[client] = true
exp.mu.Unlock()
return client
}
// delClient forgets the client existed
func (exp *Exporter) delClient(client *expClient) {
exp.mu.Lock()
delete(exp.clients, client)
exp.mu.Unlock()
}
// Drain waits until all messages sent from this exporter/importer, including
// those not yet sent to any client and possibly including those sent while
// Drain was executing, have been received by the importer. In short, it
// waits until all the exporter's messages have been received by a client.
// If the timeout is positive and Drain takes longer than that to complete,
// an error is returned.
func (exp *Exporter) Drain(timeout time.Duration) error {
// This wrapper function is here so the method's comment will appear in godoc.
return exp.clientSet.drain(timeout)
}
// Sync waits until all clients of the exporter have received the messages
// that were sent at the time Sync was invoked. Unlike Drain, it does not
// wait for messages sent while it is running or messages that have not been
// dispatched to any client. If the timeout is positive and Sync takes longer
// than that to complete, an error is returned.
func (exp *Exporter) Sync(timeout time.Duration) error {
// This wrapper function is here so the method's comment will appear in godoc.
return exp.clientSet.sync(timeout)
}
func checkChan(chT interface{}, dir Dir) (reflect.Value, error) {
chanType := reflect.TypeOf(chT)
if chanType.Kind() != reflect.Chan {
return reflect.Value{}, errors.New("not a channel")
}
if dir != Send && dir != Recv {
return reflect.Value{}, errors.New("unknown channel direction")
}
switch chanType.ChanDir() {
case reflect.BothDir:
case reflect.SendDir:
if dir != Recv {
return reflect.Value{}, errors.New("to import/export with Send, must provide <-chan")
}
case reflect.RecvDir:
if dir != Send {
return reflect.Value{}, errors.New("to import/export with Recv, must provide chan<-")
}
}
return reflect.ValueOf(chT), nil
}
// Export exports a channel of a given type and specified direction. The
// channel to be exported is provided in the call and may be of arbitrary
// channel type.
// Despite the literal signature, the effective signature is
// Export(name string, chT chan T, dir Dir)
func (exp *Exporter) Export(name string, chT interface{}, dir Dir) error {
ch, err := checkChan(chT, dir)
if err != nil {
return err
}
exp.mu.Lock()
defer exp.mu.Unlock()
_, present := exp.names[name]
if present {
return errors.New("channel name already being exported:" + name)
}
exp.names[name] = &chanDir{ch, dir}
return nil
}
// Hangup disassociates the named channel from the Exporter and closes
// the channel. Messages in flight for the channel may be dropped.
func (exp *Exporter) Hangup(name string) error {
exp.mu.Lock()
chDir, ok := exp.names[name]
if ok {
delete(exp.names, name)
}
// TODO drop all instances of channel from client sets
exp.mu.Unlock()
if !ok {
return errors.New("netchan export: hangup: no such channel: " + name)
}
chDir.ch.Close()
return nil
}

View File

@ -1,287 +0,0 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netchan
import (
"errors"
"io"
"log"
"net"
"reflect"
"sync"
"time"
)
// Import
// impLog is a logging convenience function. The first argument must be a string.
func impLog(args ...interface{}) {
args[0] = "netchan import: " + args[0].(string)
log.Print(args...)
}
// An Importer allows a set of channels to be imported from a single
// remote machine/network port. A machine may have multiple
// importers, even from the same machine/network port.
type Importer struct {
*encDec
chanLock sync.Mutex // protects access to channel map
names map[string]*netChan
chans map[int]*netChan
errors chan error
maxId int
mu sync.Mutex // protects remaining fields
unacked int64 // number of unacknowledged sends.
seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
}
// NewImporter creates a new Importer object to import a set of channels
// from the given connection. The Exporter must be available and serving when
// the Importer is created.
func NewImporter(conn io.ReadWriter) *Importer {
imp := new(Importer)
imp.encDec = newEncDec(conn)
imp.chans = make(map[int]*netChan)
imp.names = make(map[string]*netChan)
imp.errors = make(chan error, 10)
imp.unacked = 0
go imp.run()
return imp
}
// Import imports a set of channels from the given network and address.
func Import(network, remoteaddr string) (*Importer, error) {
conn, err := net.Dial(network, remoteaddr)
if err != nil {
return nil, err
}
return NewImporter(conn), nil
}
// shutdown closes all channels for which we are receiving data from the remote side.
func (imp *Importer) shutdown() {
imp.chanLock.Lock()
for _, ich := range imp.chans {
if ich.dir == Recv {
ich.close()
}
}
imp.chanLock.Unlock()
}
// Handle the data from a single imported data stream, which will
// have the form
// (response, data)*
// The response identifies by name which channel is transmitting data.
func (imp *Importer) run() {
// Loop on responses; requests are sent by ImportNValues()
hdr := new(header)
hdrValue := reflect.ValueOf(hdr)
ackHdr := new(header)
err := new(error_)
errValue := reflect.ValueOf(err)
for {
*hdr = header{}
if e := imp.decode(hdrValue); e != nil {
if e != io.EOF {
impLog("header:", e)
imp.shutdown()
}
return
}
switch hdr.PayloadType {
case payData:
// done lower in loop
case payError:
if e := imp.decode(errValue); e != nil {
impLog("error:", e)
return
}
if err.Error != "" {
impLog("response error:", err.Error)
select {
case imp.errors <- errors.New(err.Error):
continue // errors are not acknowledged
default:
imp.shutdown()
return
}
}
case payClosed:
nch := imp.getChan(hdr.Id, false)
if nch != nil {
nch.close()
}
continue // closes are not acknowledged.
case payAckSend:
// we can receive spurious acks if the channel is
// hung up, so we ask getChan to ignore any errors.
nch := imp.getChan(hdr.Id, true)
if nch != nil {
nch.acked()
imp.mu.Lock()
imp.unacked--
imp.mu.Unlock()
}
continue
default:
impLog("unexpected payload type:", hdr.PayloadType)
return
}
nch := imp.getChan(hdr.Id, false)
if nch == nil {
continue
}
if nch.dir != Recv {
impLog("cannot happen: receive from non-Recv channel")
return
}
// Acknowledge receipt
ackHdr.Id = hdr.Id
ackHdr.SeqNum = hdr.SeqNum
imp.encode(ackHdr, payAck, nil)
// Create a new value for each received item.
value := reflect.New(nch.ch.Type().Elem()).Elem()
if e := imp.decode(value); e != nil {
impLog("importer value decode:", e)
return
}
nch.send(value)
}
}
func (imp *Importer) getChan(id int, errOk bool) *netChan {
imp.chanLock.Lock()
ich := imp.chans[id]
imp.chanLock.Unlock()
if ich == nil {
if !errOk {
impLog("unknown id in netchan request: ", id)
}
return nil
}
return ich
}
// Errors returns a channel from which transmission and protocol errors
// can be read. Clients of the importer are not required to read the error
// channel for correct execution. However, if too many errors occur
// without being read from the error channel, the importer will shut down.
func (imp *Importer) Errors() chan error {
return imp.errors
}
// Import imports a channel of the given type, size and specified direction.
// It is equivalent to ImportNValues with a count of -1, meaning unbounded.
func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) error {
return imp.ImportNValues(name, chT, dir, size, -1)
}
// ImportNValues imports a channel of the given type and specified
// direction and then receives or transmits up to n values on that
// channel. A value of n==-1 implies an unbounded number of values. The
// channel will have buffer space for size values, or 1 value if size < 1.
// The channel to be bound to the remote site's channel is provided
// in the call and may be of arbitrary channel type.
// Despite the literal signature, the effective signature is
// ImportNValues(name string, chT chan T, dir Dir, size, n int) error
// Example usage:
// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")
// if err != nil { log.Fatal(err) }
// ch := make(chan myType)
// err = imp.ImportNValues("name", ch, Recv, 1, 1)
// if err != nil { log.Fatal(err) }
// fmt.Printf("%+v\n", <-ch)
func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) error {
ch, err := checkChan(chT, dir)
if err != nil {
return err
}
imp.chanLock.Lock()
defer imp.chanLock.Unlock()
_, present := imp.names[name]
if present {
return errors.New("channel name already being imported:" + name)
}
if size < 1 {
size = 1
}
id := imp.maxId
imp.maxId++
nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))
imp.names[name] = nch
imp.chans[id] = nch
// Tell the other side about this channel.
hdr := &header{Id: id}
req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}
if err = imp.encode(hdr, payRequest, req); err != nil {
impLog("request encode:", err)
return err
}
if dir == Send {
go func() {
for i := 0; n == -1 || i < n; i++ {
val, ok := nch.recv()
if !ok {
if err = imp.encode(hdr, payClosed, nil); err != nil {
impLog("error encoding client closed message:", err)
}
return
}
// We hold the lock during transmission to guarantee messages are
// sent in order.
imp.mu.Lock()
imp.unacked++
imp.seqLock.Lock()
imp.mu.Unlock()
if err = imp.encode(hdr, payData, val.Interface()); err != nil {
impLog("error encoding client send:", err)
return
}
imp.seqLock.Unlock()
}
}()
}
return nil
}
// Hangup disassociates the named channel from the Importer and closes
// the channel. Messages in flight for the channel may be dropped.
func (imp *Importer) Hangup(name string) error {
imp.chanLock.Lock()
defer imp.chanLock.Unlock()
nc := imp.names[name]
if nc == nil {
return errors.New("netchan import: hangup: no such channel: " + name)
}
delete(imp.names, name)
delete(imp.chans, nc.id)
nc.close()
return nil
}
func (imp *Importer) unackedCount() int64 {
imp.mu.Lock()
n := imp.unacked
imp.mu.Unlock()
return n
}
// Drain waits until all messages sent from this exporter/importer, including
// those not yet sent to any server and possibly including those sent while
// Drain was executing, have been received by the exporter. In short, it
// waits until all the importer's messages have been received.
// If the timeout (measured in nanoseconds) is positive and Drain takes
// longer than that to complete, an error is returned.
func (imp *Importer) Drain(timeout int64) error {
deadline := time.Now().Add(time.Duration(timeout))
for imp.unackedCount() > 0 {
if timeout > 0 && time.Now().After(deadline) {
return errors.New("timeout")
}
time.Sleep(100 * time.Millisecond)
}
return nil
}

View File

@ -1,447 +0,0 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netchan
import (
"net"
"strings"
"testing"
"time"
)
const count = 10 // number of items in most tests
const closeCount = 5 // number of items when sender closes early
const base = 23
func exportSend(exp *Exporter, n int, t *testing.T, done chan bool) {
ch := make(chan int)
err := exp.Export("exportedSend", ch, Send)
if err != nil {
t.Fatal("exportSend:", err)
}
go func() {
for i := 0; i < n; i++ {
ch <- base + i
}
close(ch)
if done != nil {
done <- true
}
}()
}
func exportReceive(exp *Exporter, t *testing.T, expDone chan bool) {
ch := make(chan int)
err := exp.Export("exportedRecv", ch, Recv)
expDone <- true
if err != nil {
t.Fatal("exportReceive:", err)
}
for i := 0; i < count; i++ {
v, ok := <-ch
if !ok {
if i != closeCount {
t.Errorf("exportReceive expected close at %d; got one at %d", closeCount, i)
}
break
}
if v != base+i {
t.Errorf("export Receive: bad value: expected %d+%d=%d; got %d", base, i, base+i, v)
}
}
}
func importSend(imp *Importer, n int, t *testing.T, done chan bool) {
ch := make(chan int)
err := imp.ImportNValues("exportedRecv", ch, Send, 3, -1)
if err != nil {
t.Fatal("importSend:", err)
}
go func() {
for i := 0; i < n; i++ {
ch <- base + i
}
close(ch)
if done != nil {
done <- true
}
}()
}
func importReceive(imp *Importer, t *testing.T, done chan bool) {
ch := make(chan int)
err := imp.ImportNValues("exportedSend", ch, Recv, 3, count)
if err != nil {
t.Fatal("importReceive:", err)
}
for i := 0; i < count; i++ {
v, ok := <-ch
if !ok {
if i != closeCount {
t.Errorf("importReceive expected close at %d; got one at %d", closeCount, i)
}
break
}
if v != base+i {
t.Errorf("importReceive: bad value: expected %d+%d=%d; got %+d", base, i, base+i, v)
}
}
if done != nil {
done <- true
}
}
func TestExportSendImportReceive(t *testing.T) {
exp, imp := pair(t)
exportSend(exp, count, t, nil)
importReceive(imp, t, nil)
}
func TestExportReceiveImportSend(t *testing.T) {
exp, imp := pair(t)
expDone := make(chan bool)
done := make(chan bool)
go func() {
exportReceive(exp, t, expDone)
done <- true
}()
<-expDone
importSend(imp, count, t, nil)
<-done
}
func TestClosingExportSendImportReceive(t *testing.T) {
exp, imp := pair(t)
exportSend(exp, closeCount, t, nil)
importReceive(imp, t, nil)
}
func TestClosingImportSendExportReceive(t *testing.T) {
exp, imp := pair(t)
expDone := make(chan bool)
done := make(chan bool)
go func() {
exportReceive(exp, t, expDone)
done <- true
}()
<-expDone
importSend(imp, closeCount, t, nil)
<-done
}
func TestErrorForIllegalChannel(t *testing.T) {
exp, imp := pair(t)
// Now export a channel.
ch := make(chan int, 1)
err := exp.Export("aChannel", ch, Send)
if err != nil {
t.Fatal("export:", err)
}
ch <- 1234
close(ch)
// Now try to import a different channel.
ch = make(chan int)
err = imp.Import("notAChannel", ch, Recv, 1)
if err != nil {
t.Fatal("import:", err)
}
// Expect an error now. Start a timeout.
timeout := make(chan bool, 1) // buffered so closure will not hang around.
go func() {
time.Sleep(10 * time.Second) // very long, to give even really slow machines a chance.
timeout <- true
}()
select {
case err = <-imp.Errors():
if strings.Index(err.Error(), "no such channel") < 0 {
t.Error("wrong error for nonexistent channel:", err)
}
case <-timeout:
t.Error("import of nonexistent channel did not receive an error")
}
}
// Not a great test but it does at least invoke Drain.
func TestExportDrain(t *testing.T) {
exp, imp := pair(t)
done := make(chan bool)
go func() {
exportSend(exp, closeCount, t, nil)
done <- true
}()
<-done
go importReceive(imp, t, done)
exp.Drain(0)
<-done
}
// Not a great test but it does at least invoke Drain.
func TestImportDrain(t *testing.T) {
exp, imp := pair(t)
expDone := make(chan bool)
go exportReceive(exp, t, expDone)
<-expDone
importSend(imp, closeCount, t, nil)
imp.Drain(0)
}
// Not a great test but it does at least invoke Sync.
func TestExportSync(t *testing.T) {
exp, imp := pair(t)
done := make(chan bool)
exportSend(exp, closeCount, t, nil)
go importReceive(imp, t, done)
exp.Sync(0)
<-done
}
// Test hanging up the send side of an export.
// TODO: test hanging up the receive side of an export.
func TestExportHangup(t *testing.T) {
exp, imp := pair(t)
ech := make(chan int)
err := exp.Export("exportedSend", ech, Send)
if err != nil {
t.Fatal("export:", err)
}
// Prepare to receive two values. We'll actually deliver only one.
ich := make(chan int)
err = imp.ImportNValues("exportedSend", ich, Recv, 1, 2)
if err != nil {
t.Fatal("import exportedSend:", err)
}
// Send one value, receive it.
const Value = 1234
ech <- Value
v := <-ich
if v != Value {
t.Fatal("expected", Value, "got", v)
}
// Now hang up the channel. Importer should see it close.
exp.Hangup("exportedSend")
v, ok := <-ich
if ok {
t.Fatal("expected channel to be closed; got value", v)
}
}
// Test hanging up the send side of an import.
// TODO: test hanging up the receive side of an import.
func TestImportHangup(t *testing.T) {
exp, imp := pair(t)
ech := make(chan int)
err := exp.Export("exportedRecv", ech, Recv)
if err != nil {
t.Fatal("export:", err)
}
// Prepare to Send two values. We'll actually deliver only one.
ich := make(chan int)
err = imp.ImportNValues("exportedRecv", ich, Send, 1, 2)
if err != nil {
t.Fatal("import exportedRecv:", err)
}
// Send one value, receive it.
const Value = 1234
ich <- Value
v := <-ech
if v != Value {
t.Fatal("expected", Value, "got", v)
}
// Now hang up the channel. Exporter should see it close.
imp.Hangup("exportedRecv")
v, ok := <-ech
if ok {
t.Fatal("expected channel to be closed; got value", v)
}
}
// loop back exportedRecv to exportedSend,
// but receive a value from ctlch before starting the loop.
func exportLoopback(exp *Exporter, t *testing.T) {
inch := make(chan int)
if err := exp.Export("exportedRecv", inch, Recv); err != nil {
t.Fatal("exportRecv")
}
outch := make(chan int)
if err := exp.Export("exportedSend", outch, Send); err != nil {
t.Fatal("exportSend")
}
ctlch := make(chan int)
if err := exp.Export("exportedCtl", ctlch, Recv); err != nil {
t.Fatal("exportRecv")
}
go func() {
<-ctlch
for i := 0; i < count; i++ {
x := <-inch
if x != base+i {
t.Errorf("exportLoopback expected %d; got %d", i, x)
}
outch <- x
}
}()
}
// This test checks that channel operations can proceed
// even when other concurrent operations are blocked.
func TestIndependentSends(t *testing.T) {
if testing.Short() {
t.Logf("disabled test during -short")
return
}
exp, imp := pair(t)
exportLoopback(exp, t)
importSend(imp, count, t, nil)
done := make(chan bool)
go importReceive(imp, t, done)
// wait for export side to try to deliver some values.
time.Sleep(250 * time.Millisecond)
ctlch := make(chan int)
if err := imp.ImportNValues("exportedCtl", ctlch, Send, 1, 1); err != nil {
t.Fatal("importSend:", err)
}
ctlch <- 0
<-done
}
// This test cross-connects a pair of exporter/importer pairs.
type value struct {
I int
Source string
}
func TestCrossConnect(t *testing.T) {
e1, i1 := pair(t)
e2, i2 := pair(t)
crossExport(e1, e2, t)
crossImport(i1, i2, t)
}
// Export side of cross-traffic.
func crossExport(e1, e2 *Exporter, t *testing.T) {
s := make(chan value)
err := e1.Export("exportedSend", s, Send)
if err != nil {
t.Fatal("exportSend:", err)
}
r := make(chan value)
err = e2.Export("exportedReceive", r, Recv)
if err != nil {
t.Fatal("exportReceive:", err)
}
go crossLoop("export", s, r, t)
}
// Import side of cross-traffic.
func crossImport(i1, i2 *Importer, t *testing.T) {
s := make(chan value)
err := i2.Import("exportedReceive", s, Send, 2)
if err != nil {
t.Fatal("import of exportedReceive:", err)
}
r := make(chan value)
err = i1.Import("exportedSend", r, Recv, 2)
if err != nil {
t.Fatal("import of exported Send:", err)
}
crossLoop("import", s, r, t)
}
// Cross-traffic: send and receive 'count' numbers.
func crossLoop(name string, s, r chan value, t *testing.T) {
for si, ri := 0, 0; si < count && ri < count; {
select {
case s <- value{si, name}:
si++
case v := <-r:
if v.I != ri {
t.Errorf("loop: bad value: expected %d, hello; got %+v", ri, v)
}
ri++
}
}
}
const flowCount = 100
// test flow control from exporter to importer.
func TestExportFlowControl(t *testing.T) {
if testing.Short() {
t.Logf("disabled test during -short")
return
}
exp, imp := pair(t)
sendDone := make(chan bool, 1)
exportSend(exp, flowCount, t, sendDone)
ch := make(chan int)
err := imp.ImportNValues("exportedSend", ch, Recv, 20, -1)
if err != nil {
t.Fatal("importReceive:", err)
}
testFlow(sendDone, ch, flowCount, t)
}
// test flow control from importer to exporter.
func TestImportFlowControl(t *testing.T) {
if testing.Short() {
t.Logf("disabled test during -short")
return
}
exp, imp := pair(t)
ch := make(chan int)
err := exp.Export("exportedRecv", ch, Recv)
if err != nil {
t.Fatal("importReceive:", err)
}
sendDone := make(chan bool, 1)
importSend(imp, flowCount, t, sendDone)
testFlow(sendDone, ch, flowCount, t)
}
func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) {
go func() {
time.Sleep(500 * time.Millisecond)
sendDone <- false
}()
if <-sendDone {
t.Fatal("send did not block")
}
n := 0
for i := range ch {
t.Log("after blocking, got value ", i)
n++
}
if n != N {
t.Fatalf("expected %d values; got %d", N, n)
}
}
func pair(t *testing.T) (*Exporter, *Importer) {
c0, c1 := net.Pipe()
exp := NewExporter()
go exp.ServeConn(c0)
imp := NewImporter(c1)
return exp, imp
}