mirror of
https://github.com/golang/go
synced 2024-11-24 22:57:57 -07:00
netchan: handle closing of channels.
This also silences some misleading logging. Also improve logging. R=rsc CC=golang-dev https://golang.org/cl/2245041
This commit is contained in:
parent
63623ba2d8
commit
1e4b1f9337
@ -37,6 +37,7 @@ const (
|
|||||||
payError // error structure follows
|
payError // error structure follows
|
||||||
payData // user payload follows
|
payData // user payload follows
|
||||||
payAck // acknowledgement; no payload
|
payAck // acknowledgement; no payload
|
||||||
|
payClosed // channel is now closed
|
||||||
)
|
)
|
||||||
|
|
||||||
// A header is sent as a prefix to every transmission. It will be followed by
|
// A header is sent as a prefix to every transmission. It will be followed by
|
||||||
|
@ -31,6 +31,12 @@ import (
|
|||||||
|
|
||||||
// Export
|
// Export
|
||||||
|
|
||||||
|
// expLog is a logging convenience function. The first argument must be a string.
|
||||||
|
func expLog(args ...interface{}) {
|
||||||
|
args[0] = "netchan export: " + args[0].(string)
|
||||||
|
log.Stderr(args)
|
||||||
|
}
|
||||||
|
|
||||||
// An Exporter allows a set of channels to be published on a single
|
// An Exporter allows a set of channels to be published on a single
|
||||||
// network port. A single machine may have multiple Exporters
|
// network port. A single machine may have multiple Exporters
|
||||||
// but they must use different ports.
|
// but they must use different ports.
|
||||||
@ -60,7 +66,7 @@ func newClient(exp *Exporter, conn net.Conn) *expClient {
|
|||||||
|
|
||||||
func (client *expClient) sendError(hdr *header, err string) {
|
func (client *expClient) sendError(hdr *header, err string) {
|
||||||
error := &error{err}
|
error := &error{err}
|
||||||
log.Stderr("export:", error.error)
|
expLog("sending error to client", error.error)
|
||||||
client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
|
client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
|
||||||
client.mu.Lock()
|
client.mu.Lock()
|
||||||
client.errored = true
|
client.errored = true
|
||||||
@ -96,13 +102,13 @@ func (client *expClient) run() {
|
|||||||
for {
|
for {
|
||||||
*hdr = header{}
|
*hdr = header{}
|
||||||
if err := client.decode(hdrValue); err != nil {
|
if err := client.decode(hdrValue); err != nil {
|
||||||
log.Stderr("error decoding client header:", err)
|
expLog("error decoding client header:", err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
switch hdr.payloadType {
|
switch hdr.payloadType {
|
||||||
case payRequest:
|
case payRequest:
|
||||||
if err := client.decode(reqValue); err != nil {
|
if err := client.decode(reqValue); err != nil {
|
||||||
log.Stderr("error decoding client request:", err)
|
expLog("error decoding client request:", err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
switch req.dir {
|
switch req.dir {
|
||||||
@ -114,12 +120,14 @@ func (client *expClient) run() {
|
|||||||
// The actual sends will have payload type payData.
|
// The actual sends will have payload type payData.
|
||||||
// TODO: manage the count?
|
// TODO: manage the count?
|
||||||
default:
|
default:
|
||||||
error.error = "export request: can't handle channel direction"
|
error.error = "request: can't handle channel direction"
|
||||||
log.Stderr(error.error, req.dir)
|
expLog(error.error, req.dir)
|
||||||
client.encode(hdr, payError, error)
|
client.encode(hdr, payError, error)
|
||||||
}
|
}
|
||||||
case payData:
|
case payData:
|
||||||
client.serveSend(*hdr)
|
client.serveSend(*hdr)
|
||||||
|
case payClosed:
|
||||||
|
client.serveClosed(*hdr)
|
||||||
case payAck:
|
case payAck:
|
||||||
client.mu.Lock()
|
client.mu.Lock()
|
||||||
if client.ackNum != hdr.seqNum-1 {
|
if client.ackNum != hdr.seqNum-1 {
|
||||||
@ -127,12 +135,14 @@ func (client *expClient) run() {
|
|||||||
// in a single instance of locking client.mu, the messages are guaranteed
|
// in a single instance of locking client.mu, the messages are guaranteed
|
||||||
// to be sent in order. Therefore receipt of acknowledgement N means
|
// to be sent in order. Therefore receipt of acknowledgement N means
|
||||||
// all messages <=N have been seen by the recipient. We check anyway.
|
// all messages <=N have been seen by the recipient. We check anyway.
|
||||||
log.Stderr("netchan export: sequence out of order:", client.ackNum, hdr.seqNum)
|
expLog("sequence out of order:", client.ackNum, hdr.seqNum)
|
||||||
}
|
}
|
||||||
if client.ackNum < hdr.seqNum { // If there has been an error, don't back up the count.
|
if client.ackNum < hdr.seqNum { // If there has been an error, don't back up the count.
|
||||||
client.ackNum = hdr.seqNum
|
client.ackNum = hdr.seqNum
|
||||||
}
|
}
|
||||||
client.mu.Unlock()
|
client.mu.Unlock()
|
||||||
|
default:
|
||||||
|
log.Exit("netchan export: unknown payload type", hdr.payloadType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
client.exp.delClient(client)
|
client.exp.delClient(client)
|
||||||
@ -148,7 +158,9 @@ func (client *expClient) serveRecv(hdr header, count int64) {
|
|||||||
for {
|
for {
|
||||||
val := ech.ch.Recv()
|
val := ech.ch.Recv()
|
||||||
if ech.ch.Closed() {
|
if ech.ch.Closed() {
|
||||||
client.sendError(&hdr, os.EOF.String())
|
if err := client.encode(&hdr, payClosed, nil); err != nil {
|
||||||
|
expLog("error encoding server closed message:", err)
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// We hold the lock during transmission to guarantee messages are
|
// We hold the lock during transmission to guarantee messages are
|
||||||
@ -161,7 +173,7 @@ func (client *expClient) serveRecv(hdr header, count int64) {
|
|||||||
err := client.encode(&hdr, payData, val.Interface())
|
err := client.encode(&hdr, payData, val.Interface())
|
||||||
client.mu.Unlock()
|
client.mu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Stderr("error encoding client response:", err)
|
expLog("error encoding client response:", err)
|
||||||
client.sendError(&hdr, err.String())
|
client.sendError(&hdr, err.String())
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -184,11 +196,20 @@ func (client *expClient) serveSend(hdr header) {
|
|||||||
// Create a new value for each received item.
|
// Create a new value for each received item.
|
||||||
val := reflect.MakeZero(ech.ch.Type().(*reflect.ChanType).Elem())
|
val := reflect.MakeZero(ech.ch.Type().(*reflect.ChanType).Elem())
|
||||||
if err := client.decode(val); err != nil {
|
if err := client.decode(val); err != nil {
|
||||||
log.Stderr("exporter value decode:", err)
|
expLog("value decode:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ech.ch.Send(val)
|
ech.ch.Send(val)
|
||||||
// TODO count
|
}
|
||||||
|
|
||||||
|
// Report that client has closed the channel that is sending to us.
|
||||||
|
// The header is passed by value to avoid issues of overwriting.
|
||||||
|
func (client *expClient) serveClosed(hdr header) {
|
||||||
|
ech := client.getChan(&hdr, Recv)
|
||||||
|
if ech == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ech.ch.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *expClient) unackedCount() int64 {
|
func (client *expClient) unackedCount() int64 {
|
||||||
@ -217,7 +238,7 @@ func (exp *Exporter) listen() {
|
|||||||
for {
|
for {
|
||||||
conn, err := exp.listener.Accept()
|
conn, err := exp.listener.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Stderr("exporter.listen:", err)
|
expLog("listen:", err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
client := exp.addClient(conn)
|
client := exp.addClient(conn)
|
||||||
|
@ -14,6 +14,12 @@ import (
|
|||||||
|
|
||||||
// Import
|
// Import
|
||||||
|
|
||||||
|
// impLog is a logging convenience function. The first argument must be a string.
|
||||||
|
func impLog(args ...interface{}) {
|
||||||
|
args[0] = "netchan import: " + args[0].(string)
|
||||||
|
log.Stderr(args)
|
||||||
|
}
|
||||||
|
|
||||||
// An Importer allows a set of channels to be imported from a single
|
// An Importer allows a set of channels to be imported from a single
|
||||||
// remote machine/network port. A machine may have multiple
|
// remote machine/network port. A machine may have multiple
|
||||||
// importers, even from the same machine/network port.
|
// importers, even from the same machine/network port.
|
||||||
@ -66,7 +72,7 @@ func (imp *Importer) run() {
|
|||||||
for {
|
for {
|
||||||
*hdr = header{}
|
*hdr = header{}
|
||||||
if e := imp.decode(hdrValue); e != nil {
|
if e := imp.decode(hdrValue); e != nil {
|
||||||
log.Stderr("importer header:", e)
|
impLog("header:", e)
|
||||||
imp.shutdown()
|
imp.shutdown()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -75,27 +81,30 @@ func (imp *Importer) run() {
|
|||||||
// done lower in loop
|
// done lower in loop
|
||||||
case payError:
|
case payError:
|
||||||
if e := imp.decode(errValue); e != nil {
|
if e := imp.decode(errValue); e != nil {
|
||||||
log.Stderr("importer error:", e)
|
impLog("error:", e)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err.error != "" {
|
if err.error != "" {
|
||||||
log.Stderr("importer response error:", err.error)
|
impLog("response error:", err.error)
|
||||||
imp.shutdown()
|
imp.shutdown()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
case payClosed:
|
||||||
|
ich := imp.getChan(hdr.name)
|
||||||
|
if ich != nil {
|
||||||
|
ich.ch.Close()
|
||||||
|
}
|
||||||
|
continue
|
||||||
default:
|
default:
|
||||||
log.Stderr("unexpected payload type:", hdr.payloadType)
|
impLog("unexpected payload type:", hdr.payloadType)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
imp.chanLock.Lock()
|
ich := imp.getChan(hdr.name)
|
||||||
ich, ok := imp.chans[hdr.name]
|
if ich == nil {
|
||||||
imp.chanLock.Unlock()
|
continue
|
||||||
if !ok {
|
|
||||||
log.Stderr("unknown name in request:", hdr.name)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if ich.dir != Recv {
|
if ich.dir != Recv {
|
||||||
log.Stderr("cannot happen: receive from non-Recv channel")
|
impLog("cannot happen: receive from non-Recv channel")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Acknowledge receipt
|
// Acknowledge receipt
|
||||||
@ -105,13 +114,24 @@ func (imp *Importer) run() {
|
|||||||
// Create a new value for each received item.
|
// Create a new value for each received item.
|
||||||
value := reflect.MakeZero(ich.ch.Type().(*reflect.ChanType).Elem())
|
value := reflect.MakeZero(ich.ch.Type().(*reflect.ChanType).Elem())
|
||||||
if e := imp.decode(value); e != nil {
|
if e := imp.decode(value); e != nil {
|
||||||
log.Stderr("importer value decode:", e)
|
impLog("importer value decode:", e)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ich.ch.Send(value)
|
ich.ch.Send(value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (imp *Importer) getChan(name string) *chanDir {
|
||||||
|
imp.chanLock.Lock()
|
||||||
|
ich := imp.chans[name]
|
||||||
|
imp.chanLock.Unlock()
|
||||||
|
if ich == nil {
|
||||||
|
impLog("unknown name in netchan request:", name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return ich
|
||||||
|
}
|
||||||
|
|
||||||
// Import imports a channel of the given type and specified direction.
|
// Import imports a channel of the given type and specified direction.
|
||||||
// It is equivalent to ImportNValues with a count of -1, meaning unbounded.
|
// It is equivalent to ImportNValues with a count of -1, meaning unbounded.
|
||||||
func (imp *Importer) Import(name string, chT interface{}, dir Dir) os.Error {
|
func (imp *Importer) Import(name string, chT interface{}, dir Dir) os.Error {
|
||||||
@ -145,18 +165,24 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, n int)
|
|||||||
}
|
}
|
||||||
imp.chans[name] = &chanDir{ch, dir}
|
imp.chans[name] = &chanDir{ch, dir}
|
||||||
// Tell the other side about this channel.
|
// Tell the other side about this channel.
|
||||||
hdr := &header{name: name, payloadType: payRequest}
|
hdr := &header{name: name}
|
||||||
req := &request{count: int64(n), dir: dir}
|
req := &request{count: int64(n), dir: dir}
|
||||||
if err := imp.encode(hdr, payRequest, req); err != nil {
|
if err = imp.encode(hdr, payRequest, req); err != nil {
|
||||||
log.Stderr("importer request encode:", err)
|
impLog("request encode:", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if dir == Send {
|
if dir == Send {
|
||||||
go func() {
|
go func() {
|
||||||
for i := 0; n == -1 || i < n; i++ {
|
for i := 0; n == -1 || i < n; i++ {
|
||||||
val := ch.Recv()
|
val := ch.Recv()
|
||||||
if err := imp.encode(hdr, payData, val.Interface()); err != nil {
|
if ch.Closed() {
|
||||||
log.Stderr("error encoding client response:", err)
|
if err = imp.encode(hdr, payClosed, nil); err != nil {
|
||||||
|
impLog("error encoding client closed message:", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = imp.encode(hdr, payData, val.Interface()); err != nil {
|
||||||
|
impLog("error encoding client send:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,8 @@ import "testing"
|
|||||||
const count = 10 // number of items in most tests
|
const count = 10 // number of items in most tests
|
||||||
const closeCount = 5 // number of items when sender closes early
|
const closeCount = 5 // number of items when sender closes early
|
||||||
|
|
||||||
|
const base = 23
|
||||||
|
|
||||||
func exportSend(exp *Exporter, n int, t *testing.T) {
|
func exportSend(exp *Exporter, n int, t *testing.T) {
|
||||||
ch := make(chan int)
|
ch := make(chan int)
|
||||||
err := exp.Export("exportedSend", ch, Send)
|
err := exp.Export("exportedSend", ch, Send)
|
||||||
@ -17,7 +19,7 @@ func exportSend(exp *Exporter, n int, t *testing.T) {
|
|||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
ch <- 23+i
|
ch <- base+i
|
||||||
}
|
}
|
||||||
close(ch)
|
close(ch)
|
||||||
}()
|
}()
|
||||||
@ -31,12 +33,32 @@ func exportReceive(exp *Exporter, t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
v := <-ch
|
v := <-ch
|
||||||
if v != 45+i {
|
if closed(ch) {
|
||||||
t.Errorf("export Receive: bad value: expected 4%d; got %d", 45+i, v)
|
if i != closeCount {
|
||||||
|
t.Errorf("exportReceive expected close at %d; got one at %d\n", closeCount, i)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if v != base+i {
|
||||||
|
t.Errorf("export Receive: bad value: expected %d+%d=%d; got %d", base, i, base+i, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func importSend(imp *Importer, n int, t *testing.T) {
|
||||||
|
ch := make(chan int)
|
||||||
|
err := imp.ImportNValues("exportedRecv", ch, Send, count)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("importSend:", err)
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
ch <- base+i
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func importReceive(imp *Importer, t *testing.T, done chan bool) {
|
func importReceive(imp *Importer, t *testing.T, done chan bool) {
|
||||||
ch := make(chan int)
|
ch := make(chan int)
|
||||||
err := imp.ImportNValues("exportedSend", ch, Recv, count)
|
err := imp.ImportNValues("exportedSend", ch, Recv, count)
|
||||||
@ -47,12 +69,12 @@ func importReceive(imp *Importer, t *testing.T, done chan bool) {
|
|||||||
v := <-ch
|
v := <-ch
|
||||||
if closed(ch) {
|
if closed(ch) {
|
||||||
if i != closeCount {
|
if i != closeCount {
|
||||||
t.Errorf("expected close at %d; got one at %d\n", closeCount, i)
|
t.Errorf("importReceive expected close at %d; got one at %d\n", closeCount, i)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if v != 23+i {
|
if v != 23+i {
|
||||||
t.Errorf("importReceive: bad value: expected %d; got %+d", 23+i, v)
|
t.Errorf("importReceive: bad value: expected %%d+%d=%d; got %+d", base, i, base+i, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if done != nil {
|
if done != nil {
|
||||||
@ -60,17 +82,6 @@ func importReceive(imp *Importer, t *testing.T, done chan bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func importSend(imp *Importer, t *testing.T) {
|
|
||||||
ch := make(chan int)
|
|
||||||
err := imp.ImportNValues("exportedRecv", ch, Send, count)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal("importSend:", err)
|
|
||||||
}
|
|
||||||
for i := 0; i < count; i++ {
|
|
||||||
ch <- 45+i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestExportSendImportReceive(t *testing.T) {
|
func TestExportSendImportReceive(t *testing.T) {
|
||||||
exp, err := NewExporter("tcp", "127.0.0.1:0")
|
exp, err := NewExporter("tcp", "127.0.0.1:0")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -93,7 +104,7 @@ func TestExportReceiveImportSend(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("new importer:", err)
|
t.Fatal("new importer:", err)
|
||||||
}
|
}
|
||||||
go importSend(imp, t)
|
importSend(imp, count, t)
|
||||||
exportReceive(exp, t)
|
exportReceive(exp, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -110,6 +121,19 @@ func TestClosingExportSendImportReceive(t *testing.T) {
|
|||||||
importReceive(imp, t, nil)
|
importReceive(imp, t, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClosingImportSendExportReceive(t *testing.T) {
|
||||||
|
exp, err := NewExporter("tcp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("new exporter:", err)
|
||||||
|
}
|
||||||
|
imp, err := NewImporter("tcp", exp.Addr().String())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("new importer:", err)
|
||||||
|
}
|
||||||
|
importSend(imp, closeCount, t)
|
||||||
|
exportReceive(exp, t)
|
||||||
|
}
|
||||||
|
|
||||||
// Not a great test but it does at least invoke Drain.
|
// Not a great test but it does at least invoke Drain.
|
||||||
func TestExportDrain(t *testing.T) {
|
func TestExportDrain(t *testing.T) {
|
||||||
exp, err := NewExporter("tcp", "127.0.0.1:0")
|
exp, err := NewExporter("tcp", "127.0.0.1:0")
|
||||||
|
Loading…
Reference in New Issue
Block a user