From da705c621205163a1af0325578d99e04df58837f Mon Sep 17 00:00:00 2001 From: Rob Pike Date: Mon, 20 Sep 2010 15:28:38 +1000 Subject: [PATCH] netchan: provide a method (Importer.Errors()) to recover protocol errors. R=rsc CC=golang-dev https://golang.org/cl/2229044 --- src/pkg/netchan/export.go | 2 +- src/pkg/netchan/import.go | 19 +++++++++++--- src/pkg/netchan/netchan_test.go | 45 ++++++++++++++++++++++++++++++++- 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go index d7dceead990..73a070c95ce 100644 --- a/src/pkg/netchan/export.go +++ b/src/pkg/netchan/export.go @@ -66,7 +66,7 @@ func newClient(exp *Exporter, conn net.Conn) *expClient { func (client *expClient) sendError(hdr *header, err string) { error := &error{err} - expLog("sending error to client", error.error) + expLog("sending error to client:", error.error) client.encode(hdr, payError, error) // ignore any encode error, hope client gets it client.mu.Lock() client.errored = true diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go index e6bf4cbb328..bb19dd4702f 100644 --- a/src/pkg/netchan/import.go +++ b/src/pkg/netchan/import.go @@ -28,6 +28,7 @@ type Importer struct { conn net.Conn chanLock sync.Mutex // protects access to channel map chans map[string]*chanDir + errors chan os.Error } // NewImporter creates a new Importer object to import channels @@ -43,6 +44,7 @@ func NewImporter(network, remoteaddr string) (*Importer, os.Error) { imp.encDec = newEncDec(conn) imp.conn = conn imp.chans = make(map[string]*chanDir) + imp.errors = make(chan os.Error, 10) go imp.run() return imp, nil } @@ -86,15 +88,18 @@ func (imp *Importer) run() { } if err.error != "" { impLog("response error:", err.error) - imp.shutdown() - return + if sent := imp.errors <- os.ErrorString(err.error); !sent { + imp.shutdown() + return + } + continue // errors are not acknowledged. } case payClosed: ich := imp.getChan(hdr.name) if ich != nil { ich.ch.Close() } - continue + continue // closes are not acknowledged. default: impLog("unexpected payload type:", hdr.payloadType) return @@ -132,6 +137,14 @@ func (imp *Importer) getChan(name string) *chanDir { return ich } +// Errors returns a channel from which transmission and protocol errors +// can be read. Clients of the importer are not required to read the error +// channel for correct execution. However, if too many errors occur +// without being read from the error channel, the importer will shut down. +func (imp *Importer) Errors() chan os.Error { + return imp.errors +} + // Import imports a channel of the given type and specified direction. // It is equivalent to ImportNValues with a count of -1, meaning unbounded. func (imp *Importer) Import(name string, chT interface{}, dir Dir) os.Error { diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go index 42cb3d1ec1e..4240b07869c 100644 --- a/src/pkg/netchan/netchan_test.go +++ b/src/pkg/netchan/netchan_test.go @@ -4,7 +4,11 @@ package netchan -import "testing" +import ( + "strings" + "testing" + "time" +) const count = 10 // number of items in most tests const closeCount = 5 // number of items when sender closes early @@ -134,6 +138,45 @@ func TestClosingImportSendExportReceive(t *testing.T) { exportReceive(exp, t) } +func TestErrorForIllegalChannel(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + // Now export a channel. + ch := make(chan int, 1) + err = exp.Export("aChannel", ch, Send) + if err != nil { + t.Fatal("export:", err) + } + ch <- 1234 + close(ch) + // Now try to import a different channel. + ch = make(chan int) + err = imp.Import("notAChannel", ch, Recv) + if err != nil { + t.Fatal("import:", err) + } + // Expect an error now. Start a timeout. + timeout := make(chan bool, 1) // buffered so closure will not hang around. + go func() { + time.Sleep(10e9) // very long, to give even really slow machines a chance. + timeout <- true + }() + select { + case err = <-imp.Errors(): + if strings.Index(err.String(), "no such channel") < 0 { + t.Errorf("wrong error for nonexistent channel:", err) + } + case <-timeout: + t.Error("import of nonexistent channel did not receive an error") + } +} + // Not a great test but it does at least invoke Drain. func TestExportDrain(t *testing.T) { exp, err := NewExporter("tcp", "127.0.0.1:0")